diff --git a/packaging/aleph-vm/etc/ipfs/kubo.json b/packaging/aleph-vm/etc/ipfs/kubo.json deleted file mode 100644 index 9957b142e..000000000 --- a/packaging/aleph-vm/etc/ipfs/kubo.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "AutoNAT": { - "ServiceMode": "disabled" - }, - "AddrFilters": [ - "/ip4/86.84.0.0/ipcidr/16" - ], - "Reprovider": { - "Strategy": "roots" - }, - "Swarm": { - "EnableHolePunching":false, - "RelayService": { - "Enabled": false - } - } -} diff --git a/packaging/aleph-vm/etc/systemd/system/ipfs.service b/packaging/aleph-vm/etc/systemd/system/ipfs.service index 2009361e3..0708ae8f1 100644 --- a/packaging/aleph-vm/etc/systemd/system/ipfs.service +++ b/packaging/aleph-vm/etc/systemd/system/ipfs.service @@ -50,6 +50,11 @@ ProtectHome=true RemoveIPC=true RestrictSUIDSGID=true CapabilityBoundingSet=CAP_NET_BIND_SERVICE +# set memory limit to avoid taking all the CRN ressource and getting OOM +# https://github.com/ipfs/kubo/blob/master/docs/config.md#swarmresourcemgrmaxmemory +Environment=GOMEMLIMIT=1900m +MemoryHigh=2G +MemoryMax=4G # enable for 1-1024 port listening #AmbientCapabilities=CAP_NET_BIND_SERVICE @@ -76,7 +81,11 @@ Type=notify User=ipfs Group=ipfs Environment=IPFS_PATH="/var/lib/ipfs" -ExecStart=/opt/kubo/ipfs daemon --init --migrate --init-profile=server --config-file /etc/ipfs/kubo.json +ExecStartPre=/opt/kubo/ipfs init +ExecStartPre=/opt/kubo/ipfs config --json Gateway.PublicGateways '{"localhost": {"UseSubdomains": false, "Paths": ["/ipfs", "/ipns"]}}' +ExecStartPre=/opt/kubo/ipfs config --json Reprovider.Strategy '"roots"' +ExecStartPre=/opt/kubo/ipfs config --json Swarm.ResourceMgr '{"MaxMemory" : "1GB"}' +ExecStart=/opt/kubo/ipfs daemon --migrate=true --init-profile=server Restart=on-failure KillSignal=SIGINT diff --git a/src/aleph/vm/conf.py b/src/aleph/vm/conf.py index 0c6d9cbec..5de7816c0 100644 --- a/src/aleph/vm/conf.py +++ b/src/aleph/vm/conf.py @@ -126,6 +126,7 @@ class Settings(BaseSettings): WATCH_FOR_UPDATES: bool = True API_SERVER: str = "https://official.aleph.cloud" + IPFS_SERVER: Url = Url("http://localhost:8080/ipfs") # Connect to the Quad9 VPN provider using their IPv4 and IPv6 addresses. CONNECTIVITY_IPV4_URL: str = "https://9.9.9.9/" CONNECTIVITY_IPV6_URL: str = "https://[2620:fe::fe]/" diff --git a/src/aleph/vm/orchestrator/messages.py b/src/aleph/vm/orchestrator/messages.py index 5ae67102c..f1826ba2c 100644 --- a/src/aleph/vm/orchestrator/messages.py +++ b/src/aleph/vm/orchestrator/messages.py @@ -7,13 +7,13 @@ from aleph_message.status import MessageStatus from aleph.vm.conf import settings -from aleph.vm.storage import get_latest_amend, get_message +from aleph.vm.storage import get_executable_message, get_latest_amend async def try_get_message(ref: str) -> ExecutableMessage: """Get the message or raise an aiohttp HTTP error""" try: - return await get_message(ref) + return await get_executable_message(ref) except ClientConnectorError as error: raise HTTPServiceUnavailable(reason="Aleph Connector unavailable") from error except ClientResponseError as error: diff --git a/src/aleph/vm/storage.py b/src/aleph/vm/storage.py index 58d6f78c2..a6eef447c 100644 --- a/src/aleph/vm/storage.py +++ b/src/aleph/vm/storage.py @@ -10,7 +10,6 @@ import logging import re import sys -import uuid from datetime import datetime, timezone from pathlib import Path from shutil import copy2, make_archive @@ -18,9 +17,12 @@ import aiohttp from aleph_message.models import ( + AlephMessage, InstanceMessage, ItemHash, + ItemType, ProgramMessage, + StoreMessage, parse_message, ) from aleph_message.models.execution.instance import RootfsVolume @@ -133,6 +135,32 @@ async def download_file(url: str, local_path: Path) -> None: tmp_path.unlink(missing_ok=True) +async def download_file_from_ipfs_or_connector(ref: str, cache_path: Path, filetype: str) -> None: + """Download a file from the IPFS Gateway if possible, else from the vm-connector.""" + + if cache_path.is_file(): + logger.debug(f"File already exists: {cache_path}") + return + + message: StoreMessage = await get_store_message(ref) + + if message.content.item_type == ItemType.ipfs: + # Download IPFS files from the IPFS gateway directly + cid = message.content.item_hash + url = f"{settings.IPFS_SERVER}/{cid}" + await download_file(url, cache_path) + else: + # Download via the vm-connector + path_mapping = { + "runtime": "/download/runtime", + "code": "/download/code", + "data": "/download/data", + } + path = path_mapping[filetype] + url = f"{settings.CONNECTOR_URL}{path}/{ref}" + await download_file(url, cache_path) + + async def get_latest_amend(item_hash: str) -> str: if settings.FAKE_DATA_PROGRAM: return item_hash @@ -146,7 +174,26 @@ async def get_latest_amend(item_hash: str) -> str: return result or item_hash -async def get_message(ref: str) -> ProgramMessage | InstanceMessage: +async def load_message(path: Path) -> AlephMessage: + """Load a message from the cache on disk.""" + with open(path) as cache_file: + msg = json.load(cache_file) + + if path in (settings.FAKE_DATA_MESSAGE, settings.FAKE_INSTANCE_MESSAGE): + # Ensure validation passes while tweaking message content + msg = fix_message_validation(msg) + + return parse_message(message_dict=msg) + + +async def get_message(ref: str) -> AlephMessage: + cache_path = (Path(settings.MESSAGE_CACHE) / ref).with_suffix(".json") + url = f"{settings.CONNECTOR_URL}/download/message/{ref}" + await download_file(url, cache_path) + return await load_message(cache_path) + + +async def get_executable_message(ref: str) -> ProgramMessage | InstanceMessage: if ref == settings.FAKE_INSTANCE_ID: logger.debug("Using the fake instance message since the ref matches") cache_path = settings.FAKE_INSTANCE_MESSAGE @@ -158,23 +205,22 @@ async def get_message(ref: str) -> ProgramMessage | InstanceMessage: url = f"{settings.CONNECTOR_URL}/download/message/{ref}" await download_file(url, cache_path) - with open(cache_path) as cache_file: - msg = json.load(cache_file) + return await load_message(cache_path) - if cache_path in (settings.FAKE_DATA_MESSAGE, settings.FAKE_INSTANCE_MESSAGE): - # Ensure validation passes while tweaking message content - msg = fix_message_validation(msg) - result = parse_message(message_dict=msg) - assert isinstance(result, InstanceMessage | ProgramMessage), "Parsed message is not executable" - return result +async def get_store_message(ref: str) -> StoreMessage: + message = await get_message(ref) + if not isinstance(message, StoreMessage): + msg = f"Expected a store message, got {message.type}" + raise ValueError(msg) + return message async def get_code_path(ref: str) -> Path: if settings.FAKE_DATA_PROGRAM: archive_path = Path(settings.FAKE_DATA_PROGRAM) - encoding: Encoding = (await get_message(ref="fake-message")).content.code.encoding + encoding: Encoding = (await get_executable_message(ref="fake-message")).content.code.encoding if encoding == Encoding.squashfs: squashfs_path = Path(archive_path.name + ".squashfs") squashfs_path.unlink(missing_ok=True) @@ -191,8 +237,7 @@ async def get_code_path(ref: str) -> Path: raise ValueError(msg) cache_path = Path(settings.CODE_CACHE) / ref - url = f"{settings.CONNECTOR_URL}/download/code/{ref}" - await download_file(url, cache_path) + await download_file_from_ipfs_or_connector(ref, cache_path, "code") return cache_path @@ -203,8 +248,7 @@ async def get_data_path(ref: str) -> Path: return Path(f"{data_dir}.zip") cache_path = Path(settings.DATA_CACHE) / ref - url = f"{settings.CONNECTOR_URL}/download/data/{ref}" - await download_file(url, cache_path) + await download_file_from_ipfs_or_connector(ref, cache_path, "data") return cache_path @@ -224,11 +268,7 @@ async def get_runtime_path(ref: str) -> Path: return Path(settings.FAKE_DATA_RUNTIME) cache_path = Path(settings.RUNTIME_CACHE) / ref - url = f"{settings.CONNECTOR_URL}/download/runtime/{ref}" - - if not cache_path.is_file(): - # File does not exist, download it - await download_file(url, cache_path) + await download_file_from_ipfs_or_connector(ref, cache_path, "runtime") await check_squashfs_integrity(cache_path) await chown_to_jailman(cache_path) @@ -242,8 +282,10 @@ async def get_rootfs_base_path(ref: ItemHash) -> Path: return Path(settings.FAKE_INSTANCE_BASE) cache_path = Path(settings.RUNTIME_CACHE) / ref - url = f"{settings.CONNECTOR_URL}/download/runtime/{ref}" - await download_file(url, cache_path) + + # if not cache_path.is_file(): + await download_file_from_ipfs_or_connector(ref, cache_path, "runtime") + await chown_to_jailman(cache_path) return cache_path @@ -364,8 +406,8 @@ async def get_existing_file(ref: str) -> Path: return Path(settings.FAKE_DATA_VOLUME) cache_path = Path(settings.DATA_CACHE) / ref - url = f"{settings.CONNECTOR_URL}/download/data/{ref}" - await download_file(url, cache_path) + await download_file_from_ipfs_or_connector(ref, cache_path, "data") + await chown_to_jailman(cache_path) return cache_path diff --git a/tests/supervisor/test_checkpayment.py b/tests/supervisor/test_checkpayment.py index 3671114de..d554b7bc7 100644 --- a/tests/supervisor/test_checkpayment.py +++ b/tests/supervisor/test_checkpayment.py @@ -149,6 +149,7 @@ async def compute_required_flow(executions): async def test_not_enough_flow(mocker, fake_instance_content): mocker.patch.object(settings, "ALLOW_VM_NETWORKING", False) mocker.patch.object(settings, "PAYMENT_RECEIVER_ADDRESS", "0xD39C335404a78E0BDCf6D50F29B86EFd57924288") + mocker.patch.object(settings, "IPFS_SERVER", "https://ipfs.io/ipfs") mock_community_wallet_address = "0x23C7A99d7AbebeD245d044685F1893aeA4b5Da90" mocker.patch("aleph.vm.orchestrator.tasks.get_community_wallet_address", return_value=mock_community_wallet_address) diff --git a/tests/supervisor/test_execution.py b/tests/supervisor/test_execution.py index b064a084a..812404441 100644 --- a/tests/supervisor/test_execution.py +++ b/tests/supervisor/test_execution.py @@ -12,7 +12,7 @@ from aleph.vm.models import VmExecution from aleph.vm.orchestrator import metrics from aleph.vm.orchestrator.messages import load_updated_message -from aleph.vm.storage import get_message +from aleph.vm.storage import get_executable_message from aleph.vm.utils import fix_message_validation @@ -33,6 +33,7 @@ async def test_create_execution(mocker): mock_settings.FAKE_DATA_PROGRAM = mock_settings.BENCHMARK_FAKE_DATA_PROGRAM mock_settings.ALLOW_VM_NETWORKING = False mock_settings.USE_JAILER = False + mock_settings.IPFS_SERVER = "https://ipfs.io/ipfs" logging.basicConfig(level=logging.DEBUG) mock_settings.PRINT_SYSTEM_LOGS = True @@ -46,7 +47,7 @@ async def test_create_execution(mocker): await metrics.create_tables(engine) vm_hash = ItemHash("cafecafecafecafecafecafecafecafecafecafecafecafecafecafecafecafe") - message = await get_message(ref=vm_hash) + message = await get_executable_message(ref=vm_hash) execution = VmExecution( vm_hash=vm_hash, @@ -78,6 +79,7 @@ async def test_create_execution_online(vm_hash: ItemHash = None): """ vm_hash = vm_hash or settings.CHECK_FASTAPI_VM_ID + settings.IPFS_SERVER = "https://ipfs.io/ipfs" # Ensure that the settings are correct and required files present. settings.setup() diff --git a/tests/supervisor/test_instance.py b/tests/supervisor/test_instance.py index 1fc1f12ba..69e0d0fa2 100644 --- a/tests/supervisor/test_instance.py +++ b/tests/supervisor/test_instance.py @@ -13,7 +13,7 @@ from aleph.vm.models import VmExecution from aleph.vm.network.hostnetwork import Network, make_ipv6_allocator from aleph.vm.orchestrator import metrics -from aleph.vm.storage import get_message +from aleph.vm.storage import get_executable_message from aleph.vm.systemd import SystemDManager from aleph.vm.vm_type import VmType @@ -55,6 +55,7 @@ async def test_create_instance(): # settings.FAKE_INSTANCE_MESSAGE settings.ALLOW_VM_NETWORKING = True settings.USE_JAILER = True + settings.IPFS_SERVER = "https://ipfs.io/ipfs" logging.basicConfig(level=logging.DEBUG) settings.PRINT_SYSTEM_LOGS = True @@ -70,7 +71,7 @@ async def test_create_instance(): await metrics.create_tables(engine) vm_hash = ItemHash(settings.FAKE_INSTANCE_ID) - message = await get_message(ref=vm_hash) + message = await get_executable_message(ref=vm_hash) mock_systemd_manager = MockSystemDManager() diff --git a/tests/supervisor/test_qemu_instance.py b/tests/supervisor/test_qemu_instance.py index 56d4fc145..1e76d19a4 100644 --- a/tests/supervisor/test_qemu_instance.py +++ b/tests/supervisor/test_qemu_instance.py @@ -13,7 +13,7 @@ from aleph.vm.models import VmExecution from aleph.vm.network.hostnetwork import Network, make_ipv6_allocator from aleph.vm.orchestrator import metrics -from aleph.vm.storage import get_message +from aleph.vm.storage import get_executable_message from aleph.vm.systemd import SystemDManager from aleph.vm.vm_type import VmType @@ -69,7 +69,7 @@ async def test_create_qemu_instance(): await metrics.create_tables(engine) vm_hash = ItemHash(settings.FAKE_INSTANCE_ID) - message = await get_message(ref=vm_hash) + message = await get_executable_message(ref=vm_hash) mock_systemd_manager = MockSystemDManager() @@ -112,6 +112,7 @@ async def test_create_qemu_instance_online(): settings.ENABLE_CONFIDENTIAL_COMPUTING = False settings.ALLOW_VM_NETWORKING = True settings.USE_JAILER = False + settings.IPFS_SERVER = "https://ipfs.io/ipfs" logging.basicConfig(level=logging.DEBUG) @@ -126,7 +127,7 @@ async def test_create_qemu_instance_online(): await metrics.create_tables(engine) vm_hash = ItemHash(settings.FAKE_INSTANCE_ID) - message = await get_message(ref=vm_hash) + message = await get_executable_message(ref=vm_hash) mock_systemd_manager = MockSystemDManager() diff --git a/tests/supervisor/views/test_operator.py b/tests/supervisor/views/test_operator.py index 8a6c70485..51ad5323d 100644 --- a/tests/supervisor/views/test_operator.py +++ b/tests/supervisor/views/test_operator.py @@ -14,7 +14,7 @@ from aleph.vm.conf import settings from aleph.vm.orchestrator.metrics import ExecutionRecord from aleph.vm.orchestrator.supervisor import setup_webapp -from aleph.vm.storage import get_message +from aleph.vm.storage import get_executable_message from aleph.vm.utils.logs import EntryDict from aleph.vm.utils.test_helpers import ( generate_signer_and_signed_headers_for_operation, @@ -72,7 +72,7 @@ async def test_operator_confidential_initialize_already_running(aiohttp_client, settings.setup() vm_hash = ItemHash(settings.FAKE_INSTANCE_ID) - instance_message = await get_message(ref=vm_hash) + instance_message = await get_executable_message(ref=vm_hash) fake_vm_pool = mocker.Mock( executions={ @@ -115,7 +115,7 @@ async def test_operator_expire(aiohttp_client, mocker): settings.setup() vm_hash = ItemHash(settings.FAKE_INSTANCE_ID) - instance_message = await get_message(ref=vm_hash) + instance_message = await get_executable_message(ref=vm_hash) fake_vm_pool = mocker.Mock( executions={ @@ -154,7 +154,7 @@ async def test_operator_stop(aiohttp_client, mocker): settings.setup() vm_hash = ItemHash(settings.FAKE_INSTANCE_ID) - instance_message = await get_message(ref=vm_hash) + instance_message = await get_executable_message(ref=vm_hash) fake_vm_pool = mocker.AsyncMock( executions={ @@ -190,7 +190,7 @@ async def test_operator_confidential_initialize_not_confidential(aiohttp_client, settings.setup() vm_hash = ItemHash(settings.FAKE_INSTANCE_ID) - instance_message = await get_message(ref=vm_hash) + instance_message = await get_executable_message(ref=vm_hash) fake_vm_pool = mocker.Mock( executions={ @@ -232,7 +232,7 @@ async def test_operator_confidential_initialize(aiohttp_client): settings.setup() vm_hash = ItemHash(settings.FAKE_INSTANCE_ID) - instance_message = await get_message(ref=vm_hash) + instance_message = await get_executable_message(ref=vm_hash) class FakeExecution: message = instance_message.content