From 822bafede4cf4f38a0a36856b4148794bb2866f5 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Tue, 14 Jun 2022 13:44:40 +0200 Subject: [PATCH 1/3] [Messages] No blocking on end of queue (#303) Fixed an issue where the pending message job would block on the final messages in the queue and stop processing newer messages. Once the job finishes the loop on all the messages in the pending message collection, the previous implementation waits until all the message tasks finish. This causes a delay of several hours until the node finishes these tasks and is able to process newer pending messages again. Messages end up being processed, but far later than expected. The issue arises because we never remove messages from the pending queue if we fail to retrieve the associated content. The job then always has messages in the queue, causing the issue. Fixed the issue by allowing the loop to restart without waiting for messages to be processed. We now compute an individual ID for each pending message and add it to a set. The job will simply ignore any message that is already being processed, allowing for newer messages to be taken into account. --- src/aleph/jobs/process_pending_messages.py | 43 ++++++++++++++++++---- 1 file changed, 36 insertions(+), 7 deletions(-) diff --git a/src/aleph/jobs/process_pending_messages.py b/src/aleph/jobs/process_pending_messages.py index 3fbfd779d..31a2816d9 100644 --- a/src/aleph/jobs/process_pending_messages.py +++ b/src/aleph/jobs/process_pending_messages.py @@ -72,6 +72,7 @@ async def process_message_job_results( finished_tasks: Set[asyncio.Task], task_message_dict: Dict[asyncio.Task, Dict], shared_stats: Dict[str, Any], + processing_messages: Set[Tuple], ): await process_job_results( finished_tasks, @@ -84,6 +85,9 @@ async def process_message_job_results( shared_stats["retry_messages_job_tasks"] -= 1 shared_stats["message_jobs"][message_type] -= 1 + pending_message_id = get_pending_message_id(pending) + processing_messages.remove(pending_message_id) + del task_message_dict[message_task] @@ -100,12 +104,26 @@ def validate_pending_message(pending: Dict): ) +def get_pending_message_id(pending_message: Dict) -> Tuple: + source = pending_message.get("source", {}) + chain_name = source.get("chain_name", None) + height = source.get("height", None) + + return ( + pending_message["message"]["item_hash"], + pending_message["message"]["sender"], + chain_name, + height, + ) + + async def process_pending_messages(config: Config, shared_stats: Dict): """ Processes all the messages in the pending message queue. """ seen_ids: Dict[Tuple, int] = dict() + processing_messages = set() find_params: Dict = {} max_concurrent_tasks = config.aleph.jobs.pending_messages.max_concurrency.value @@ -116,21 +134,30 @@ async def process_pending_messages(config: Config, shared_stats: Dict): for message_type in MessageType: shared_stats["message_jobs"][message_type] = 0 + # Using a set is required as asyncio.wait takes and returns sets. + pending_tasks: Set[asyncio.Task] = set() + task_message_dict: Dict[asyncio.Task, Dict] = {} + while await PendingMessage.collection.count_documents(find_params): - # Using a set is required as asyncio.wait takes and returns sets. - pending_tasks: Set[asyncio.Task] = set() - task_message_dict: Dict[asyncio.Task, Dict] = {} async for pending in PendingMessage.collection.find(find_params).sort( [("retries", ASCENDING), ("message.time", ASCENDING)] ).batch_size(max_concurrent_tasks): + # Check if the message is already processing + pending_message_id = get_pending_message_id(pending) + if pending_message_id in processing_messages: + # Skip the message, we're already processing it + continue + + processing_messages.add(pending_message_id) + if len(pending_tasks) == max_concurrent_tasks: finished_tasks, pending_tasks = await asyncio.wait( pending_tasks, return_when=asyncio.FIRST_COMPLETED ) await process_message_job_results( - finished_tasks, task_message_dict, shared_stats + finished_tasks, task_message_dict, shared_stats, processing_messages ) validate_pending_message(pending) @@ -148,13 +175,15 @@ async def process_pending_messages(config: Config, shared_stats: Dict): pending_tasks.add(message_task) task_message_dict[message_task] = pending - # Wait for the last tasks + # This synchronization point is required when a few pending messages remain. + # We wait for at least one task to finish; the remaining tasks will be collected + # on the next iterations of the loop. if pending_tasks: finished_tasks, _ = await asyncio.wait( - pending_tasks, return_when=asyncio.ALL_COMPLETED + pending_tasks, return_when=asyncio.FIRST_COMPLETED ) await process_message_job_results( - finished_tasks, task_message_dict, shared_stats + finished_tasks, task_message_dict, shared_stats, processing_messages ) # TODO: move this to a dedicated job and/or check unicity on insertion From d10cdd0cdaa6a3677fcd808a93fc1e8b5bb72226 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Tue, 14 Jun 2022 13:58:08 +0200 Subject: [PATCH 2/3] [Release] Bump to 0.3.2 (#304) --- CHANGELOG.rst | 10 ++++++++++ deployment/samples/docker-compose/docker-compose.yml | 2 +- .../samples/docker-monitoring/docker-compose.yml | 2 +- docs/conf.py | 2 +- 4 files changed, 13 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index bb113ff20..079fcb48e 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -2,6 +2,16 @@ Changelog ========= +Version 0.3.2 +============= + +This release fixes a synchronisation delay issue. The pending message job blocked +while waiting for the last pending messages in the queue to be processed. This caused +a delay of several hours until the job could loop again on the pending messages collection +and start processing new pending messages. We removed the blocking synchronisation point +and now let the job pick up new pending messages while ignoring the ones that are already +being processed. + Version 0.3.1 ============= diff --git a/deployment/samples/docker-compose/docker-compose.yml b/deployment/samples/docker-compose/docker-compose.yml index 49a810adb..665b1c2e1 100644 --- a/deployment/samples/docker-compose/docker-compose.yml +++ b/deployment/samples/docker-compose/docker-compose.yml @@ -7,7 +7,7 @@ volumes: services: pyaleph: restart: always - image: alephim/pyaleph-node:v0.3.1 + image: alephim/pyaleph-node:v0.3.2 command: --config /opt/pyaleph/config.yml --key-dir /opt/pyaleph/keys -v ports: - "127.0.0.1:8000:8000/tcp" diff --git a/deployment/samples/docker-monitoring/docker-compose.yml b/deployment/samples/docker-monitoring/docker-compose.yml index 8c8fd48e7..cc0a67580 100644 --- a/deployment/samples/docker-monitoring/docker-compose.yml +++ b/deployment/samples/docker-monitoring/docker-compose.yml @@ -9,7 +9,7 @@ volumes: services: pyaleph: restart: always - image: alephim/pyaleph-node:v0.3.1 + image: alephim/pyaleph-node:v0.3.2 command: --config /opt/pyaleph/config.yml --key-dir /opt/pyaleph/keys -vv ports: - "127.0.0.1:8000:8000/tcp" diff --git a/docs/conf.py b/docs/conf.py index ee1e0776b..1f0057ab6 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -293,5 +293,5 @@ rst_epilog = """ -.. |pyaleph_version| replace:: v0.3.1 +.. |pyaleph_version| replace:: v0.3.2 """ From 2df8d7b71fcd764561786ca4b4c526ca3a085cda Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Mon, 23 May 2022 10:08:55 +0200 Subject: [PATCH 3/3] [Messages] Sync job Added a new job that synchronizes unconfirmed messages across the network. The goal of this job is to re-send messages missed by nodes with the ability to push data on-chain. This can happen because of various issues like server downtimes or bugs. This job works in three parts: * the publisher task periodically sends the list of all the messages older than the last TX block that have yet to be confirmed by on-chain data. * the receiver task stores the list of unconfirmed messages for each peer detected on the network. * the sync/aggregator task aggregates the confirmation data from all the nodes and fetches messages using the HTTP API. These messages are added to the pending message queue. This solution is less expensive that constantly sharing all the messages across all the nodes and guarantees that the network will be synchronized eventually as long as the on-chain data synchronization jobs are working. With the current implementation, a message can remain out of sync at a maximum until a new TX is published on-chain + the job period (5 minutes currently). --- setup.cfg | 1 + src/aleph/config.py | 1 + src/aleph/jobs/__init__.py | 6 + src/aleph/jobs/sync_unconfirmed_messages.py | 374 ++++++++++++++++++ src/aleph/model/p2p.py | 10 + src/aleph/model/unconfirmed_messages.py | 15 + src/aleph/services/p2p/http.py | 14 +- src/aleph/toolkit/split.py | 7 + src/aleph/toolkit/string.py | 17 + tests/__init__.py | 0 tests/api/test_messages.py | 17 - tests/conftest.py | 22 +- .../{api => }/fixtures/fixture_messages.json | 0 tests/message_processing/conftest.py | 2 +- .../sync_unconfirmed_messages/__init__.py | 0 .../test_aggregate_unconfirmed_messages.py | 74 ++++ .../test_list_unconfirmed_messages.py | 27 ++ .../test_perform_db_operations.py | 10 +- tests/models/test_peer.py | 29 ++ 19 files changed, 601 insertions(+), 25 deletions(-) create mode 100644 src/aleph/jobs/sync_unconfirmed_messages.py create mode 100644 src/aleph/model/unconfirmed_messages.py create mode 100644 src/aleph/toolkit/string.py create mode 100644 tests/__init__.py rename tests/{api => }/fixtures/fixture_messages.json (100%) create mode 100644 tests/message_processing/sync_unconfirmed_messages/__init__.py create mode 100644 tests/message_processing/sync_unconfirmed_messages/test_aggregate_unconfirmed_messages.py create mode 100644 tests/message_processing/sync_unconfirmed_messages/test_list_unconfirmed_messages.py create mode 100644 tests/models/test_peer.py diff --git a/setup.cfg b/setup.cfg index 07ab13179..ab63278b2 100644 --- a/setup.cfg +++ b/setup.cfg @@ -95,6 +95,7 @@ testing = pytest-aiohttp pytest-asyncio pytest-mock + types-pytz types-requests types-setuptools nuls2 = diff --git a/src/aleph/config.py b/src/aleph/config.py index b6229f872..c26301b10 100644 --- a/src/aleph/config.py +++ b/src/aleph/config.py @@ -85,6 +85,7 @@ def get_defaults(): "gateway_port": 8080, "id": None, "alive_topic": "ALEPH_ALIVE", + "sync_topic": "ALEPH_SYNC", "reconnect_delay": 60, "peers": [ "/dnsaddr/api1.aleph.im/ipfs/12D3KooWNgogVS6o8fVsPdzh2FJpCdJJLVSgJT38XGE1BJoCerHx", diff --git a/src/aleph/jobs/__init__.py b/src/aleph/jobs/__init__.py index a66e2be52..900460404 100644 --- a/src/aleph/jobs/__init__.py +++ b/src/aleph/jobs/__init__.py @@ -4,6 +4,7 @@ from aleph.jobs.process_pending_messages import pending_messages_subprocess, retry_messages_task from aleph.jobs.process_pending_txs import pending_txs_subprocess, handle_txs_task +from aleph.jobs.sync_unconfirmed_messages import sync_unconfirmed_messages_subprocess from aleph.jobs.reconnect_ipfs import reconnect_ipfs_job LOGGER = logging.getLogger("jobs") @@ -32,8 +33,13 @@ def start_jobs( target=pending_txs_subprocess, args=(config_values, api_servers), ) + sync_unconfirmed_messages_process = Process( + target=sync_unconfirmed_messages_subprocess, + args=(config_values, api_servers), + ) p1.start() p2.start() + sync_unconfirmed_messages_process.start() else: tasks.append(retry_messages_task(config=config, shared_stats=shared_stats)) tasks.append(handle_txs_task(config)) diff --git a/src/aleph/jobs/sync_unconfirmed_messages.py b/src/aleph/jobs/sync_unconfirmed_messages.py new file mode 100644 index 000000000..c456aa9c9 --- /dev/null +++ b/src/aleph/jobs/sync_unconfirmed_messages.py @@ -0,0 +1,374 @@ +""" +Job that shares unconfirmed messages on the network periodically. + +Messages are shared in real-time on the network using P2P and IPFS topics. However, +some nodes may be offline at the time and miss these messages. While this is not an +issue usually for regular nodes (they will just receive the message later on from +on-chain data), this can become problematic if a node in charge of pushing messages +on-chain is down for a set amount of time. if this happens, the messages shared during +the downtime of this/these node(s) would simply not be published on-chain, causing +synchronisation issues on the network. + +This job provides a solution to this issue. It works in three parts: +* the publisher task periodically sends the list of all the messages older than + the last TX block that have yet to be confirmed by on-chain data. +* the receiver task stores the list of unconfirmed messages for each peer detected + on the network. +* the sync/aggregator task aggregates the confirmation data from all the nodes + and fetches messages using the HTTP API. These messages are added to the pending + message queue. + +Note: currently, we use an IPFS topic given the unreliability of the P2P daemon. +TODO: use a P2P topic once we have a solution (ex: Rust bindings). +""" + +import asyncio +import json +import logging +import random +import time +from typing import List, Dict, Optional + +import pytz +import sentry_sdk +from configmanager import Config +from setproctitle import setproctitle + +from aleph.exceptions import InvalidMessageError +from aleph.logging import setup_logging +from aleph.model.messages import Message +from aleph.model.pending import PendingMessage, PendingTX +from aleph.network import check_message, INCOMING_MESSAGE_AUTHORIZED_FIELDS +from aleph.services.ipfs.pubsub import pub as pub_ipfs, sub as sub_ipfs +from aleph.services.p2p import singleton +from aleph.services.p2p.http import get_messages_from_peer +from .job_utils import prepare_loop +from ..model.chains import Chain +from ..model.p2p import Peer +from ..model.unconfirmed_messages import UnconfirmedMessage +from ..toolkit.split import split_iterable +from ..toolkit.string import truncate_log + +PUB_LOGGER = logging.getLogger(f"{__name__}.publish") +SYNC_LOGGER = logging.getLogger(f"{__name__}.sync") + + +async def list_unconfirmed_message_hashes(older_than: float, limit: int) -> List[str]: + """ + Returns the list of the hashes of unconfirmed messages, up to `limit` messages. + :param older_than: Epoch timestamp. The function will only return unconfirmed + messages older than this value. + :param limit: Maximum number of messages to return. + """ + + unconfirmed_hashes = [ + msg["item_hash"] + async for msg in ( + Message.collection.find( + filter={"confirmed": False, "time": {"$lt": older_than}}, + projection={"_id": 0, "item_hash": 1}, + ) + .sort([("time", 1)]) + .limit(limit) + ) + ] + + return unconfirmed_hashes + + +async def publish_unconfirmed_messages( + topic: str, older_than: float, limit: int = 10000 +): + unconfirmed_messages = await list_unconfirmed_message_hashes( + older_than=older_than, limit=limit + ) + PUB_LOGGER.info("Publishing %d unconfirmed messages", len(unconfirmed_messages)) + data = json.dumps(unconfirmed_messages).encode("utf-8") + await pub_ipfs(topic, data) + + +async def receive_unconfirmed_messages(topic: str): + """ + Receives unconfirmed messages sync data from the network and stores it in the DB. + :param topic: The IPFS topic where unconfirmed messages sync data is published. + """ + + restart_wait_time = 2 + + while True: + try: + async for mvalue in sub_ipfs(topic): + + sender = mvalue["from"] + data = mvalue["data"].decode("utf-8") + try: + unconfirmed_hashes = json.loads(data) + except json.JSONDecodeError: + SYNC_LOGGER.warning( + "Could not parse sync data from %s: %s", + sender, + truncate_log(data, 100), + ) + continue + + SYNC_LOGGER.info( + "Peer %s notified %d unconfirmed messages", + sender, + len(unconfirmed_hashes), + ) + await UnconfirmedMessage.collection.update( + { + "$set": { + "peer_id": sender, + "hashes": unconfirmed_hashes, + "reception_time": time.time(), + }, + }, + upsert=True, + ) + except Exception: + SYNC_LOGGER.exception( + "Unexpected exception while syncing unconfirmed messages." + "Restarting in %d seconds...", + restart_wait_time, + ) + await asyncio.sleep(restart_wait_time) + + +async def aggregate_unconfirmed_hashes(from_time: float) -> Dict[str, List[str]]: + """ + Returns a dictionary of item_hash -> providers based on unconfirmed messages + sent by the other nodes. + + :param from_time: Minimum reception time. Documents received before this epoch time + will be ignored from the query. + :return: A dictionary of item_hash -> providers of all unconfirmed messages + notified by the nodes on the network. + """ + + unconfirmed_message_sources: Dict[str, List[str]] = {} + + async for result in UnconfirmedMessage.collection.aggregate( + [ + # Filter out old data + {"$match": {"reception_time": {"$gte": from_time}}}, + # List the senders by message item_hash + {"$unwind": "$hashes"}, + {"$group": {"_id": "$hashes", "providers": {"$push": "$sender"}}}, + # Only return messages not already present on the node + { + "$lookup": { + "from": "messages", + "localField": "_id", + "foreignField": "item_hash", + "as": "existing_messages", + }, + }, + {"$match": {"existing_messages": {"$eq": []}}}, + ] + ): + unconfirmed_message_sources[result["_id"]] = result["providers"] + + return unconfirmed_message_sources + + +async def fetch_and_verify_message( + item_hash: str, providers: List[str] +) -> Optional[Dict]: + """ + Fetches a message from the HTTP API of a peer ID randomly selected in the list + of providers. + + Validates the data sent by the node and discards corrupted data. + + :param item_hash: Item hash of the message to retrieve. + :param providers: List of peer IDs that announced they have the message. Each provider + is expected to be able to provide the message. + :return: The message, or None if it could not be fetched from any of the providers. + """ + + randomized_providers = random.sample(providers, len(providers)) + for provider in randomized_providers: + provider_uri = await Peer.get_peer_address(peer_id=provider, peer_type="HTTP") + if provider_uri is None: + SYNC_LOGGER.warning("Could not determine HTTP address for %s", provider) + continue + + messages = await get_messages_from_peer( + peer_uri=provider_uri, item_hash=item_hash, timeout=15 + ) + if messages is None: + SYNC_LOGGER.warning( + "Message %s could not be fetched from %s", item_hash, provider + ) + continue + + message = messages[0] + try: + # Check the message immediately, signature included, in order to + # ignore any node that tampers with the data. Calling the function + # this way also filters out fields added by the API and allows + # to add the message to the pending queue as is. + return await check_message(message, from_network=True, trusted=False) + except InvalidMessageError as e: + SYNC_LOGGER.warning( + "Message fetched from %s is invalid: %s", provider, str(e) + ) + + return None + + +async def fetch_and_verify_message_task( + item_hash: str, providers: List[str], task_semaphore: asyncio.Semaphore +) -> Optional[Dict]: + async with task_semaphore: + return await fetch_and_verify_message(item_hash, providers) + + +async def fetch_missing_messages(last_run_time: float): + unconfirmed_message_sources = await aggregate_unconfirmed_hashes( + from_time=last_run_time + ) + + if not unconfirmed_message_sources: + SYNC_LOGGER.info( + "No unconfirmed messages notified by the network, nothing to do." + ) + return + + SYNC_LOGGER.info( + "Found %d messages to fetch from the network", len(unconfirmed_message_sources) + ) + + message_semaphore = asyncio.BoundedSemaphore(100) + + tasks = [ + asyncio.create_task( + fetch_and_verify_message_task( + item_hash, providers, task_semaphore=message_semaphore + ) + ) + for item_hash, providers in unconfirmed_message_sources.items() + ] + + results = await asyncio.gather(*tasks, return_exceptions=True) + errors, messages = split_iterable( + results, lambda result: isinstance(result, BaseException) + ) + + for error in errors: + SYNC_LOGGER.exception( + "unexpected error while fetching unconfirmed messages", + exc_info=(type(error), error, error.__traceback__), + ) + + pending_messages = [ + { + # TODO: this filtering is redundant as long as we use check_message(trusted=True). + # update this to use the PendingMessage models added in #273. + "message": { + k: v + for k, v in message.items() + if k in INCOMING_MESSAGE_AUTHORIZED_FIELDS + }, + "source": { + "chain_name": None, + "tx_hash": None, + "height": None, + "check_message": True, + }, + } + for message in messages + ] + await PendingMessage.collection.bulk_write(pending_messages) + + +async def get_last_processed_tx_time() -> float: + """ + Returns the epoch timestamp of the last processed TX in the chains collection. + """ + + # TODO: check if this is correct in a multi-chain context + last_processed_tx = await Chain.collection.find( + {"_id": 0, "last_update": 1}, + sort={"last_update": -1}, + limit=1, + ) + if last_processed_tx is None: + raise ValueError("Could not find last processed TX.") + + localized_update_datetime = pytz.utc.localize(last_processed_tx["last_update"]) + return localized_update_datetime.timestamp() + + +async def publish_unconfirmed_messages_loop(topic: str, job_period: float): + while True: + try: + # Avoid publishing data if the node is currently syncing + nb_pending_txs = await PendingTX.collection.count_documents({}) + + if nb_pending_txs: + PUB_LOGGER.info( + "Node currently syncing (%d pending txs). Unconfirmed messages not published.", + nb_pending_txs, + ) + + else: + last_processed_tx = await Chain.collection.find_one( + {"chain": "ETH"}, {"_id": 0, "last_update": 1} + ) + await publish_unconfirmed_messages( + topic=topic, + older_than=last_processed_tx["last_update"].epoch(), + limit=10000, + ) + except Exception: + PUB_LOGGER.exception("Could not publish unconfirmed messages") + + await asyncio.sleep(job_period) + + +async def sync_unconfirmed_messages_loop(job_period: float): + await asyncio.sleep(4) + last_run_time = time.time() + + SYNC_LOGGER.info("Running sync aggregate task every %d seconds", job_period) + + while True: + try: + await fetch_missing_messages(last_run_time=last_run_time) + except Exception: + SYNC_LOGGER.exception("Error in sync unconfirmed messages job") + + await asyncio.sleep(job_period) + + +async def run_sync_message_tasks(config: Config): + topic = config.ipfs.sync_topic.value + job_period = 300 + + await asyncio.gather( + publish_unconfirmed_messages_loop(topic=topic, job_period=job_period), + receive_unconfirmed_messages(topic=topic), + sync_unconfirmed_messages_loop(job_period=job_period), + ) + + +def sync_unconfirmed_messages_subprocess(config_values: Dict, api_servers: List): + proctitle = "aleph.jobs.sync_unconfirmed_messages" + setproctitle(proctitle) + loop, config = prepare_loop(config_values) + + sentry_sdk.init( + dsn=config.sentry.dsn.value, + traces_sample_rate=config.sentry.traces_sample_rate.value, + ignore_errors=[KeyboardInterrupt], + ) + setup_logging( + loglevel=config.logging.level.value, + filename=f"/tmp/{proctitle.replace('.', '-')}.log", + max_log_file_size=config.logging.max_log_file_size.value, + ) + singleton.api_servers = api_servers + + loop.run_until_complete(run_sync_message_tasks(config)) diff --git a/src/aleph/model/p2p.py b/src/aleph/model/p2p.py index 8c0db47aa..517fb573b 100644 --- a/src/aleph/model/p2p.py +++ b/src/aleph/model/p2p.py @@ -21,6 +21,16 @@ class Peer(BaseClass): IndexModel([("last_seen", DESCENDING)]), ] + @classmethod + async def get_peer_address(cls, peer_id: str, peer_type: str) -> Optional[str]: + peer = await cls.collection.find_one( + {"sender": peer_id, "type": peer_type}, {"_id": 0, "address": 1} + ) + if peer is None: + return None + + return peer["address"] + async def get_peers(peer_type: Optional[str] = None, hours: int = 2) -> AsyncIterator[str]: """Returns current peers. diff --git a/src/aleph/model/unconfirmed_messages.py b/src/aleph/model/unconfirmed_messages.py new file mode 100644 index 000000000..068693f39 --- /dev/null +++ b/src/aleph/model/unconfirmed_messages.py @@ -0,0 +1,15 @@ +from pymongo import ASCENDING, DESCENDING, IndexModel + +from aleph.model.base import BaseClass + + +class UnconfirmedMessage(BaseClass): + """ + Synchronization messages sent by other nodes. Each document contains the hashes of + unconfirmed messages for one peer on the network. + + Refer to the unconfirmed messages synchronization job for more details. + """ + + COLLECTION = "unconfirmed_messages" + INDEXES = [IndexModel("peer_id", unique=True)] diff --git a/src/aleph/services/p2p/http.py b/src/aleph/services/p2p/http.py index 4cca035ff..4c404c1e7 100644 --- a/src/aleph/services/p2p/http.py +++ b/src/aleph/services/p2p/http.py @@ -6,7 +6,7 @@ import base64 import logging from random import sample -from typing import Optional +from typing import Optional, Dict import aiohttp @@ -45,6 +45,18 @@ async def api_get_request(base_uri, method, timeout=1): return result +async def get_messages_from_peer( + peer_uri: str, item_hash: str, timeout: int +) -> Optional[Dict]: + result = await api_get_request( + base_uri=peer_uri, method=f"messages.json?hashes={item_hash}", timeout=timeout + ) + if result is None: + return None + + return result["messages"] + + async def get_peer_hash_content( base_uri: str, item_hash: str, timeout: int = 1 ) -> Optional[bytes]: diff --git a/src/aleph/toolkit/split.py b/src/aleph/toolkit/split.py index be892811a..7cd52ca1d 100644 --- a/src/aleph/toolkit/split.py +++ b/src/aleph/toolkit/split.py @@ -5,6 +5,13 @@ def split_iterable(iterable: Iterable[T], cond: Callable[[T], bool]) -> Tuple[List[T], List[T]]: + """ + Splits an iterable in two based on the condition and returns the two lists as + a (matches, others) tuple. + :param iterable: The iterable to split. + :param cond: A condition to verify for each element of the iterable. + """ + matches = [] others = [] diff --git a/src/aleph/toolkit/string.py b/src/aleph/toolkit/string.py new file mode 100644 index 000000000..499ad8628 --- /dev/null +++ b/src/aleph/toolkit/string.py @@ -0,0 +1,17 @@ +def truncate_log(s: str, max_length: int, suffix: str = "...") -> str: + """ + Truncates a string down to the specified length. Appends a user-defined suffix + if the string is truncated. The string is returned unmodified if it is shorter + than the specified maximum length. + + :param s: String to truncate. + :param max_length: Maximum size of the returned string. + :param suffix: String to be appended to the base string if it is truncated. + :return: A string of len <= max_length where the end of the original string + is replaced by the suffix if it is longer than max_length. + """ + + if len(s) <= max_length: + return s + + return s[: max_length - len(suffix)] + suffix diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/api/test_messages.py b/tests/api/test_messages.py index ad9a07f7a..34fe99e58 100644 --- a/tests/api/test_messages.py +++ b/tests/api/test_messages.py @@ -1,12 +1,7 @@ import itertools -import json -from pathlib import Path from typing import Dict, Iterable, List import pytest -import pytest_asyncio - -from aleph.model.messages import Message MESSAGES_URI = "/api/v0/messages.json" @@ -29,18 +24,6 @@ def assert_messages_equal(messages: Iterable[Dict], expected_messages: Iterable[ assert message["signature"] == expected_message["signature"] -@pytest_asyncio.fixture -async def fixture_messages(test_db): - fixtures_dir = Path(__file__).parent / "fixtures" - fixtures_file = fixtures_dir / "fixture_messages.json" - - with fixtures_file.open() as f: - messages = json.load(f) - - await Message.collection.insert_many(messages) - return messages - - @pytest.mark.asyncio async def test_get_messages(fixture_messages, ccn_api_client): response = await ccn_api_client.get(MESSAGES_URI) diff --git a/tests/conftest.py b/tests/conftest.py index d433e1d03..8ffba1b93 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,6 @@ import asyncio -import os +import json +from pathlib import Path import pymongo import pytest_asyncio @@ -7,6 +8,7 @@ from aleph.config import get_defaults from aleph.model import init_db +from aleph.model.messages import Message from aleph.web import create_app TEST_DB = "ccn_automated_tests" @@ -30,6 +32,7 @@ async def test_db(): init_db(config, ensure_indexes=True) from aleph.model import db + yield db @@ -43,3 +46,20 @@ async def ccn_api_client(aiohttp_client): client = await aiohttp_client(app) return client + + +@pytest_asyncio.fixture +async def fixture_messages(test_db): + """ + Some generic message fixtures to insert in the messages collection for + tests fetching data from the DB. + """ + + fixtures_dir = Path(__file__).parent / "fixtures" + fixtures_file = fixtures_dir / "fixture_messages.json" + + with fixtures_file.open() as f: + messages = json.load(f) + + await Message.collection.insert_many(messages) + return messages diff --git a/tests/api/fixtures/fixture_messages.json b/tests/fixtures/fixture_messages.json similarity index 100% rename from tests/api/fixtures/fixture_messages.json rename to tests/fixtures/fixture_messages.json diff --git a/tests/message_processing/conftest.py b/tests/message_processing/conftest.py index 1b6b8e9fc..702204bf0 100644 --- a/tests/message_processing/conftest.py +++ b/tests/message_processing/conftest.py @@ -4,5 +4,5 @@ @pytest.fixture -def fixture_messages(): +def fixture_chain_data(): return load_fixture_messages("test-data-pending-tx-messages.json") diff --git a/tests/message_processing/sync_unconfirmed_messages/__init__.py b/tests/message_processing/sync_unconfirmed_messages/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/message_processing/sync_unconfirmed_messages/test_aggregate_unconfirmed_messages.py b/tests/message_processing/sync_unconfirmed_messages/test_aggregate_unconfirmed_messages.py new file mode 100644 index 000000000..0a3529cd1 --- /dev/null +++ b/tests/message_processing/sync_unconfirmed_messages/test_aggregate_unconfirmed_messages.py @@ -0,0 +1,74 @@ +import pytest + +from aleph.jobs.sync_unconfirmed_messages import aggregate_unconfirmed_hashes +from aleph.model.messages import Message +from aleph.model.unconfirmed_messages import UnconfirmedMessage + + +@pytest.mark.asyncio +async def test_aggregate_unconfirmed_messages(test_db): + await UnconfirmedMessage.collection.insert_many( + [ + {"sender": "1", "hashes": ["123", "456", "789"], "reception_time": 1000000}, + {"sender": "2", "hashes": ["123", "789", "abc"], "reception_time": 1000000}, + ] + ) + + unconfirmed_message_sources = await aggregate_unconfirmed_hashes(from_time=0) + expected_unconfirmed_sources = { + "123": ["1", "2"], + "456": ["1"], + "789": ["1", "2"], + "abc": ["2"], + } + + assert unconfirmed_message_sources == expected_unconfirmed_sources + + +@pytest.mark.asyncio +async def test_aggregate_unconfirmed_messages_already_present(test_db): + """ + Tests that messages already present on the local node are ignored when aggregating + unconfirmed messages sent by the network. + """ + + await UnconfirmedMessage.collection.insert_many( + [ + {"sender": "1", "hashes": ["123", "456", "789"], "reception_time": 1000000}, + {"sender": "2", "hashes": ["123", "789", "abc"], "reception_time": 1000000}, + ] + ) + + await Message.collection.insert_many([{"item_hash": "123"}, {"item_hash": "abc"}]) + + unconfirmed_message_sources = await aggregate_unconfirmed_hashes(from_time=0) + expected_unconfirmed_sources = { + "456": ["1"], + "789": ["1", "2"], + } + + assert unconfirmed_message_sources == expected_unconfirmed_sources + + +@pytest.mark.asyncio +async def test_aggregate_unconfirmed_messages_ignore_old_data(test_db): + """ + Tests that messages already present on the local node are ignored when aggregating + unconfirmed messages sent by the network. + """ + + await UnconfirmedMessage.collection.insert_many( + [ + {"sender": "1", "hashes": ["123", "456", "789"], "reception_time": 100000}, + {"sender": "2", "hashes": ["123", "789", "abc"], "reception_time": 1000000}, + ] + ) + + unconfirmed_message_sources = await aggregate_unconfirmed_hashes(from_time=1000000) + expected_unconfirmed_sources = { + "123": ["2"], + "789": ["2"], + "abc": ["2"], + } + + assert unconfirmed_message_sources == expected_unconfirmed_sources diff --git a/tests/message_processing/sync_unconfirmed_messages/test_list_unconfirmed_messages.py b/tests/message_processing/sync_unconfirmed_messages/test_list_unconfirmed_messages.py new file mode 100644 index 000000000..c2661a87f --- /dev/null +++ b/tests/message_processing/sync_unconfirmed_messages/test_list_unconfirmed_messages.py @@ -0,0 +1,27 @@ +import pytest +from aleph.jobs.sync_unconfirmed_messages import list_unconfirmed_message_hashes +import datetime as dt + + +@pytest.mark.asyncio +async def test_list_unconfirmed_message_hashes(test_db, fixture_messages): + # List all unconfirmed messages + # TODO: update this by 3999, Dec 31st + in_a_long_time = dt.datetime(4000, 1, 1) + + expected_hashes = { + "9200cfab5950e5d173f07d7c61bb0524675d0305e808590e7d0a0752ce65f791", + "4c33dd1ebf61bbb4342d8258b591fcd52cca73fd7c425542f78311d8f45ba274", + } + hashes = await list_unconfirmed_message_hashes( + older_than=in_a_long_time.timestamp(), limit=1000 + ) + assert set(hashes) == expected_hashes + + # List only one message using older_than + filtered_hashes = await list_unconfirmed_message_hashes(older_than=1652126600, limit=1000) + assert filtered_hashes == ["9200cfab5950e5d173f07d7c61bb0524675d0305e808590e7d0a0752ce65f791"] + + # List 0 messages + no_hashes = await list_unconfirmed_message_hashes(older_than=0, limit=1000) + assert no_hashes == [] diff --git a/tests/message_processing/test_perform_db_operations.py b/tests/message_processing/test_perform_db_operations.py index 413e4d211..43cb0ce45 100644 --- a/tests/message_processing/test_perform_db_operations.py +++ b/tests/message_processing/test_perform_db_operations.py @@ -61,7 +61,7 @@ async def test_db_operations_delete_one(test_db): @pytest.mark.asyncio -async def test_db_operations_insert_and_delete(test_db, fixture_messages): +async def test_db_operations_insert_and_delete(test_db, fixture_chain_data): """ Test a typical case where we insert several messages and delete a pending TX. """ @@ -72,7 +72,7 @@ async def test_db_operations_insert_and_delete(test_db, fixture_messages): db_operations = [ DbBulkOperation(collection=PendingMessage, operation=InsertOne(msg)) - for msg in fixture_messages + for msg in fixture_chain_data ] db_operations.append( @@ -89,13 +89,13 @@ async def test_db_operations_insert_and_delete(test_db, fixture_messages): tx_end_count = await PendingTX.count({}) msg_end_count = await PendingMessage.count({}) assert tx_end_count - tx_start_count == -1 - assert msg_end_count - msg_start_count == len(fixture_messages) + assert msg_end_count - msg_start_count == len(fixture_chain_data) # Check each message - fixture_messages_by_hash = {msg["item_hash"]: msg for msg in fixture_messages} + fixture_messages_by_hash = {msg["item_hash"]: msg for msg in fixture_chain_data} async for pending_msg in PendingMessage.collection.find( - {"message.item_hash": {"$in": [msg["item_hash"] for msg in fixture_messages]}} + {"message.item_hash": {"$in": [msg["item_hash"] for msg in fixture_chain_data]}} ): pending_message = pending_msg["message"] expected_message = fixture_messages_by_hash[pending_message["item_hash"]] diff --git a/tests/models/test_peer.py b/tests/models/test_peer.py new file mode 100644 index 000000000..79cacc81f --- /dev/null +++ b/tests/models/test_peer.py @@ -0,0 +1,29 @@ +import pytest +from aleph.model.p2p import Peer +import datetime as dt + + +@pytest.mark.asyncio +async def test_get_peer_type_no_match(test_db): + peer_address = await Peer.get_peer_address(peer_id="123", peer_type="HTTP") + assert peer_address is None + + +@pytest.mark.asyncio +async def test_get_peer_type_match(test_db): + peer_id = "123" + http_address = "http://127.0.0.1:4024" + + await Peer.collection.insert_one( + { + "address": http_address, + "type": "HTTP", + "last_seen": dt.datetime.utcnow(), + "sender": peer_id, + "source": "p2p", + } + ) + + peer_address = await Peer.get_peer_address(peer_id=peer_id, peer_type="HTTP") + assert peer_address is not None + assert peer_address == http_address