Skip to content

Feature: garbage collector CLI command #269

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion deployment/docker-build/pyaleph.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ RUN /opt/venv/bin/pip install --no-cache-dir -r /opt/build/requirements.txt
RUN rm /opt/build/requirements.txt

# === Install the CCN itself ===
COPY deployment/migrations /opt/pyaleph/migrations
COPY setup.py /opt/pyaleph/
COPY src /opt/pyaleph/src
# Git data is used to determine the version of the CCN
Expand Down
164 changes: 0 additions & 164 deletions deployment/migrations/config_updater.py

This file was deleted.

4 changes: 4 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ install_requires =
sentry-sdk==1.5.11
setproctitle==1.2.2
substrate-interface==1.1.7
typer==0.4.1
ujson==5.1.0 # required by aiocache
urllib3==1.26.8
uvloop==0.16.0
Expand Down Expand Up @@ -96,6 +97,8 @@ testing =
pytest-aiohttp
pytest-asyncio
pytest-mock
types-pytz
types-pyyaml
types-requests
types-setuptools
nuls2 =
Expand All @@ -111,6 +114,7 @@ docs =
# Add here console scripts like:
console_scripts =
pyaleph = aleph.commands:run
ccn_cli = aleph.ccn_cli.main:app
# For example:
# console_scripts =
# fibonacci = pyaleph.skeleton:run
Expand Down
File renamed without changes.
14 changes: 14 additions & 0 deletions src/aleph/ccn_cli/cli_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""
Global configuration object for the CLI. Use the `get_cli_config()` method
to access and modify the configuration.
"""

from dataclasses import dataclass
from pathlib import Path


@dataclass
class CliConfig:
config_file_path: Path
key_dir: Path
verbose: bool
Empty file.
166 changes: 166 additions & 0 deletions src/aleph/ccn_cli/commands/garbage_collector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
"""
This migration checks all the files stored in local storage (=GridFS) and compares them to the list
of messages already on the node. The files that are not linked to any message are scheduled for
deletion.
"""
import asyncio
import datetime as dt
from typing import Any, Dict, FrozenSet, List, Optional
from typing import cast

import pytz
import typer
from aleph_message.models import MessageType
from configmanager import Config

import aleph.model
from aleph.ccn_cli.cli_config import CliConfig
from aleph.config import get_defaults
from aleph.model import init_db_globals
from aleph.model.filepin import PermanentPin
from aleph.model.hashes import delete_value as delete_gridfs_file
from aleph.model.messages import Message

gc_ns = typer.Typer()


async def get_hashes(
    item_type_field: str, item_hash_field: str, msg_type: Optional[MessageType] = None
) -> FrozenSet[str]:
    """Return the set of storage/IPFS content hashes referenced by messages.

    :param item_type_field: Dotted path of the item-type field to filter on
        (e.g. "item_type" or "content.item_type").
    :param item_hash_field: Dotted path of the field holding the hash.
    :param msg_type: If given, restrict the query to messages of this type.
    :return: Frozen set of hash strings found in matching messages.
    """

    def dig(document: Any, path: List[str]) -> Any:
        # Walk a dotted field path (e.g. "content.item_hash") through nested dicts.
        value = document
        for key in path:
            value = value[key]
        return value

    query: Dict[str, Any] = {
        # Check if the hash field exists in case the message was forgotten
        item_hash_field: {"$exists": 1},
        item_type_field: {"$in": ["ipfs", "storage"]},
    }
    if msg_type:
        query["type"] = msg_type

    path = item_hash_field.split(".")
    cursor = Message.collection.find(query, {item_hash_field: 1}, batch_size=1000)
    found = [dig(message, path) async for message in cursor]

    # Temporary fix for api2. A message has a list of dicts as item hash.
    return frozenset(h for h in found if isinstance(h, str))


def print_files_to_preserve(files_to_preserve: Dict[str, FrozenSet[str]]) -> None:
    """Echo a per-category count of the files that will be kept."""
    typer.echo("The following files will be preserved:")
    for category in files_to_preserve:
        count = len(files_to_preserve[category])
        typer.echo(f"* {count} {category}")


async def list_files_to_preserve(
    gridfs_files_dict: Dict[str, Dict],
    temporary_files_ttl: int,
) -> Dict[str, FrozenSet[str]]:
    """Determine which GridFS files must NOT be garbage-collected.

    :param gridfs_files_dict: Mapping of filename -> GridFS file metadata
        (must include at least "filename" and "uploadDate").
    :param temporary_files_ttl: Grace period in seconds; files uploaded more
        recently than this are always preserved.
    :return: Mapping of human-readable category -> set of filenames to keep.
    """
    files_to_preserve_dict: Dict[str, FrozenSet[str]] = {}

    # Preserve any file uploaded less than `temporary_files_ttl` seconds ago:
    # it may belong to a message that is still being processed.
    # Use an aware UTC datetime rather than the naive (and deprecated since
    # Python 3.12) datetime.utcnow(). NOTE(review): this assumes uploadDate
    # values returned by the DB client are timezone-aware (otherwise the
    # comparison raises TypeError, as it would have with the original
    # pytz-localized value) — confirm the client is configured with tz_aware.
    cutoff = dt.datetime.now(dt.timezone.utc) - dt.timedelta(
        seconds=temporary_files_ttl
    )
    files_to_preserve_dict["temporary files"] = frozenset(
        file["filename"]
        for file in gridfs_files_dict.values()
        if file["uploadDate"] > cutoff
    )

    # Get all the messages that potentially store data in local storage:
    # * any message with item_type in ["storage", "ipfs"]
    # * STOREs with content.item_type in ["storage", "ipfs"]
    files_to_preserve_dict["non-inline messages"] = await get_hashes(
        item_type_field="item_type",
        item_hash_field="item_hash",
    )
    files_to_preserve_dict["stores"] = await get_hashes(
        item_type_field="content.item_type",
        item_hash_field="content.item_hash",
        msg_type=MessageType.store,
    )

    # We also keep permanent pins, even if they are also stored on IPFS
    files_to_preserve_dict["file pins"] = frozenset(
        [
            pin["multihash"]
            async for pin in PermanentPin.collection.find({}, {"multihash": 1})
        ]
    )

    return files_to_preserve_dict


async def run(ctx: typer.Context, dry_run: bool):
    """Garbage-collect GridFS files not referenced by any message.

    Loads the node configuration, lists every file in local storage, computes
    the set of files that must be preserved (recent uploads, non-inline
    messages, stores, permanent pins) and deletes — or, with `dry_run`, only
    reports — the rest.

    :param ctx: Typer context; `ctx.obj` carries the CliConfig set by the CLI root.
    :param dry_run: If True, report the files to delete without deleting them.
    :raises ValueError: If the database globals were not initialized.
    """
    config = Config(schema=get_defaults())
    cli_config = cast(CliConfig, ctx.obj)
    config.yaml.load(str(cli_config.config_file_path))

    init_db_globals(config=config)
    if aleph.model.db is None:  # for mypy
        raise ValueError("DB not initialized as expected.")

    # Get a set of all the files currently in GridFS
    gridfs_files_dict = {
        file["filename"]: file
        async for file in aleph.model.db["fs.files"].find(
            projection={"_id": 0, "filename": 1, "length": 1, "uploadDate": 1},
            batch_size=1000,
        )
    }
    gridfs_files = frozenset(gridfs_files_dict.keys())

    typer.echo(f"Found {len(gridfs_files_dict)} files in local storage.")

    files_to_preserve_dict = await list_files_to_preserve(
        gridfs_files_dict=gridfs_files_dict,
        temporary_files_ttl=config.storage.temporary_files_ttl.value,
    )
    files_to_preserve = frozenset().union(*files_to_preserve_dict.values())
    files_to_delete = gridfs_files - files_to_preserve

    if cli_config.verbose:
        print_files_to_preserve(files_to_preserve_dict)

    # Total storage (in bytes) that deleting these files will free.
    freed_bytes = sum(
        gridfs_files_dict[filename]["length"] for filename in files_to_delete
    )
    typer.echo(
        f"{len(files_to_delete)} files will be deleted, totaling {freed_bytes} bytes."
    )

    if dry_run:
        if cli_config.verbose:
            if files_to_delete:
                typer.echo("The following files will be deleted:")
                for file_to_delete in files_to_delete:
                    typer.echo(f"* {file_to_delete}")

    else:
        for file_to_delete in files_to_delete:
            typer.echo(f"Deleting {file_to_delete}...")
            await delete_gridfs_file(file_to_delete)

    typer.echo("Done.")


@gc_ns.command(name="run")
def run_gc(
    ctx: typer.Context,
    dry_run: bool = typer.Option(
        False, help="If set, display files to delete without deleting them."
    ),
):
    """Run the garbage collector on the node's local storage."""
    # Typer commands are synchronous; bridge into the async implementation.
    asyncio.run(run(ctx, dry_run))
Loading