Skip to content

Commit b80581f

Browse files
committed
[Messages] Sync job
Added a new job that synchronizes unconfirmed messages across the network. The goal of this job is to re-send messages missed by nodes with the ability to push data on-chain. This can happen because of various issues like server downtimes or bugs. This job works in three parts: * the publisher task periodically sends the list of all the messages older than the last TX block that have yet to be confirmed by on-chain data. * the receiver task stores the list of unconfirmed messages for each peer detected on the network. * the sync/aggregator task aggregates the confirmation data from all the nodes and fetches messages using the HTTP API. These messages are added to the pending message queue. This solution is less expensive that constantly sharing all the messages across all the nodes and guarantees that the network will be synchronized eventually as long as the on-chain data synchronization jobs are working. With the current implementation, a message can remain out of sync at a maximum until a new TX is published on-chain + the job period (5 minutes currently).
1 parent 22d9485 commit b80581f

19 files changed

+618
-25
lines changed

setup.cfg

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ testing =
9494
pytest-aiohttp
9595
pytest-asyncio
9696
pytest-mock
97+
types-pytz
9798
types-requests
9899
types-setuptools
99100
nuls2 =

src/aleph/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ def get_defaults():
7676
"gateway_port": 8080,
7777
"id": None,
7878
"alive_topic": "ALEPH_ALIVE",
79+
"sync_topic": "ALEPH_SYNC",
7980
"reconnect_delay": 60,
8081
"peers": [
8182
"/dnsaddr/api1.aleph.im/ipfs/12D3KooWNgogVS6o8fVsPdzh2FJpCdJJLVSgJT38XGE1BJoCerHx",

src/aleph/jobs/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from aleph.jobs.process_pending_messages import pending_messages_subprocess, retry_messages_task
66
from aleph.jobs.process_pending_txs import pending_txs_subprocess, handle_txs_task
7+
from aleph.jobs.sync_unconfirmed_messages import sync_unconfirmed_messages_subprocess
78
from aleph.jobs.reconnect_ipfs import reconnect_ipfs_job
89

910
LOGGER = logging.getLogger("jobs")
@@ -32,8 +33,13 @@ def start_jobs(
3233
target=pending_txs_subprocess,
3334
args=(config_values, api_servers),
3435
)
36+
sync_unconfirmed_messages_process = Process(
37+
target=sync_unconfirmed_messages_subprocess,
38+
args=(config_values, api_servers),
39+
)
3540
p1.start()
3641
p2.start()
42+
sync_unconfirmed_messages_process.start()
3743
else:
3844
tasks.append(retry_messages_task(shared_stats=shared_stats))
3945
tasks.append(handle_txs_task())

0 commit comments

Comments
 (0)