Skip to content

Commit f5c82a8

Browse files
committed
Feature: CLI command to repair local storage
Problem: the local storage of a CCN may become inconsistent or corrupted because of issues like downtime or maintenance gone wrong. Solution: a new CLI command, 'repair'. This command checks the DB to determine which files should be stored on the node and fetches the missing ones from the network.
1 parent dd702df commit f5c82a8

File tree

4 files changed

+119
-20
lines changed

4 files changed

+119
-20
lines changed

setup.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ install_requires =
4040
aiohttp==3.8.1
4141
aioipfs@git+https://github.com/aleph-im/aioipfs.git@76d5624661e879a13b70f3ea87dc9c9604c7eda7
4242
aleph-client==0.4.6
43-
aleph-message==0.2.1
43+
aleph-message==0.2.2
4444
aleph-pytezos@git+https://github.com/aleph-im/aleph-pytezos.git@97fe92ffa6e21ef5ec17ef4fa16c86022b30044c
4545
coincurve==15.0.1
4646
configmanager==1.35.1

src/aleph/ccn_cli/commands/repair.py

Lines changed: 108 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,64 +3,155 @@
33
"""
44
import asyncio
55
import itertools
6-
from typing import Dict, FrozenSet
6+
from typing import Dict, FrozenSet, Set, Tuple
77
from typing import cast
88

99
import typer
10+
from aleph_message.models import ItemHash
1011
from configmanager import Config
1112

1213
import aleph.model
14+
import aleph.services.p2p.singleton as singleton
15+
from aleph import config as aleph_config
1316
from aleph.ccn_cli.cli_config import CliConfig
1417
from aleph.config import get_defaults
18+
from aleph.exceptions import ContentCurrentlyUnavailable
1519
from aleph.model import init_db_globals
20+
from aleph.services.p2p import http
21+
from aleph.storage import get_hash_content
1622
from .toolkit.local_storage import list_expected_local_files
1723

1824
repair_ns = typer.Typer()
1925

2026

21-
22-
def print_files_to_preserve(files_to_preserve: Dict[str, FrozenSet[str]]) -> None:
23-
typer.echo("The following files will be preserved:")
24-
for file_type, files in files_to_preserve.items():
25-
typer.echo(f"* {len(files)} {file_type}")
27+
async def init_api_servers():
    """Set `singleton.api_servers` to the addresses of the HTTP peers stored in the DB.

    Reads the "peers" collection of `aleph.model.db`, keeping only the
    documents whose "type" is "HTTP".
    """
    http_peers = aleph.model.db["peers"].find({"type": "HTTP"})
    singleton.api_servers = [peer["address"] async for peer in http_peers]
2630

2731

2832
async def list_missing_files() -> FrozenSet[str]:
    """Return the names of the files expected locally but absent from GridFS.

    The expected files come from `list_expected_local_files()`; the files
    actually present are the "filename" values of the "fs.files" collection.

    Raises:
        ValueError: if the DB global has not been initialized.
    """
    db = aleph.model.db
    if db is None:  # for mypy
        raise ValueError("DB not initialized as expected.")

    # Collect the names of all the files currently in GridFS
    cursor = db["fs.files"].find(
        projection={"_id": 0, "filename": 1},
        batch_size=1000,
    )
    stored_names = set()
    async for file in cursor:
        stored_names.add(file["filename"])
    gridfs_files = frozenset(stored_names)

    typer.echo(f"Found {len(gridfs_files)} files in local storage.")

    # Flatten the per-type groups of expected files into a single set
    expected_local_files_dict = await list_expected_local_files()
    expected_local_files = frozenset(
        name for group in expected_local_files_dict.values() for name in group
    )

    return expected_local_files - gridfs_files
4656

4757

58+
async def fetch_and_store_file(filename: str):
    """Fetch the content identified by `filename` through `get_hash_content`.

    The storage engine is derived from the hash itself (`ItemHash.item_type`).
    The call enables network and IPFS lookups, asks for the value to be stored
    (`store_value=True`) and uses a timeout of 15. The returned content is
    discarded; any exception raised by `get_hash_content` propagates.
    """
    engine = ItemHash(filename).item_type
    await get_hash_content(
        content_hash=filename,
        engine=engine,
        use_network=True,
        use_ipfs=True,
        store_value=True,
        timeout=15,
    )
68+
69+
70+
def process_results(
    finished_tasks: Set[asyncio.Task], task_dict: Dict[asyncio.Task, str]
) -> Tuple[Set[str], Set[str]]:
    """Sort finished fetch tasks into successes and failures.

    Each task is removed from `task_dict` (which maps a task to the filename
    it was fetching). A task without exception counts as fetched; any other
    task counts as failed and a message is printed for it.

    Returns:
        A `(fetched_files, failed_files)` tuple of filename sets.
    """
    succeeded: Set[str] = set()
    errored: Set[str] = set()

    for finished in finished_tasks:
        filename = task_dict.pop(finished)
        exception = finished.exception()

        if exception is None:
            succeeded.add(filename)
            continue

        errored.add(filename)
        if isinstance(exception, ContentCurrentlyUnavailable):
            message = f"WARNING: Could not fetch {filename}: currently unavailable."
        else:
            message = f"ERROR: Could not fetch {filename}: unexpected error: {exception}"
        typer.echo(message)

    return succeeded, errored
95+
96+
97+
async def fetch_files(missing_files: FrozenSet[str], batch_size: int):
    """Fetch every file of `missing_files` concurrently and print a summary.

    One task is created per file. Whenever the number of in-flight tasks
    equals `batch_size`, the function waits for at least one of them to
    complete before scheduling more. Once all tasks are scheduled, it waits
    for the remaining ones and prints how many files were fetched and how
    many failed.
    """
    in_flight = set()
    names_by_task = {}

    fetched_files = set()
    failed_files = set()

    def collect(done_tasks):
        # Fold the outcome of the completed tasks into the running totals.
        fetched, failed = process_results(done_tasks, names_by_task)
        fetched_files.update(fetched)
        failed_files.update(failed)

    for i, filename in enumerate(missing_files, start=1):
        typer.echo(f"Fetching {filename} ({i}/{len(missing_files)})...")
        fetch_task = asyncio.create_task(fetch_and_store_file(filename))
        in_flight.add(fetch_task)
        names_by_task[fetch_task] = filename

        if len(in_flight) != batch_size:
            continue

        # The batch is full: free at least one slot before scheduling more.
        done, in_flight = await asyncio.wait(
            in_flight, return_when=asyncio.FIRST_COMPLETED
        )
        collect(done)

    # Finish
    if in_flight:
        done, _ = await asyncio.wait(in_flight, return_when=asyncio.ALL_COMPLETED)
        collect(done)

    typer.echo(f"Successfully fetched {len(fetched_files)} files.")
    if failed_files:
        typer.echo(f"WARNING: Failed to fetch {len(failed_files)} files.")
126+
127+
48128
async def fetch_missing_files(batch_size: int = 2000):
    """List the files missing from local storage and fetch them.

    Args:
        batch_size: maximum number of fetch tasks kept in flight at once,
            passed through to `fetch_files`. Defaults to 2000, the value
            that was previously hard-coded here.
    """
    missing_files = await list_missing_files()
    typer.echo(f"Found {len(missing_files)} missing files.")

    await fetch_files(missing_files, batch_size)
133+
52134

53135
async def run(ctx: typer.Context):
    """Entry point of the repair command: fetch the files missing from local storage.

    Loads the node configuration from the path held by the CLI context,
    initializes the DB globals and the list of API servers, fetches the
    missing files and closes the HTTP client sessions.

    Raises:
        ValueError: if the DB global is still unset after `init_db_globals`.
    """
    config = Config(schema=get_defaults())
    cli_config = cast(CliConfig, ctx.obj)
    config.yaml.load(str(cli_config.config_file_path))

    # Set the config global variable, otherwise the IPFS client will not be initialized properly
    aleph_config.app_config = config

    init_db_globals(config=config)
    # Check the DB handle before anything reads from it: init_api_servers()
    # queries the "peers" collection through aleph.model.db.
    if aleph.model.db is None:  # for mypy
        raise ValueError("DB not initialized as expected.")

    try:
        # To be able to fetch data from the network
        await init_api_servers()
        await fetch_missing_files()
    finally:
        # Clean up aiohttp client sessions to avoid a warning, including
        # when fetching fails part-way through.
        for client_session in http.SESSIONS.values():
            await client_session.close()

    typer.echo("Done.")
65156

66157

src/aleph/ccn_cli/commands/toolkit/local_storage.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
from typing import Any, Dict, FrozenSet, List, Optional
2-
from aleph.model.messages import Message
3-
from aleph.model.filepin import PermanentPin
2+
43
from aleph_message.models import MessageType
54

5+
from aleph.model.filepin import PermanentPin
6+
from aleph.model.messages import Message
7+
68

79
async def get_hashes(
810
item_type_field: str, item_hash_field: str, msg_type: Optional[MessageType] = None

src/aleph/ccn_cli/main.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from .commands.garbage_collector import gc_ns
88
from .commands.keys import keys_ns
99
from .commands.migrations import migrations_ns
10+
from .commands.repair import repair_ns
1011

1112
app = typer.Typer()
1213

@@ -68,6 +69,11 @@ def main(
6869
app.add_typer(gc_ns, name="gc", help="Invoke the garbage collector.")
6970
app.add_typer(keys_ns, name="keys", help="Operations on private keys.")
7071
app.add_typer(migrations_ns, name="migrations", help="Run DB migrations.")
72+
app.add_typer(
73+
repair_ns,
74+
name="repair",
75+
help="Performs checks on the local install and fixes issues like missing files.",
76+
)
7177

7278

7379
if __name__ == "__main__":

0 commit comments

Comments
 (0)