diff --git a/packaging/Makefile b/packaging/Makefile index 43d8a0017..c9dd6ae85 100644 --- a/packaging/Makefile +++ b/packaging/Makefile @@ -15,7 +15,7 @@ debian-package-code: cp ../examples/instance_message_from_aleph.json ./aleph-vm/opt/aleph-vm/examples/instance_message_from_aleph.json cp -r ../examples/data ./aleph-vm/opt/aleph-vm/examples/data mkdir -p ./aleph-vm/opt/aleph-vm/examples/volumes - pip3 install --target ./aleph-vm/opt/aleph-vm/ 'aleph-message==0.4.4' 'jwskate==0.8.0' 'eth-account==0.9.0' 'sentry-sdk==1.31.0' 'qmp==1.1.0' 'superfluid==0.2.1' 'sqlalchemy[asyncio]' 'aiosqlite==0.19.0' 'alembic==1.13.1' 'aiohttp_cors==0.7.0' 'pyroute2==0.7.12' + pip3 install --target ./aleph-vm/opt/aleph-vm/ 'aleph-message==0.4.4' 'jwskate==0.8.0' 'eth-account==0.9.0' 'sentry-sdk==1.31.0' 'qmp==1.1.0' 'superfluid==0.2.1' 'sqlalchemy[asyncio]' 'aiosqlite==0.19.0' 'alembic==1.13.1' 'aiohttp_cors==0.7.0' 'pyroute2==0.7.12' 'dbus-fast==1.90.1' python3 -m compileall ./aleph-vm/opt/aleph-vm/ debian-package-resources: firecracker-bins vmlinux download-ipfs-kubo diff --git a/packaging/aleph-vm/DEBIAN/control b/packaging/aleph-vm/DEBIAN/control index 6b42eea41..92d7f9777 100644 --- a/packaging/aleph-vm/DEBIAN/control +++ b/packaging/aleph-vm/DEBIAN/control @@ -3,6 +3,6 @@ Version: 0.1.8 Architecture: all Maintainer: Aleph.im Description: Aleph.im VM execution engine -Depends: python3,python3-pip,python3-aiohttp,python3-msgpack,python3-aiodns,python3-alembic,python3-sqlalchemy,python3-setproctitle,redis,python3-aioredis,python3-psutil,sudo,acl,curl,systemd-container,squashfs-tools,debootstrap,python3-packaging,python3-cpuinfo,python3-nftables,python3-jsonschema,cloud-image-utils,ndppd,python3-yaml,python3-dotenv,python3-schedule,qemu-system-x86,qemu-utils,python3-systemd,python3-dbus,btrfs-progs,nftables,python3-jwcrypto +Depends: python3,python3-pip,python3-aiohttp,python3-msgpack,python3-aiodns,python3-alembic,python3-sqlalchemy,python3-setproctitle,redis,python3-aioredis,python3-psutil,sudo,acl,curl,systemd-container,squashfs-tools,debootstrap,python3-packaging,python3-cpuinfo,python3-nftables,python3-jsonschema,cloud-image-utils,ndppd,python3-yaml,python3-dotenv,python3-schedule,qemu-system-x86,qemu-utils,python3-systemd,btrfs-progs,nftables,python3-jwcrypto Section: aleph-im Priority: Extra diff --git a/pyproject.toml b/pyproject.toml index 95ed874a5..d294fa960 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,8 +41,7 @@ dependencies = [ "packaging==23.2", "jsonschema==4.19.1", "qmp==0.0.1", - "dbus-python==1.3.2", - "systemd-python==235", + "dbus-fast==1.90.1", "systemd-python==235", "superfluid~=0.2.1", "sqlalchemy[asyncio]>=2.0", diff --git a/src/aleph/vm/models.py b/src/aleph/vm/models.py index 5a44c132a..3b222c455 100644 --- a/src/aleph/vm/models.py +++ b/src/aleph/vm/models.py @@ -86,11 +86,7 @@ class VmExecution: @property def is_running(self) -> bool: - return ( - self.systemd_manager.is_service_active(self.controller_service) - if self.persistent and self.systemd_manager - else bool(self.times.starting_at and not self.times.stopping_at) - ) + return bool(self.times.starting_at and not self.times.stopping_at) @property def is_stopping(self) -> bool: diff --git a/src/aleph/vm/orchestrator/supervisor.py b/src/aleph/vm/orchestrator/supervisor.py index 892106ba0..04b121d7a 100644 --- a/src/aleph/vm/orchestrator/supervisor.py +++ b/src/aleph/vm/orchestrator/supervisor.py @@ -140,12 +140,16 @@ async def stop_all_vms(app: web.Application): def run(): """Run the VM Supervisor.""" + # Loop creation set here to avoid bug with Future on different loop + + loop = asyncio.new_event_loop() + # apparently needed for Python 3.9 / Debian 11 + asyncio.set_event_loop(loop) settings.check() engine = setup_engine() asyncio.run(create_tables(engine)) - loop = asyncio.new_event_loop() pool = VmPool(loop) pool.setup() @@ -170,10 +174,10 @@ def run(): app.on_cleanup.append(stop_all_vms) logger.info("Loading existing executions ...") - asyncio.run(pool.load_persistent_executions()) + loop.run_until_complete(pool.load_persistent_executions()) logger.info(f"Starting the web server on http://{settings.SUPERVISOR_HOST}:{settings.SUPERVISOR_PORT}") - web.run_app(app, host=settings.SUPERVISOR_HOST, port=settings.SUPERVISOR_PORT) + web.run_app(app, host=settings.SUPERVISOR_HOST, port=settings.SUPERVISOR_PORT, loop=loop) except OSError as e: if e.errno == 98: logger.error( diff --git a/src/aleph/vm/orchestrator/views/operator.py b/src/aleph/vm/orchestrator/views/operator.py index 298486b73..5b8bb236d 100644 --- a/src/aleph/vm/orchestrator/views/operator.py +++ b/src/aleph/vm/orchestrator/views/operator.py @@ -174,7 +174,7 @@ async def operate_reboot(request: web.Request, authenticated_sender: str) -> web if execution.is_running: logger.info(f"Rebooting {execution.vm_hash}") if execution.persistent: - pool.systemd_manager.restart(execution.controller_service) + await pool.systemd_manager.restart(execution.controller_service) else: await pool.stop_vm(vm_hash) pool.forget_vm(vm_hash) diff --git a/src/aleph/vm/pool.py b/src/aleph/vm/pool.py index 3e5c5f3ec..c35312129 100644 --- a/src/aleph/vm/pool.py +++ b/src/aleph/vm/pool.py @@ -50,6 +50,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop): self.executions = {} self.message_cache = {} + # apparently needed for Python 3.9 / Debian 11 asyncio.set_event_loop(loop) self.creation_lock = asyncio.Lock() @@ -124,7 +125,7 @@ async def create_a_vm( # Start VM and snapshots automatically if execution.persistent: - self.systemd_manager.enable_and_start(execution.controller_service) + await self.systemd_manager.enable_and_start(execution.controller_service) await execution.wait_for_init() if execution.is_program and execution.vm: await execution.vm.load_configuration() @@ -191,7 +192,7 @@ async def stop_vm(self, vm_hash: ItemHash) -> Optional[VmExecution]: async def stop_persistent_execution(self, execution: VmExecution): """Stop persistent VMs in the pool.""" assert execution.persistent, "Execution isn't persistent" - self.systemd_manager.stop_and_disable(execution.controller_service) + await self.systemd_manager.stop_and_disable(execution.controller_service) await execution.stop() def forget_vm(self, vm_hash: ItemHash) -> None: @@ -239,8 +240,10 @@ async def load_persistent_executions(self): persistent=saved_execution.persistent, ) - if execution.is_running: - # TODO: Improve the way that we re-create running execution + if await self.systemd_manager.is_service_active( + execution.controller_service + ): # TODO: Improve the way that we re-create running execution + logger.debug("Execution %s is still running in systemd, reconnecting", execution.vm_hash) await execution.prepare() if self.network: vm_type = VmType.from_message_content(execution.message) @@ -251,16 +254,21 @@ async def load_persistent_executions(self): vm = execution.create(vm_id=vm_id, tap_interface=tap_interface, prepare=False) await vm.start_guest_api() execution.ready_event.set() + execution.times.starting_at = execution.times.starting_at or datetime.now(tz=timezone.utc) execution.times.started_at = datetime.now(tz=timezone.utc) - + execution.times.stopping_at = None + execution.times.stopped_at = None self._schedule_forget_on_stop(execution) # Start the snapshot manager for the VM if vm.support_snapshot and self.snapshot_manager: await self.snapshot_manager.start_for(vm=execution.vm) + assert execution.is_running self.executions[vm_hash] = execution + else: + logger.debug(("Execution %s is not running in systemd, reconnecting", execution.vm_hash)) execution.uuid = saved_execution.uuid await execution.record_usage() diff --git a/src/aleph/vm/systemd.py b/src/aleph/vm/systemd.py index 001c4671d..3bd619bb7 100644 --- a/src/aleph/vm/systemd.py +++ b/src/aleph/vm/systemd.py @@ -3,10 +3,12 @@ """ import logging +from typing import Optional -import dbus -from dbus import DBusException, SystemBus -from dbus.proxies import Interface +from dbus_fast import BusType, DBusError +from dbus_fast.aio import MessageBus, ProxyObject + +from aleph.vm.systemd_helpers import UnitFileState, Mode, ActiveState, SystemdProxy, UnitProxy logger = logging.getLogger(__name__) @@ -17,60 +19,90 @@ class SystemDManager: Used to manage the systemd services on the host on Linux. """ - bus: SystemBus - manager: Interface + _bus: Optional[MessageBus] = None + _manager: Optional[SystemdProxy] = None def __init__(self): - self.bus = dbus.SystemBus() - systemd = self.bus.get_object("org.freedesktop.systemd1", "/org/freedesktop/systemd1") - self.manager = dbus.Interface(systemd, "org.freedesktop.systemd1.Manager") - - def stop_and_disable(self, service: str) -> None: - if self.is_service_active(service): - self.stop(service) - if self.is_service_enabled(service): - self.disable(service) - - def enable(self, service: str) -> None: - self.manager.EnableUnitFiles([service], False, True) + pass + + async def get_bus(self): + if self._bus is None: + self._bus = MessageBus(bus_type=BusType.SYSTEM) + await self._bus.connect() + return self._bus + + async def get_manager(self): + if self._manager is None: + bus = await self.get_bus() + path = "/org/freedesktop/systemd1" + bus_name = "org.freedesktop.systemd1" + introspect = await bus.introspect(bus_name, path) + systemd_proxy: ProxyObject = bus.get_proxy_object(bus_name, path, introspection=introspect) + interface = systemd_proxy.get_interface("org.freedesktop.systemd1.Manager") + # Check required method are implemented + assert isinstance(interface, SystemdProxy) + self._manager = interface + return self._manager + + async def enable(self, service: str) -> None: + manager = await self.get_manager() + await manager.call_enable_unit_files([service], False, True) logger.debug(f"Enabled {service} service") - def start(self, service: str) -> None: - self.manager.StartUnit(service, "replace") + async def start(self, service: str) -> None: + manager = await self.get_manager() + await manager.call_start_unit(service, Mode.REPLACE) logger.debug(f"Started {service} service") - def stop(self, service: str) -> None: - self.manager.StopUnit(service, "replace") + async def stop(self, service: str) -> None: + manager = await self.get_manager() + await manager.call_stop_unit(service, Mode.REPLACE) logger.debug(f"Stopped {service} service") - def restart(self, service: str) -> None: - self.manager.RestartUnit(service, "replace") + async def restart(self, service: str) -> None: + manager = await self.get_manager() + await manager.call_restart_unit(service, Mode.REPLACE) logger.debug(f"Restarted {service} service") - def disable(self, service: str) -> None: - self.manager.DisableUnitFiles([service], False) + async def disable(self, service: str) -> None: + manager = await self.get_manager() + await manager.call_disable_unit_files([service], False) logger.debug(f"Disabled {service} service") - def is_service_enabled(self, service: str) -> bool: + async def is_service_enabled(self, service: str) -> bool: + manager = await self.get_manager() try: - return self.manager.GetUnitFileState(service) == "enabled" - except DBusException as error: + state = await manager.call_get_unit_file_state(service) + return state == UnitFileState.ENABLED + except DBusError as error: logger.error(error) return False - def is_service_active(self, service: str) -> bool: + async def is_service_active(self, service: str) -> bool: + manager = await self.get_manager() try: - systemd_service = self.bus.get_object("org.freedesktop.systemd1", object_path=self.manager.GetUnit(service)) - unit = dbus.Interface(systemd_service, "org.freedesktop.systemd1.Unit") - unit_properties = dbus.Interface(unit, "org.freedesktop.DBus.Properties") - active_state = unit_properties.Get("org.freedesktop.systemd1.Unit", "ActiveState") - return active_state == "active" - except DBusException as error: + path = await manager.call_get_unit(service) + bus = await self.get_bus() + bus_name = "org.freedesktop.systemd1" + introspect = await bus.introspect(bus_name, path) + systemd_service = bus.get_proxy_object(bus_name, path, introspection=introspect) + unit = systemd_service.get_interface("org.freedesktop.systemd1.Unit") + # Check required method are implemented + assert isinstance(unit, UnitProxy) + active_state = await unit.get_active_state() + return active_state == ActiveState.ACTIVE + except DBusError as error: logger.error(error) return False - def enable_and_start(self, service: str) -> None: - if not self.is_service_enabled(service): - self.enable(service) - if not self.is_service_active(service): - self.start(service) + async def enable_and_start(self, service: str) -> None: + if not await self.is_service_enabled(service): + await self.enable(service) + if not await self.is_service_active(service): + await self.start(service) + + async def stop_and_disable(self, service: str) -> None: + if await self.is_service_active(service): + await self.stop(service) + if await self.is_service_enabled(service): + await self.disable(service) diff --git a/src/aleph/vm/systemd_helpers.py b/src/aleph/vm/systemd_helpers.py new file mode 100644 index 000000000..2dd6f9f72 --- /dev/null +++ b/src/aleph/vm/systemd_helpers.py @@ -0,0 +1,131 @@ +"""Typing helpers for talking to systemd via dbus + +The proxy object interface are determined at runtimes""" + +import enum +from typing import Literal, runtime_checkable, Protocol + + +class UnitFileState(str, enum.Enum): + """This StrEnum class represents the different possible states of a unit file.""" + + ENABLED = "enabled" + """Indicates that a unit file is permanently enabled.""" + + ENABLED_RUNTIME = "enabled-runtime" + """Indicates the unit file is only temporarily enabled and will no longer be enabled after a reboot + (that means, it is enabled via /run/ symlinks, rather than /etc/).""" + + LINKED = "linked" + """Indicates that a unit is linked into /etc/ permanently.""" + + LINKED_RUNTIME = "linked-runtime" + """Indicates that a unit is linked into /run/ temporarily (until the next reboot).""" + + MASKED = "masked" + """Indicates that the unit file is masked permanently.""" + + MASKED_RUNTIME = "masked-runtime" + """Indicates that it is masked in /run/ temporarily (until the next reboot).""" + + STATIC = "static" + """Indicates that the unit is statically enabled, i.e. always enabled and doesn't need to be enabled explicitly.""" + + DISABLED = "disabled" + """Indicates that the unit file is not enabled.""" + + INVALID = "invalid" + """Indicates that it could not be determined whether the unit file is enabled.""" + + +UnitFileStateLiteral = Literal[ + "enabled", + "enabled-runtime", + "linked", + "linked-runtime", + "masked", + "masked-runtime", + "static", + "disabled", + "invalid", +] + + +class Mode(str, enum.Enum): + REPLACE = "replace" + FAIL = "fail" + ISOLATE = "isolate" + IGNORE_DEPENDENCIES = "ignore-dependencies" + IGNORE_REQUIREMENTS = "ignore-requirements" + + +class ActiveState(str, enum.Enum): + """ + ActiveState contains a state value that reflects the unit's current status. + """ + + ACTIVE = "active" + """ + The unit is active. + """ + + RELOADING = "reloading" + """ + The unit is active and reloading its configuration. + """ + + INACTIVE = "inactive" + """ + The unit is inactive, previous run was successful or hasn't yet occurred. + """ + + FAILED = "failed" + """ + The unit is inactive, previous run was unsuccessful. + """ + + ACTIVATING = "activating" + """ + The unit is transitioning from inactive to active state. + """ + + DEACTIVATING = "deactivating" + """ + The unit is in the process of deactivation. + """ + + +ActiveStateLiteral = Literal["active", "reloading", "inactive", "failed", "activating", "deactivating"] + + +@runtime_checkable +class SystemdProxy(Protocol): + """ABC for typing. + + for description of methods + see https://www.freedesktop.org/software/systemd/man/latest/org.freedesktop.systemd1.html#The%20Manager%20Object""" + + async def call_enable_unit_files(self, files: list[str], runtime: bool, force: bool): ... + + async def call_get_unit_file_state(self, service) -> UnitFileStateLiteral: ... + + async def call_start_unit(self, name, mode): + pass + + async def call_stop_unit(self, name, mode): ... + + async def call_restart_unit(self, name, mode): ... + + async def call_disable_unit_files(self, files: list[str], runtime: bool): ... + + async def call_get_unit(self, name: str) -> str: ... + + +@runtime_checkable +class UnitProxy(Protocol): + """for typing. + + for description of methods see + https://www.freedesktop.org/software/systemd/man/latest/org.freedesktop.systemd1.html#Service%20Unit%20Objects""" + + async def get_active_state(self) -> ActiveStateLiteral: ...