diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index bb113ff20..079fcb48e 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -2,6 +2,16 @@
 Changelog
 =========
 
+Version 0.3.2
+=============
+
+This release fixes a synchronisation delay issue. The pending message job blocked
+while waiting for the last pending messages in the queue to be processed. This caused
+a delay of several hours until the job could loop again on the pending messages collection
+and start processing new pending messages. We removed the blocking synchronisation point
+and now let the job pick up new pending messages while ignoring the ones that are already
+being processed.
+
 Version 0.3.1
 =============
 
diff --git a/deployment/samples/docker-compose/docker-compose.yml b/deployment/samples/docker-compose/docker-compose.yml
index 49a810adb..665b1c2e1 100644
--- a/deployment/samples/docker-compose/docker-compose.yml
+++ b/deployment/samples/docker-compose/docker-compose.yml
@@ -7,7 +7,7 @@ volumes:
 services:
   pyaleph:
     restart: always
-    image: alephim/pyaleph-node:v0.3.1
+    image: alephim/pyaleph-node:v0.3.2
     command: --config /opt/pyaleph/config.yml --key-dir /opt/pyaleph/keys -v
     ports:
       - "127.0.0.1:8000:8000/tcp"
diff --git a/deployment/samples/docker-monitoring/docker-compose.yml b/deployment/samples/docker-monitoring/docker-compose.yml
index 8c8fd48e7..cc0a67580 100644
--- a/deployment/samples/docker-monitoring/docker-compose.yml
+++ b/deployment/samples/docker-monitoring/docker-compose.yml
@@ -9,7 +9,7 @@ volumes:
 services:
   pyaleph:
     restart: always
-    image: alephim/pyaleph-node:v0.3.1
+    image: alephim/pyaleph-node:v0.3.2
     command: --config /opt/pyaleph/config.yml --key-dir /opt/pyaleph/keys -vv
     ports:
       - "127.0.0.1:8000:8000/tcp"
diff --git a/docs/conf.py b/docs/conf.py
index ee1e0776b..1f0057ab6 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -293,5 +293,5 @@
 rst_epilog = """
-.. |pyaleph_version| replace:: v0.3.1
+.. |pyaleph_version| replace:: v0.3.2
 """
diff --git a/setup.cfg b/setup.cfg
index 07ab13179..ab63278b2 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -95,6 +95,7 @@ testing =
     pytest-aiohttp
     pytest-asyncio
     pytest-mock
+    types-pytz
    types-requests
     types-setuptools
 nuls2 =
diff --git a/src/aleph/config.py b/src/aleph/config.py
index b6229f872..c26301b10 100644
--- a/src/aleph/config.py
+++ b/src/aleph/config.py
@@ -85,6 +85,7 @@ def get_defaults():
             "gateway_port": 8080,
             "id": None,
             "alive_topic": "ALEPH_ALIVE",
+            "sync_topic": "ALEPH_SYNC",
             "reconnect_delay": 60,
             "peers": [
                 "/dnsaddr/api1.aleph.im/ipfs/12D3KooWNgogVS6o8fVsPdzh2FJpCdJJLVSgJT38XGE1BJoCerHx",
diff --git a/src/aleph/jobs/__init__.py b/src/aleph/jobs/__init__.py
index a66e2be52..900460404 100644
--- a/src/aleph/jobs/__init__.py
+++ b/src/aleph/jobs/__init__.py
@@ -4,6 +4,7 @@
 from aleph.jobs.process_pending_messages import pending_messages_subprocess, retry_messages_task
 from aleph.jobs.process_pending_txs import pending_txs_subprocess, handle_txs_task
+from aleph.jobs.sync_unconfirmed_messages import sync_unconfirmed_messages_subprocess
 from aleph.jobs.reconnect_ipfs import reconnect_ipfs_job
 
 LOGGER = logging.getLogger("jobs")
@@ -32,8 +33,13 @@ def start_jobs(
             target=pending_txs_subprocess,
             args=(config_values, api_servers),
         )
+        sync_unconfirmed_messages_process = Process(
+            target=sync_unconfirmed_messages_subprocess,
+            args=(config_values, api_servers),
+        )
         p1.start()
         p2.start()
+        sync_unconfirmed_messages_process.start()
     else:
         tasks.append(retry_messages_task(config=config, shared_stats=shared_stats))
         tasks.append(handle_txs_task(config))
diff --git a/src/aleph/jobs/process_pending_messages.py b/src/aleph/jobs/process_pending_messages.py
index 3fbfd779d..31a2816d9 100644
--- a/src/aleph/jobs/process_pending_messages.py
+++ b/src/aleph/jobs/process_pending_messages.py
@@ -72,6 +72,7 @@ async def process_message_job_results(
     finished_tasks: Set[asyncio.Task],
     task_message_dict: Dict[asyncio.Task, Dict],
     shared_stats: Dict[str, Any],
+    processing_messages: Set[Tuple],
 ):
     await process_job_results(
         finished_tasks,
@@ -84,6 +85,9 @@
         shared_stats["retry_messages_job_tasks"] -= 1
         shared_stats["message_jobs"][message_type] -= 1
 
+        pending_message_id = get_pending_message_id(pending)
+        processing_messages.remove(pending_message_id)
+
         del task_message_dict[message_task]
 
 
@@ -100,12 +104,26 @@ def validate_pending_message(pending: Dict):
         )
 
 
+def get_pending_message_id(pending_message: Dict) -> Tuple:
+    source = pending_message.get("source", {})
+    chain_name = source.get("chain_name", None)
+    height = source.get("height", None)
+
+    return (
+        pending_message["message"]["item_hash"],
+        pending_message["message"]["sender"],
+        chain_name,
+        height,
+    )
+
+
 async def process_pending_messages(config: Config, shared_stats: Dict):
     """
     Processes all the messages in the pending message queue.
     """
 
     seen_ids: Dict[Tuple, int] = dict()
+    processing_messages = set()
     find_params: Dict = {}
 
     max_concurrent_tasks = config.aleph.jobs.pending_messages.max_concurrency.value
@@ -116,21 +134,30 @@
     for message_type in MessageType:
         shared_stats["message_jobs"][message_type] = 0
 
+    # Using a set is required as asyncio.wait takes and returns sets.
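+    # These objects are now created once, outside the while loop, so that tasks started
+    # in a previous iteration can keep running while new pending messages are picked up.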
+    pending_tasks: Set[asyncio.Task] = set()
+    task_message_dict: Dict[asyncio.Task, Dict] = {}
+
     while await PendingMessage.collection.count_documents(find_params):
-        # Using a set is required as asyncio.wait takes and returns sets.
-        pending_tasks: Set[asyncio.Task] = set()
-        task_message_dict: Dict[asyncio.Task, Dict] = {}
 
         async for pending in PendingMessage.collection.find(find_params).sort(
             [("retries", ASCENDING), ("message.time", ASCENDING)]
         ).batch_size(max_concurrent_tasks):
 
+            # Check if the message is already being processed
+            pending_message_id = get_pending_message_id(pending)
+            if pending_message_id in processing_messages:
+                # Skip the message, we're already processing it
+                continue
+
+            processing_messages.add(pending_message_id)
+
             if len(pending_tasks) == max_concurrent_tasks:
                 finished_tasks, pending_tasks = await asyncio.wait(
                     pending_tasks, return_when=asyncio.FIRST_COMPLETED
                 )
                 await process_message_job_results(
-                    finished_tasks, task_message_dict, shared_stats
+                    finished_tasks, task_message_dict, shared_stats, processing_messages
                 )
 
             validate_pending_message(pending)
@@ -148,13 +175,15 @@
             pending_tasks.add(message_task)
             task_message_dict[message_task] = pending
 
-        # Wait for the last tasks
+        # This synchronization point is required when a few pending messages remain.
+        # We wait for at least one task to finish; the remaining tasks will be collected
+        # on the next iterations of the loop.
         if pending_tasks:
             finished_tasks, _ = await asyncio.wait(
-                pending_tasks, return_when=asyncio.ALL_COMPLETED
+                pending_tasks, return_when=asyncio.FIRST_COMPLETED
             )
             await process_message_job_results(
-                finished_tasks, task_message_dict, shared_stats
+                finished_tasks, task_message_dict, shared_stats, processing_messages
             )
 
 
 # TODO: move this to a dedicated job and/or check unicity on insertion
diff --git a/src/aleph/jobs/sync_unconfirmed_messages.py b/src/aleph/jobs/sync_unconfirmed_messages.py
new file mode 100644
index 000000000..c456aa9c9
--- /dev/null
+++ b/src/aleph/jobs/sync_unconfirmed_messages.py
@@ -0,0 +1,374 @@
+"""
+Job that shares unconfirmed messages on the network periodically.
+
+Messages are shared in real-time on the network using P2P and IPFS topics. However,
+some nodes may be offline at the time and miss these messages. While this is usually
+not an issue for regular nodes (they will just receive the message later on from
+on-chain data), this can become problematic if a node in charge of pushing messages
+on-chain is down for a set amount of time. If this happens, the messages shared during
+the downtime of this/these node(s) would simply not be published on-chain, causing
+synchronisation issues on the network.
+
+This job provides a solution to this issue. It works in three parts:
+* the publisher task periodically sends the list of all the messages older than
+  the last TX block that have yet to be confirmed by on-chain data.
+* the receiver task stores the list of unconfirmed messages for each peer detected
+  on the network.
+* the sync/aggregator task aggregates the confirmation data from all the nodes
+  and fetches messages using the HTTP API. These messages are added to the pending
+  message queue.
+
+Note: currently, we use an IPFS topic given the unreliability of the P2P daemon.
+TODO: use a P2P topic once we have a solution (ex: Rust bindings).
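+
+For illustration, the payload published on the sync topic is simply a JSON list of
+item hashes encoded as UTF-8 bytes, e.g. '["<item_hash_1>", "<item_hash_2>", ...]'.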
+""" + +import asyncio +import json +import logging +import random +import time +from typing import List, Dict, Optional + +import pytz +import sentry_sdk +from configmanager import Config +from setproctitle import setproctitle + +from aleph.exceptions import InvalidMessageError +from aleph.logging import setup_logging +from aleph.model.messages import Message +from aleph.model.pending import PendingMessage, PendingTX +from aleph.network import check_message, INCOMING_MESSAGE_AUTHORIZED_FIELDS +from aleph.services.ipfs.pubsub import pub as pub_ipfs, sub as sub_ipfs +from aleph.services.p2p import singleton +from aleph.services.p2p.http import get_messages_from_peer +from .job_utils import prepare_loop +from ..model.chains import Chain +from ..model.p2p import Peer +from ..model.unconfirmed_messages import UnconfirmedMessage +from ..toolkit.split import split_iterable +from ..toolkit.string import truncate_log + +PUB_LOGGER = logging.getLogger(f"{__name__}.publish") +SYNC_LOGGER = logging.getLogger(f"{__name__}.sync") + + +async def list_unconfirmed_message_hashes(older_than: float, limit: int) -> List[str]: + """ + Returns the list of the hashes of unconfirmed messages, up to `limit` messages. + :param older_than: Epoch timestamp. The function will only return unconfirmed + messages older than this value. + :param limit: Maximum number of messages to return. + """ + + unconfirmed_hashes = [ + msg["item_hash"] + async for msg in ( + Message.collection.find( + filter={"confirmed": False, "time": {"$lt": older_than}}, + projection={"_id": 0, "item_hash": 1}, + ) + .sort([("time", 1)]) + .limit(limit) + ) + ] + + return unconfirmed_hashes + + +async def publish_unconfirmed_messages( + topic: str, older_than: float, limit: int = 10000 +): + unconfirmed_messages = await list_unconfirmed_message_hashes( + older_than=older_than, limit=limit + ) + PUB_LOGGER.info("Publishing %d unconfirmed messages", len(unconfirmed_messages)) + data = json.dumps(unconfirmed_messages).encode("utf-8") + await pub_ipfs(topic, data) + + +async def receive_unconfirmed_messages(topic: str): + """ + Receives unconfirmed messages sync data from the network and stores it in the DB. + :param topic: The IPFS topic where unconfirmed messages sync data is published. + """ + + restart_wait_time = 2 + + while True: + try: + async for mvalue in sub_ipfs(topic): + + sender = mvalue["from"] + data = mvalue["data"].decode("utf-8") + try: + unconfirmed_hashes = json.loads(data) + except json.JSONDecodeError: + SYNC_LOGGER.warning( + "Could not parse sync data from %s: %s", + sender, + truncate_log(data, 100), + ) + continue + + SYNC_LOGGER.info( + "Peer %s notified %d unconfirmed messages", + sender, + len(unconfirmed_hashes), + ) + await UnconfirmedMessage.collection.update( + { + "$set": { + "peer_id": sender, + "hashes": unconfirmed_hashes, + "reception_time": time.time(), + }, + }, + upsert=True, + ) + except Exception: + SYNC_LOGGER.exception( + "Unexpected exception while syncing unconfirmed messages." + "Restarting in %d seconds...", + restart_wait_time, + ) + await asyncio.sleep(restart_wait_time) + + +async def aggregate_unconfirmed_hashes(from_time: float) -> Dict[str, List[str]]: + """ + Returns a dictionary of item_hash -> providers based on unconfirmed messages + sent by the other nodes. + + :param from_time: Minimum reception time. Documents received before this epoch time + will be ignored from the query. 
+    :return: A dictionary of item_hash -> providers of all unconfirmed messages
+        notified by the nodes on the network.
+    """
+
+    unconfirmed_message_sources: Dict[str, List[str]] = {}
+
+    async for result in UnconfirmedMessage.collection.aggregate(
+        [
+            # Filter out old data
+            {"$match": {"reception_time": {"$gte": from_time}}},
+            # List the providers (peer ids) by message item_hash
+            {"$unwind": "$hashes"},
+            {"$group": {"_id": "$hashes", "providers": {"$push": "$peer_id"}}},
+            # Only return messages not already present on the node
+            {
+                "$lookup": {
+                    "from": "messages",
+                    "localField": "_id",
+                    "foreignField": "item_hash",
+                    "as": "existing_messages",
+                },
+            },
+            {"$match": {"existing_messages": {"$eq": []}}},
+        ]
+    ):
+        unconfirmed_message_sources[result["_id"]] = result["providers"]
+
+    return unconfirmed_message_sources
+
+
+async def fetch_and_verify_message(
+    item_hash: str, providers: List[str]
+) -> Optional[Dict]:
+    """
+    Fetches a message from the HTTP API of a peer ID randomly selected in the list
+    of providers.
+
+    Validates the data sent by the node and discards corrupted data.
+
+    :param item_hash: Item hash of the message to retrieve.
+    :param providers: List of peer IDs that announced they have the message. Each provider
+        is expected to be able to provide the message.
+    :return: The message, or None if it could not be fetched from any of the providers.
+    """
+
+    randomized_providers = random.sample(providers, len(providers))
+    for provider in randomized_providers:
+        provider_uri = await Peer.get_peer_address(peer_id=provider, peer_type="HTTP")
+        if provider_uri is None:
+            SYNC_LOGGER.warning("Could not determine HTTP address for %s", provider)
+            continue
+
+        messages = await get_messages_from_peer(
+            peer_uri=provider_uri, item_hash=item_hash, timeout=15
+        )
+        if not messages:
+            SYNC_LOGGER.warning(
+                "Message %s could not be fetched from %s", item_hash, provider
+            )
+            continue
+
+        message = messages[0]
+        try:
+            # Check the message immediately, signature included, in order to
+            # ignore any node that tampers with the data. Calling the function
+            # this way also filters out fields added by the API and makes it
+            # possible to add the message to the pending queue as is.
+            return await check_message(message, from_network=True, trusted=False)
+        except InvalidMessageError as e:
+            SYNC_LOGGER.warning(
+                "Message fetched from %s is invalid: %s", provider, str(e)
+            )
+
+    return None
+
+
+async def fetch_and_verify_message_task(
+    item_hash: str, providers: List[str], task_semaphore: asyncio.Semaphore
+) -> Optional[Dict]:
+    async with task_semaphore:
+        return await fetch_and_verify_message(item_hash, providers)
+
+
+async def fetch_missing_messages(last_run_time: float):
+    unconfirmed_message_sources = await aggregate_unconfirmed_hashes(
+        from_time=last_run_time
+    )
+
+    if not unconfirmed_message_sources:
+        SYNC_LOGGER.info(
+            "No unconfirmed messages notified by the network, nothing to do."
+        )
+        return
+
+    SYNC_LOGGER.info(
+        "Found %d messages to fetch from the network", len(unconfirmed_message_sources)
+    )
+
+    message_semaphore = asyncio.BoundedSemaphore(100)
+
+    tasks = [
+        asyncio.create_task(
+            fetch_and_verify_message_task(
+                item_hash, providers, task_semaphore=message_semaphore
+            )
+        )
+        for item_hash, providers in unconfirmed_message_sources.items()
+    ]
+
+    results = await asyncio.gather(*tasks, return_exceptions=True)
+    errors, messages = split_iterable(
+        results, lambda result: isinstance(result, BaseException)
+    )
+
+    for error in errors:
+        SYNC_LOGGER.exception(
+            "Unexpected error while fetching unconfirmed messages",
+            exc_info=(type(error), error, error.__traceback__),
+        )
+
+    pending_messages = [
+        {
+            # TODO: this filtering is redundant as long as we use check_message(trusted=True).
+            # update this to use the PendingMessage models added in #273.
+            "message": {
+                k: v
+                for k, v in message.items()
+                if k in INCOMING_MESSAGE_AUTHORIZED_FIELDS
+            },
+            "source": {
+                "chain_name": None,
+                "tx_hash": None,
+                "height": None,
+                "check_message": True,
+            },
+        }
+        for message in messages
+        if message is not None
+    ]
+    if pending_messages:
+        await PendingMessage.collection.insert_many(pending_messages)
+
+
+async def get_last_processed_tx_time() -> float:
+    """
+    Returns the epoch timestamp of the last processed TX in the chains collection.
+    """
+
+    # TODO: check if this is correct in a multi-chain context
+    last_processed_tx = await Chain.collection.find_one(
+        {},
+        projection={"_id": 0, "last_update": 1},
+        sort=[("last_update", -1)],
+    )
+    if last_processed_tx is None:
+        raise ValueError("Could not find last processed TX.")
+
+    localized_update_datetime = pytz.utc.localize(last_processed_tx["last_update"])
+    return localized_update_datetime.timestamp()
+
+
+async def publish_unconfirmed_messages_loop(topic: str, job_period: float):
+    while True:
+        try:
+            # Avoid publishing data if the node is currently syncing
+            nb_pending_txs = await PendingTX.collection.count_documents({})
+
+            if nb_pending_txs:
+                PUB_LOGGER.info(
+                    "Node currently syncing (%d pending txs). Unconfirmed messages not published.",
+                    nb_pending_txs,
+                )
+
+            else:
+                last_processed_tx = await Chain.collection.find_one(
+                    {"chain": "ETH"}, {"_id": 0, "last_update": 1}
+                )
+                await publish_unconfirmed_messages(
+                    topic=topic,
+                    older_than=pytz.utc.localize(
+                        last_processed_tx["last_update"]
+                    ).timestamp(),
+                    limit=10000,
+                )
+        except Exception:
+            PUB_LOGGER.exception("Could not publish unconfirmed messages")
+
+        await asyncio.sleep(job_period)
+
+
+async def sync_unconfirmed_messages_loop(job_period: float):
+    await asyncio.sleep(4)
+    last_run_time = time.time()
+
+    SYNC_LOGGER.info("Running sync aggregate task every %d seconds", job_period)
+
+    while True:
+        try:
+            await fetch_missing_messages(last_run_time=last_run_time)
+        except Exception:
+            SYNC_LOGGER.exception("Error in sync unconfirmed messages job")
+
+        await asyncio.sleep(job_period)
+
+
+async def run_sync_message_tasks(config: Config):
+    topic = config.ipfs.sync_topic.value
+    job_period = 300
+
+    await asyncio.gather(
+        publish_unconfirmed_messages_loop(topic=topic, job_period=job_period),
+        receive_unconfirmed_messages(topic=topic),
+        sync_unconfirmed_messages_loop(job_period=job_period),
+    )
+
+
+def sync_unconfirmed_messages_subprocess(config_values: Dict, api_servers: List):
+    proctitle = "aleph.jobs.sync_unconfirmed_messages"
+    setproctitle(proctitle)
+    loop, config = prepare_loop(config_values)
+
+    sentry_sdk.init(
+        dsn=config.sentry.dsn.value,
+        traces_sample_rate=config.sentry.traces_sample_rate.value,
+        ignore_errors=[KeyboardInterrupt],
+    )
+    setup_logging(
+        loglevel=config.logging.level.value,
+        filename=f"/tmp/{proctitle.replace('.', '-')}.log",
+        max_log_file_size=config.logging.max_log_file_size.value,
+    )
+    singleton.api_servers = api_servers
+
+    loop.run_until_complete(run_sync_message_tasks(config))
diff --git a/src/aleph/model/p2p.py b/src/aleph/model/p2p.py
index 8c0db47aa..517fb573b 100644
--- a/src/aleph/model/p2p.py
+++ b/src/aleph/model/p2p.py
@@ -21,6 +21,16 @@ class Peer(BaseClass):
         IndexModel([("last_seen", DESCENDING)]),
     ]
 
+    @classmethod
+    async def get_peer_address(cls, peer_id: str, peer_type: str) -> Optional[str]:
+        peer = await cls.collection.find_one(
+            {"sender": peer_id, "type": peer_type}, {"_id": 0, "address": 1}
+        )
+        if peer is None:
+            return None
+
+        return peer["address"]
+
 
 async def get_peers(peer_type: Optional[str] = None, hours: int = 2) -> AsyncIterator[str]:
     """Returns current peers.
diff --git a/src/aleph/model/unconfirmed_messages.py b/src/aleph/model/unconfirmed_messages.py
new file mode 100644
index 000000000..068693f39
--- /dev/null
+++ b/src/aleph/model/unconfirmed_messages.py
@@ -0,0 +1,15 @@
+from pymongo import IndexModel
+
+from aleph.model.base import BaseClass
+
+
+class UnconfirmedMessage(BaseClass):
+    """
+    Synchronization messages sent by other nodes. Each document contains the hashes of
+    unconfirmed messages for one peer on the network.
+
+    Refer to the unconfirmed messages synchronization job for more details.
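+
+    Illustrative document shape, assuming the fields written by the sync job:
+    {"peer_id": <peer id>, "hashes": [<item hashes>], "reception_time": <epoch float>}.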
+ """ + + COLLECTION = "unconfirmed_messages" + INDEXES = [IndexModel("peer_id", unique=True)] diff --git a/src/aleph/services/p2p/http.py b/src/aleph/services/p2p/http.py index 4cca035ff..4c404c1e7 100644 --- a/src/aleph/services/p2p/http.py +++ b/src/aleph/services/p2p/http.py @@ -6,7 +6,7 @@ import base64 import logging from random import sample -from typing import Optional +from typing import Optional, Dict import aiohttp @@ -45,6 +45,18 @@ async def api_get_request(base_uri, method, timeout=1): return result +async def get_messages_from_peer( + peer_uri: str, item_hash: str, timeout: int +) -> Optional[Dict]: + result = await api_get_request( + base_uri=peer_uri, method=f"messages.json?hashes={item_hash}", timeout=timeout + ) + if result is None: + return None + + return result["messages"] + + async def get_peer_hash_content( base_uri: str, item_hash: str, timeout: int = 1 ) -> Optional[bytes]: diff --git a/src/aleph/toolkit/split.py b/src/aleph/toolkit/split.py index be892811a..7cd52ca1d 100644 --- a/src/aleph/toolkit/split.py +++ b/src/aleph/toolkit/split.py @@ -5,6 +5,13 @@ def split_iterable(iterable: Iterable[T], cond: Callable[[T], bool]) -> Tuple[List[T], List[T]]: + """ + Splits an iterable in two based on the condition and returns the two lists as + a (matches, others) tuple. + :param iterable: The iterable to split. + :param cond: A condition to verify for each element of the iterable. + """ + matches = [] others = [] diff --git a/src/aleph/toolkit/string.py b/src/aleph/toolkit/string.py new file mode 100644 index 000000000..499ad8628 --- /dev/null +++ b/src/aleph/toolkit/string.py @@ -0,0 +1,17 @@ +def truncate_log(s: str, max_length: int, suffix: str = "...") -> str: + """ + Truncates a string down to the specified length. Appends a user-defined suffix + if the string is truncated. The string is returned unmodified if it is shorter + than the specified maximum length. + + :param s: String to truncate. + :param max_length: Maximum size of the returned string. + :param suffix: String to be appended to the base string if it is truncated. + :return: A string of len <= max_length where the end of the original string + is replaced by the suffix if it is longer than max_length. 
+ """ + + if len(s) <= max_length: + return s + + return s[: max_length - len(suffix)] + suffix diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/api/test_messages.py b/tests/api/test_messages.py index ad9a07f7a..34fe99e58 100644 --- a/tests/api/test_messages.py +++ b/tests/api/test_messages.py @@ -1,12 +1,7 @@ import itertools -import json -from pathlib import Path from typing import Dict, Iterable, List import pytest -import pytest_asyncio - -from aleph.model.messages import Message MESSAGES_URI = "/api/v0/messages.json" @@ -29,18 +24,6 @@ def assert_messages_equal(messages: Iterable[Dict], expected_messages: Iterable[ assert message["signature"] == expected_message["signature"] -@pytest_asyncio.fixture -async def fixture_messages(test_db): - fixtures_dir = Path(__file__).parent / "fixtures" - fixtures_file = fixtures_dir / "fixture_messages.json" - - with fixtures_file.open() as f: - messages = json.load(f) - - await Message.collection.insert_many(messages) - return messages - - @pytest.mark.asyncio async def test_get_messages(fixture_messages, ccn_api_client): response = await ccn_api_client.get(MESSAGES_URI) diff --git a/tests/conftest.py b/tests/conftest.py index d433e1d03..8ffba1b93 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,6 @@ import asyncio -import os +import json +from pathlib import Path import pymongo import pytest_asyncio @@ -7,6 +8,7 @@ from aleph.config import get_defaults from aleph.model import init_db +from aleph.model.messages import Message from aleph.web import create_app TEST_DB = "ccn_automated_tests" @@ -30,6 +32,7 @@ async def test_db(): init_db(config, ensure_indexes=True) from aleph.model import db + yield db @@ -43,3 +46,20 @@ async def ccn_api_client(aiohttp_client): client = await aiohttp_client(app) return client + + +@pytest_asyncio.fixture +async def fixture_messages(test_db): + """ + Some generic message fixtures to insert in the messages collection for + tests fetching data from the DB. 
+ """ + + fixtures_dir = Path(__file__).parent / "fixtures" + fixtures_file = fixtures_dir / "fixture_messages.json" + + with fixtures_file.open() as f: + messages = json.load(f) + + await Message.collection.insert_many(messages) + return messages diff --git a/tests/api/fixtures/fixture_messages.json b/tests/fixtures/fixture_messages.json similarity index 100% rename from tests/api/fixtures/fixture_messages.json rename to tests/fixtures/fixture_messages.json diff --git a/tests/message_processing/conftest.py b/tests/message_processing/conftest.py index 1b6b8e9fc..702204bf0 100644 --- a/tests/message_processing/conftest.py +++ b/tests/message_processing/conftest.py @@ -4,5 +4,5 @@ @pytest.fixture -def fixture_messages(): +def fixture_chain_data(): return load_fixture_messages("test-data-pending-tx-messages.json") diff --git a/tests/message_processing/sync_unconfirmed_messages/__init__.py b/tests/message_processing/sync_unconfirmed_messages/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/message_processing/sync_unconfirmed_messages/test_aggregate_unconfirmed_messages.py b/tests/message_processing/sync_unconfirmed_messages/test_aggregate_unconfirmed_messages.py new file mode 100644 index 000000000..0a3529cd1 --- /dev/null +++ b/tests/message_processing/sync_unconfirmed_messages/test_aggregate_unconfirmed_messages.py @@ -0,0 +1,74 @@ +import pytest + +from aleph.jobs.sync_unconfirmed_messages import aggregate_unconfirmed_hashes +from aleph.model.messages import Message +from aleph.model.unconfirmed_messages import UnconfirmedMessage + + +@pytest.mark.asyncio +async def test_aggregate_unconfirmed_messages(test_db): + await UnconfirmedMessage.collection.insert_many( + [ + {"sender": "1", "hashes": ["123", "456", "789"], "reception_time": 1000000}, + {"sender": "2", "hashes": ["123", "789", "abc"], "reception_time": 1000000}, + ] + ) + + unconfirmed_message_sources = await aggregate_unconfirmed_hashes(from_time=0) + expected_unconfirmed_sources = { + "123": ["1", "2"], + "456": ["1"], + "789": ["1", "2"], + "abc": ["2"], + } + + assert unconfirmed_message_sources == expected_unconfirmed_sources + + +@pytest.mark.asyncio +async def test_aggregate_unconfirmed_messages_already_present(test_db): + """ + Tests that messages already present on the local node are ignored when aggregating + unconfirmed messages sent by the network. + """ + + await UnconfirmedMessage.collection.insert_many( + [ + {"sender": "1", "hashes": ["123", "456", "789"], "reception_time": 1000000}, + {"sender": "2", "hashes": ["123", "789", "abc"], "reception_time": 1000000}, + ] + ) + + await Message.collection.insert_many([{"item_hash": "123"}, {"item_hash": "abc"}]) + + unconfirmed_message_sources = await aggregate_unconfirmed_hashes(from_time=0) + expected_unconfirmed_sources = { + "456": ["1"], + "789": ["1", "2"], + } + + assert unconfirmed_message_sources == expected_unconfirmed_sources + + +@pytest.mark.asyncio +async def test_aggregate_unconfirmed_messages_ignore_old_data(test_db): + """ + Tests that messages already present on the local node are ignored when aggregating + unconfirmed messages sent by the network. 
+ """ + + await UnconfirmedMessage.collection.insert_many( + [ + {"sender": "1", "hashes": ["123", "456", "789"], "reception_time": 100000}, + {"sender": "2", "hashes": ["123", "789", "abc"], "reception_time": 1000000}, + ] + ) + + unconfirmed_message_sources = await aggregate_unconfirmed_hashes(from_time=1000000) + expected_unconfirmed_sources = { + "123": ["2"], + "789": ["2"], + "abc": ["2"], + } + + assert unconfirmed_message_sources == expected_unconfirmed_sources diff --git a/tests/message_processing/sync_unconfirmed_messages/test_list_unconfirmed_messages.py b/tests/message_processing/sync_unconfirmed_messages/test_list_unconfirmed_messages.py new file mode 100644 index 000000000..c2661a87f --- /dev/null +++ b/tests/message_processing/sync_unconfirmed_messages/test_list_unconfirmed_messages.py @@ -0,0 +1,27 @@ +import pytest +from aleph.jobs.sync_unconfirmed_messages import list_unconfirmed_message_hashes +import datetime as dt + + +@pytest.mark.asyncio +async def test_list_unconfirmed_message_hashes(test_db, fixture_messages): + # List all unconfirmed messages + # TODO: update this by 3999, Dec 31st + in_a_long_time = dt.datetime(4000, 1, 1) + + expected_hashes = { + "9200cfab5950e5d173f07d7c61bb0524675d0305e808590e7d0a0752ce65f791", + "4c33dd1ebf61bbb4342d8258b591fcd52cca73fd7c425542f78311d8f45ba274", + } + hashes = await list_unconfirmed_message_hashes( + older_than=in_a_long_time.timestamp(), limit=1000 + ) + assert set(hashes) == expected_hashes + + # List only one message using older_than + filtered_hashes = await list_unconfirmed_message_hashes(older_than=1652126600, limit=1000) + assert filtered_hashes == ["9200cfab5950e5d173f07d7c61bb0524675d0305e808590e7d0a0752ce65f791"] + + # List 0 messages + no_hashes = await list_unconfirmed_message_hashes(older_than=0, limit=1000) + assert no_hashes == [] diff --git a/tests/message_processing/test_perform_db_operations.py b/tests/message_processing/test_perform_db_operations.py index 413e4d211..43cb0ce45 100644 --- a/tests/message_processing/test_perform_db_operations.py +++ b/tests/message_processing/test_perform_db_operations.py @@ -61,7 +61,7 @@ async def test_db_operations_delete_one(test_db): @pytest.mark.asyncio -async def test_db_operations_insert_and_delete(test_db, fixture_messages): +async def test_db_operations_insert_and_delete(test_db, fixture_chain_data): """ Test a typical case where we insert several messages and delete a pending TX. 
""" @@ -72,7 +72,7 @@ async def test_db_operations_insert_and_delete(test_db, fixture_messages): db_operations = [ DbBulkOperation(collection=PendingMessage, operation=InsertOne(msg)) - for msg in fixture_messages + for msg in fixture_chain_data ] db_operations.append( @@ -89,13 +89,13 @@ async def test_db_operations_insert_and_delete(test_db, fixture_messages): tx_end_count = await PendingTX.count({}) msg_end_count = await PendingMessage.count({}) assert tx_end_count - tx_start_count == -1 - assert msg_end_count - msg_start_count == len(fixture_messages) + assert msg_end_count - msg_start_count == len(fixture_chain_data) # Check each message - fixture_messages_by_hash = {msg["item_hash"]: msg for msg in fixture_messages} + fixture_messages_by_hash = {msg["item_hash"]: msg for msg in fixture_chain_data} async for pending_msg in PendingMessage.collection.find( - {"message.item_hash": {"$in": [msg["item_hash"] for msg in fixture_messages]}} + {"message.item_hash": {"$in": [msg["item_hash"] for msg in fixture_chain_data]}} ): pending_message = pending_msg["message"] expected_message = fixture_messages_by_hash[pending_message["item_hash"]] diff --git a/tests/models/test_peer.py b/tests/models/test_peer.py new file mode 100644 index 000000000..79cacc81f --- /dev/null +++ b/tests/models/test_peer.py @@ -0,0 +1,29 @@ +import pytest +from aleph.model.p2p import Peer +import datetime as dt + + +@pytest.mark.asyncio +async def test_get_peer_type_no_match(test_db): + peer_address = await Peer.get_peer_address(peer_id="123", peer_type="HTTP") + assert peer_address is None + + +@pytest.mark.asyncio +async def test_get_peer_type_match(test_db): + peer_id = "123" + http_address = "http://127.0.0.1:4024" + + await Peer.collection.insert_one( + { + "address": http_address, + "type": "HTTP", + "last_seen": dt.datetime.utcnow(), + "sender": peer_id, + "source": "p2p", + } + ) + + peer_address = await Peer.get_peer_address(peer_id=peer_id, peer_type="HTTP") + assert peer_address is not None + assert peer_address == http_address