From 7a30f083094335292db06137d0ce418c2331df6f Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Sat, 22 Mar 2025 16:15:56 -0700 Subject: [PATCH 01/13] optimize generating collection replay.json by performing bulk presigned url lookup --- backend/btrixcloud/colls.py | 99 +++++++++++++++++++++++++++++-------- 1 file changed, 78 insertions(+), 21 deletions(-) diff --git a/backend/btrixcloud/colls.py b/backend/btrixcloud/colls.py index ff9cc70959..e73b6c5644 100644 --- a/backend/btrixcloud/colls.py +++ b/backend/btrixcloud/colls.py @@ -28,7 +28,7 @@ UpdateColl, AddRemoveCrawlList, BaseCrawl, - CrawlOutWithResources, + CrawlFile, CrawlFileOut, Organization, PaginatedCollOutResponse, @@ -50,7 +50,13 @@ MIN_UPLOAD_PART_SIZE, PublicCollOut, ) -from .utils import dt_now, slug_from_name, get_duplicate_key_error_field, get_origin +from .utils import ( + dt_now, + slug_from_name, + get_duplicate_key_error_field, + get_origin, + date_to_str, +) if TYPE_CHECKING: from .orgs import OrgOps @@ -346,7 +352,7 @@ async def get_collection_out( result["resources"], crawl_ids, pages_optimized, - ) = await self.get_collection_crawl_resources(coll_id) + ) = await self.get_collection_crawl_resources(coll_id, org) initial_pages, _ = await self.page_ops.list_pages( crawl_ids=crawl_ids, @@ -400,7 +406,9 @@ async def get_public_collection_out( if result.get("access") not in allowed_access: raise HTTPException(status_code=404, detail="collection_not_found") - result["resources"], _, _ = await self.get_collection_crawl_resources(coll_id) + result["resources"], _, _ = await self.get_collection_crawl_resources( + coll_id, org + ) thumbnail = result.get("thumbnail") if thumbnail: @@ -555,30 +563,79 @@ async def list_collections( return collections, total async def get_collection_crawl_resources( - self, coll_id: UUID + self, coll_id: UUID, org: Organization ) -> tuple[List[CrawlFileOut], List[str], bool]: """Return pre-signed resources for all collection crawl files.""" - # Ensure collection exists - _ = await self.get_collection_raw(coll_id) - resources = [] pages_optimized = True - crawls, _ = await self.crawl_ops.list_all_base_crawls( - collection_id=coll_id, - states=list(SUCCESSFUL_STATES), - page_size=10_000, - cls_type=CrawlOutWithResources, + crawl_ids = await self.get_collection_crawl_ids(coll_id) + + cursor = self.crawls.aggregate( + [ + {"$match": {"_id": {"$in": crawl_ids}}}, + {"$project": {"files": "$files", "version": 1}}, + { + "$lookup": { + "from": "presigned_urls", + "localField": "files.filename", + "foreignField": "_id", + "as": "presigned", + } + }, + ] ) - crawl_ids = [] + resources = [] - for crawl in crawls: - crawl_ids.append(crawl.id) - if crawl.resources: - resources.extend(crawl.resources) - if crawl.version != 2: - pages_optimized = False + async for result in cursor: + mapping = {} + # create mapping of filename -> file data + for file in result["files"]: + mapping[file["filename"]] = file + + # add already presigned resources + for presigned in result["presigned"]: + file = mapping.get(presigned["_id"]) + if file: + file["signedAt"] = presigned["signedAt"] + file["path"] = presigned["url"] + resources.append( + CrawlFileOut( + name=os.path.basename(file["filename"]), + path=presigned["url"], + hash=file["hash"], + size=file["size"], + crawlId=result.get("_id"), + numReplicas=len(file.get("replicas") or []), + expireAt=date_to_str( + presigned["signedAt"] + + self.storage_ops.signed_duration_delta + ), + ) + ) + + del mapping[presigned["_id"]] + + # need to sign the remainder + for file in 
mapping.values(): + # force update as we know its not already presigned, skip extra check + url, expire_at = await self.storage_ops.get_presigned_url( + org, CrawlFile(**file), force_update=True + ) + resources.append( + CrawlFileOut( + name=os.path.basename(file["filename"]), + path=url, + hash=file["hash"], + size=file["size"], + crawlId=result.get("_id"), + numReplicas=len(file.get("replicas") or []), + expireAt=date_to_str(expire_at), + ) + ) + + pages_optimized = result.get("version") == 2 return resources, crawl_ids, pages_optimized @@ -1017,7 +1074,7 @@ async def get_collection_all(org: Organization = Depends(org_viewer_dep)): results[collection.name], _, _, - ) = await colls.get_collection_crawl_resources(collection.id) + ) = await colls.get_collection_crawl_resources(collection.id, org) except Exception as exc: # pylint: disable=raise-missing-from raise HTTPException( From c75e6b16d6ce93ea1a1fe6ddeaa3d0a8434b6806 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Sat, 22 Mar 2025 16:29:56 -0700 Subject: [PATCH 02/13] sign in parallel --- backend/btrixcloud/colls.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/backend/btrixcloud/colls.py b/backend/btrixcloud/colls.py index e73b6c5644..c074bb6982 100644 --- a/backend/btrixcloud/colls.py +++ b/backend/btrixcloud/colls.py @@ -618,11 +618,18 @@ async def get_collection_crawl_resources( del mapping[presigned["_id"]] # need to sign the remainder + futures = [] + for file in mapping.values(): # force update as we know its not already presigned, skip extra check - url, expire_at = await self.storage_ops.get_presigned_url( - org, CrawlFile(**file), force_update=True + futures.append( + self.storage_ops.get_presigned_url( + org, CrawlFile(**file), force_update=True + ) ) + + results = await asyncio.gather(*futures) + for (url, expire_at), file in zip(results, mapping.values()): resources.append( CrawlFileOut( name=os.path.basename(file["filename"]), From d9f3e68f36cc7b768df0b184b7355da1f6c1d587 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Sat, 22 Mar 2025 17:22:42 -0700 Subject: [PATCH 03/13] insert in batches --- backend/btrixcloud/colls.py | 29 ++++++++-------- backend/btrixcloud/storages.py | 61 ++++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 14 deletions(-) diff --git a/backend/btrixcloud/colls.py b/backend/btrixcloud/colls.py index c074bb6982..c4d2d76492 100644 --- a/backend/btrixcloud/colls.py +++ b/backend/btrixcloud/colls.py @@ -588,6 +588,8 @@ async def get_collection_crawl_resources( resources = [] + sign_files = [] + async for result in cursor: mapping = {} # create mapping of filename -> file data @@ -617,19 +619,21 @@ async def get_collection_crawl_resources( del mapping[presigned["_id"]] - # need to sign the remainder - futures = [] + pages_optimized = result.get("version") == 2 - for file in mapping.values(): - # force update as we know its not already presigned, skip extra check - futures.append( - self.storage_ops.get_presigned_url( - org, CrawlFile(**file), force_update=True - ) - ) + sign_files.extend(list(mapping.values())) + + if sign_files: + names = [file["filename"] for file in sign_files] - results = await asyncio.gather(*futures) - for (url, expire_at), file in zip(results, mapping.values()): + first_file = CrawlFile(**sign_files[0]) + s3storage = self.storage_ops.get_org_storage_by_ref(org, first_file.storage) + + signed_urls, expire_at = await self.storage_ops.get_presigned_urls_bulk( + org, s3storage, names + ) + + for url, file in zip(signed_urls, 
sign_files): resources.append( CrawlFileOut( name=os.path.basename(file["filename"]), @@ -642,8 +646,6 @@ async def get_collection_crawl_resources( ) ) - pages_optimized = result.get("version") == 2 - return resources, crawl_ids, pages_optimized async def get_collection_names(self, uuids: List[UUID]): @@ -1070,7 +1072,6 @@ async def list_collection_all( @app.get( "/orgs/{oid}/collections/$all", tags=["collections"], - response_model=Dict[str, List[CrawlFileOut]], ) async def get_collection_all(org: Organization = Depends(org_viewer_dep)): results = {} diff --git a/backend/btrixcloud/storages.py b/backend/btrixcloud/storages.py index 1e58521717..cd248a3f75 100644 --- a/backend/btrixcloud/storages.py +++ b/backend/btrixcloud/storages.py @@ -521,6 +521,67 @@ async def get_presigned_url( return presigned_url, now + self.signed_duration_delta + async def get_presigned_urls_bulk( + self, org: Organization, s3storage: S3Storage, filenames: list[str] + ) -> tuple[list[str], datetime]: + """generate pre-signed url for crawl file""" + + urls = [] + + futures = [] + num_batch = 8 + + now = dt_now() + + async with self.get_s3_client( + s3storage, + for_presign=True, + ) as (client, bucket, key): + + for filename in filenames: + orig_key = key + key += filename + + futures.append( + client.generate_presigned_url( + "get_object", + Params={"Bucket": bucket, "Key": key}, + ExpiresIn=PRESIGN_DURATION_SECONDS, + ) + ) + + for i in range(0, len(futures), num_batch): + batch = futures[i : i + num_batch] + results = await asyncio.gather(*batch) + + presigned_obj = [] + + for presigned_url, filename in zip( + results, filenames[i : i + num_batch] + ): + if ( + s3storage.access_endpoint_url + and s3storage.access_endpoint_url != s3storage.endpoint_url + ): + parts = urlsplit(s3storage.endpoint_url) + host_endpoint_url = ( + f"{parts.scheme}://{bucket}.{parts.netloc}/{orig_key}" + ) + presigned_url = presigned_url.replace( + host_endpoint_url, s3storage.access_endpoint_url + ) + + urls.append(presigned_url) + presigned_obj.append( + PresignedUrl( + id=filename, url=presigned_url, signedAt=now, oid=org.id + ).dict() + ) + + await self.presigned_urls.insert_many(presigned_obj, ordered=False) + + return urls, now + self.signed_duration_delta + async def delete_file_object(self, org: Organization, crawlfile: BaseFile) -> bool: """delete crawl file from storage.""" return await self._delete_file(org, crawlfile.filename, crawlfile.storage) From 94b30a3fe148fe5a524cd6aeb03f56cf6edaf89f Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Sat, 22 Mar 2025 17:39:35 -0700 Subject: [PATCH 04/13] test out all coll --- backend/btrixcloud/colls.py | 27 ++++++++++----------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/backend/btrixcloud/colls.py b/backend/btrixcloud/colls.py index c4d2d76492..7d2b6b77b3 100644 --- a/backend/btrixcloud/colls.py +++ b/backend/btrixcloud/colls.py @@ -563,17 +563,23 @@ async def list_collections( return collections, total async def get_collection_crawl_resources( - self, coll_id: UUID, org: Organization + self, coll_id: Optional[UUID], org: Organization ) -> tuple[List[CrawlFileOut], List[str], bool]: """Return pre-signed resources for all collection crawl files.""" resources = [] pages_optimized = True + match: dict[str, Any] - crawl_ids = await self.get_collection_crawl_ids(coll_id) + if coll_id: + crawl_ids = await self.get_collection_crawl_ids(coll_id) + match = {"_id": {"$in": crawl_ids}} + else: + crawl_ids = [] + match = {"oid": org.id} cursor = self.crawls.aggregate( 
[ - {"$match": {"_id": {"$in": crawl_ids}}}, + {"$match": match}, {"$project": {"files": "$files", "version": 1}}, { "$lookup": { @@ -1075,20 +1081,7 @@ async def list_collection_all( ) async def get_collection_all(org: Organization = Depends(org_viewer_dep)): results = {} - try: - all_collections, _ = await colls.list_collections(org, page_size=10_000) - for collection in all_collections: - ( - results[collection.name], - _, - _, - ) = await colls.get_collection_crawl_resources(collection.id, org) - except Exception as exc: - # pylint: disable=raise-missing-from - raise HTTPException( - status_code=400, detail="Error Listing All Crawled Files: " + str(exc) - ) - + results["resources"] = colls.get_collection_crawl_resources(None, org) return results @app.get( From 7f93c5aa76589012c372cc443646aebc25d6188d Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Sat, 22 Mar 2025 17:46:18 -0700 Subject: [PATCH 05/13] typo --- backend/btrixcloud/colls.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/btrixcloud/colls.py b/backend/btrixcloud/colls.py index 7d2b6b77b3..b852003252 100644 --- a/backend/btrixcloud/colls.py +++ b/backend/btrixcloud/colls.py @@ -1081,7 +1081,7 @@ async def list_collection_all( ) async def get_collection_all(org: Organization = Depends(org_viewer_dep)): results = {} - results["resources"] = colls.get_collection_crawl_resources(None, org) + results["resources"] = await colls.get_collection_crawl_resources(None, org) return results @app.get( From 4d27fb4c7fd9a98b158f7094969840a571a12168 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Sat, 22 Mar 2025 18:02:20 -0700 Subject: [PATCH 06/13] fix --- backend/btrixcloud/storages.py | 33 ++++++++++++++------------------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/backend/btrixcloud/storages.py b/backend/btrixcloud/storages.py index cd248a3f75..3ea05916a6 100644 --- a/backend/btrixcloud/storages.py +++ b/backend/btrixcloud/storages.py @@ -485,12 +485,9 @@ async def get_presigned_url( s3storage, for_presign=True, ) as (client, bucket, key): - orig_key = key - key += crawlfile.filename - presigned_url = await client.generate_presigned_url( "get_object", - Params={"Bucket": bucket, "Key": key}, + Params={"Bucket": bucket, "Key": key + crawlfile.filename}, ExpiresIn=PRESIGN_DURATION_SECONDS, ) @@ -499,9 +496,7 @@ async def get_presigned_url( and s3storage.access_endpoint_url != s3storage.endpoint_url ): parts = urlsplit(s3storage.endpoint_url) - host_endpoint_url = ( - f"{parts.scheme}://{bucket}.{parts.netloc}/{orig_key}" - ) + host_endpoint_url = f"{parts.scheme}://{bucket}.{parts.netloc}/{key}" presigned_url = presigned_url.replace( host_endpoint_url, s3storage.access_endpoint_url ) @@ -538,14 +533,20 @@ async def get_presigned_urls_bulk( for_presign=True, ) as (client, bucket, key): - for filename in filenames: - orig_key = key - key += filename + if ( + s3storage.access_endpoint_url + and s3storage.access_endpoint_url != s3storage.endpoint_url + ): + parts = urlsplit(s3storage.endpoint_url) + host_endpoint_url = f"{parts.scheme}://{bucket}.{parts.netloc}/{key}" + else: + host_endpoint_url = None + for filename in filenames: futures.append( client.generate_presigned_url( "get_object", - Params={"Bucket": bucket, "Key": key}, + Params={"Bucket": bucket, "Key": key + filename}, ExpiresIn=PRESIGN_DURATION_SECONDS, ) ) @@ -559,19 +560,13 @@ async def get_presigned_urls_bulk( for presigned_url, filename in zip( results, filenames[i : i + num_batch] ): - if ( - s3storage.access_endpoint_url - 
and s3storage.access_endpoint_url != s3storage.endpoint_url - ): - parts = urlsplit(s3storage.endpoint_url) - host_endpoint_url = ( - f"{parts.scheme}://{bucket}.{parts.netloc}/{orig_key}" - ) + if host_endpoint_url: presigned_url = presigned_url.replace( host_endpoint_url, s3storage.access_endpoint_url ) urls.append(presigned_url) + presigned_obj.append( PresignedUrl( id=filename, url=presigned_url, signedAt=now, oid=org.id From b814bd60300fe1106dad6bad9d6eb322589822fd Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Sat, 22 Mar 2025 18:14:33 -0700 Subject: [PATCH 07/13] cleanup --- backend/btrixcloud/colls.py | 8 ++++++-- backend/btrixcloud/storages.py | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/backend/btrixcloud/colls.py b/backend/btrixcloud/colls.py index b852003252..c51b6e824f 100644 --- a/backend/btrixcloud/colls.py +++ b/backend/btrixcloud/colls.py @@ -562,6 +562,7 @@ async def list_collections( return collections, total + # pylint: disable=too-many-locals async def get_collection_crawl_resources( self, coll_id: Optional[UUID], org: Organization ) -> tuple[List[CrawlFileOut], List[str], bool]: @@ -596,10 +597,13 @@ async def get_collection_crawl_resources( sign_files = [] + crawl_id = None + async for result in cursor: mapping = {} # create mapping of filename -> file data for file in result["files"]: + file["crawl_id"] = result.get("_id") mapping[file["filename"]] = file # add already presigned resources @@ -614,7 +618,7 @@ async def get_collection_crawl_resources( path=presigned["url"], hash=file["hash"], size=file["size"], - crawlId=result.get("_id"), + crawlId=file["crawl_id"], numReplicas=len(file.get("replicas") or []), expireAt=date_to_str( presigned["signedAt"] @@ -646,7 +650,7 @@ async def get_collection_crawl_resources( path=url, hash=file["hash"], size=file["size"], - crawlId=result.get("_id"), + crawlId=file["crawl_id"], numReplicas=len(file.get("replicas") or []), expireAt=date_to_str(expire_at), ) diff --git a/backend/btrixcloud/storages.py b/backend/btrixcloud/storages.py index 3ea05916a6..4253b5a669 100644 --- a/backend/btrixcloud/storages.py +++ b/backend/btrixcloud/storages.py @@ -570,7 +570,7 @@ async def get_presigned_urls_bulk( presigned_obj.append( PresignedUrl( id=filename, url=presigned_url, signedAt=now, oid=org.id - ).dict() + ).to_dict() ) await self.presigned_urls.insert_many(presigned_obj, ordered=False) From 1b7658959207d8cc2ddae632927174f610f4ebf4 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Sun, 23 Mar 2025 02:10:11 -0700 Subject: [PATCH 08/13] try increasing batch size --- backend/btrixcloud/storages.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/btrixcloud/storages.py b/backend/btrixcloud/storages.py index 4253b5a669..78d1e6b7ab 100644 --- a/backend/btrixcloud/storages.py +++ b/backend/btrixcloud/storages.py @@ -524,7 +524,7 @@ async def get_presigned_urls_bulk( urls = [] futures = [] - num_batch = 8 + num_batch = 16 now = dt_now() From 4c7cc72f9ebfcaf5391c566eb5cb67a542cb99d1 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Sun, 23 Mar 2025 02:15:49 -0700 Subject: [PATCH 09/13] fix --- backend/btrixcloud/colls.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/backend/btrixcloud/colls.py b/backend/btrixcloud/colls.py index c51b6e824f..afc82ccabc 100644 --- a/backend/btrixcloud/colls.py +++ b/backend/btrixcloud/colls.py @@ -597,8 +597,6 @@ async def get_collection_crawl_resources( sign_files = [] - crawl_id = None - async for result in cursor: mapping = {} # create mapping of filename 
-> file data From 0a0cd623a6dff3e7bfc5fefce785d22d8d313657 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Sun, 23 Mar 2025 03:13:31 -0700 Subject: [PATCH 10/13] presign batch size configurable --- backend/btrixcloud/storages.py | 6 ++++-- chart/templates/configmap.yaml | 2 ++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/backend/btrixcloud/storages.py b/backend/btrixcloud/storages.py index 78d1e6b7ab..41feee02c9 100644 --- a/backend/btrixcloud/storages.py +++ b/backend/btrixcloud/storages.py @@ -70,7 +70,7 @@ # ============================================================================ -# pylint: disable=broad-except,raise-missing-from +# pylint: disable=broad-except,raise-missing-from,too-many-instance-attributes class StorageOps: """All storage handling, download/upload operations""" @@ -104,6 +104,8 @@ def __init__(self, org_ops, crawl_manager, mdb) -> None: default_namespace = os.environ.get("DEFAULT_NAMESPACE", "default") self.frontend_origin = f"{frontend_origin}.{default_namespace}" + self.presign_batch_size = int(os.environ.get("PRESIGN_BATCH_SIZE", 8)) + with open(os.environ["STORAGES_JSON"], encoding="utf-8") as fh: storage_list = json.loads(fh.read()) @@ -524,7 +526,7 @@ async def get_presigned_urls_bulk( urls = [] futures = [] - num_batch = 16 + num_batch = self.presign_batch_size now = dt_now() diff --git a/chart/templates/configmap.yaml b/chart/templates/configmap.yaml index 9fd4188e8b..82edc1a054 100644 --- a/chart/templates/configmap.yaml +++ b/chart/templates/configmap.yaml @@ -90,6 +90,8 @@ data: REPLICA_DELETION_DELAY_DAYS: "{{ .Values.replica_deletion_delay_days | default 0 }}" + PRESIGN_BATCH_SIZE: "{{ .Values.presign_batch_size | default 8 }}" + --- apiVersion: v1 From e3b631dcb2aa14571b43f9f86af8112c8ee5bca4 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Mon, 24 Mar 2025 14:18:16 -0700 Subject: [PATCH 11/13] refactor: support bulk presign for single crawls as well: - move bulk presign query to basecrawls - support bulk presign for crawl as well as collection replay --- backend/btrixcloud/basecrawls.py | 133 +++++++++++++++++++++++++++---- backend/btrixcloud/colls.py | 80 +------------------ 2 files changed, 120 insertions(+), 93 deletions(-) diff --git a/backend/btrixcloud/basecrawls.py b/backend/btrixcloud/basecrawls.py index e9b70d4e1d..a5e24f4be9 100644 --- a/backend/btrixcloud/basecrawls.py +++ b/backend/btrixcloud/basecrawls.py @@ -1,7 +1,18 @@ """base crawl type""" from datetime import datetime -from typing import Optional, List, Union, Dict, Any, Type, TYPE_CHECKING, cast, Tuple +from typing import ( + Optional, + List, + Union, + Dict, + Any, + Type, + TYPE_CHECKING, + cast, + Tuple, + AsyncIterable, +) from uuid import UUID import os import urllib.parse @@ -76,6 +87,7 @@ def __init__( background_job_ops: BackgroundJobOps, ): self.crawls = mdb["crawls"] + self.presigned_urls = mdb["presigned_urls"] self.crawl_configs = crawl_configs self.user_manager = users self.orgs = orgs @@ -468,24 +480,115 @@ async def resolve_signed_urls( out_files = [] - for file_ in files: - presigned_url, expire_at = await self.storage_ops.get_presigned_url( - org, file_, force_update=force_update + cursor = self.presigned_urls.find( + {"_id": {"$in": [file.filename for file in files]}} + ) + + presigned = await cursor.to_list(10000) + + files_dict = [file.dict() for file in files] + + async def async_gen(): + yield {"presigned": presigned, "files": files_dict, "_id": crawl_id} + + out_files, _ = await self.process_presigned_files( + async_gen(), org, 
force_update + ) + + return out_files + + async def get_presigned_files( + self, match: dict[str, Any], org: Organization + ) -> tuple[list[CrawlFileOut], bool]: + """return presigned crawl files queried as batch, merging presigns with files in one pass""" + cursor = self.crawls.aggregate( + [ + {"$match": match}, + {"$project": {"files": "$files", "version": 1}}, + { + "$lookup": { + "from": "presigned_urls", + "localField": "files.filename", + "foreignField": "_id", + "as": "presigned", + } + }, + ] + ) + + return await self.process_presigned_files(cursor, org) + + async def process_presigned_files( + self, + cursor: AsyncIterable[dict[str, Any]], + org: Organization, + force_update=False, + ) -> tuple[list[CrawlFileOut], bool]: + """process presigned files in batches""" + resources = [] + pages_optimized = False + + sign_files = [] + + async for result in cursor: + pages_optimized = result.get("version") == 2 + + mapping = {} + # create mapping of filename -> file data + for file in result["files"]: + file["crawl_id"] = result["_id"] + mapping[file["filename"]] = file + + if not force_update: + # add already presigned resources + for presigned in result["presigned"]: + file = mapping.get(presigned["_id"]) + if file: + file["signedAt"] = presigned["signedAt"] + file["path"] = presigned["url"] + resources.append( + CrawlFileOut( + name=os.path.basename(file["filename"]), + path=presigned["url"], + hash=file["hash"], + size=file["size"], + crawlId=file["crawl_id"], + numReplicas=len(file.get("replicas") or []), + expireAt=date_to_str( + presigned["signedAt"] + + self.storage_ops.signed_duration_delta + ), + ) + ) + + del mapping[presigned["_id"]] + + sign_files.extend(list(mapping.values())) + + if sign_files: + names = [file["filename"] for file in sign_files] + + first_file = CrawlFile(**sign_files[0]) + s3storage = self.storage_ops.get_org_storage_by_ref(org, first_file.storage) + + signed_urls, expire_at = await self.storage_ops.get_presigned_urls_bulk( + org, s3storage, names ) - out_files.append( - CrawlFileOut( - name=os.path.basename(file_.filename), - path=presigned_url or "", - hash=file_.hash, - size=file_.size, - crawlId=crawl_id, - numReplicas=len(file_.replicas) if file_.replicas else 0, - expireAt=date_to_str(expire_at), + for url, file in zip(signed_urls, sign_files): + resources.append( + CrawlFileOut( + name=os.path.basename(file["filename"]), + path=url, + hash=file["hash"], + size=file["size"], + crawlId=file["crawl_id"], + numReplicas=len(file.get("replicas") or []), + expireAt=date_to_str(expire_at), + ) ) - ) - return out_files + return resources, pages_optimized async def add_to_collection( self, crawl_ids: List[str], collection_id: UUID, org: Organization diff --git a/backend/btrixcloud/colls.py b/backend/btrixcloud/colls.py index afc82ccabc..03b673b64f 100644 --- a/backend/btrixcloud/colls.py +++ b/backend/btrixcloud/colls.py @@ -28,7 +28,6 @@ UpdateColl, AddRemoveCrawlList, BaseCrawl, - CrawlFile, CrawlFileOut, Organization, PaginatedCollOutResponse, @@ -55,7 +54,6 @@ slug_from_name, get_duplicate_key_error_field, get_origin, - date_to_str, ) if TYPE_CHECKING: @@ -567,8 +565,6 @@ async def get_collection_crawl_resources( self, coll_id: Optional[UUID], org: Organization ) -> tuple[List[CrawlFileOut], List[str], bool]: """Return pre-signed resources for all collection crawl files.""" - resources = [] - pages_optimized = True match: dict[str, Any] if coll_id: @@ -578,82 +574,10 @@ async def get_collection_crawl_resources( crawl_ids = [] match = {"oid": org.id} - 
cursor = self.crawls.aggregate( - [ - {"$match": match}, - {"$project": {"files": "$files", "version": 1}}, - { - "$lookup": { - "from": "presigned_urls", - "localField": "files.filename", - "foreignField": "_id", - "as": "presigned", - } - }, - ] + resources, pages_optimized = await self.crawl_ops.get_presigned_files( + match, org ) - resources = [] - - sign_files = [] - - async for result in cursor: - mapping = {} - # create mapping of filename -> file data - for file in result["files"]: - file["crawl_id"] = result.get("_id") - mapping[file["filename"]] = file - - # add already presigned resources - for presigned in result["presigned"]: - file = mapping.get(presigned["_id"]) - if file: - file["signedAt"] = presigned["signedAt"] - file["path"] = presigned["url"] - resources.append( - CrawlFileOut( - name=os.path.basename(file["filename"]), - path=presigned["url"], - hash=file["hash"], - size=file["size"], - crawlId=file["crawl_id"], - numReplicas=len(file.get("replicas") or []), - expireAt=date_to_str( - presigned["signedAt"] - + self.storage_ops.signed_duration_delta - ), - ) - ) - - del mapping[presigned["_id"]] - - pages_optimized = result.get("version") == 2 - - sign_files.extend(list(mapping.values())) - - if sign_files: - names = [file["filename"] for file in sign_files] - - first_file = CrawlFile(**sign_files[0]) - s3storage = self.storage_ops.get_org_storage_by_ref(org, first_file.storage) - - signed_urls, expire_at = await self.storage_ops.get_presigned_urls_bulk( - org, s3storage, names - ) - - for url, file in zip(signed_urls, sign_files): - resources.append( - CrawlFileOut( - name=os.path.basename(file["filename"]), - path=url, - hash=file["hash"], - size=file["size"], - crawlId=file["crawl_id"], - numReplicas=len(file.get("replicas") or []), - expireAt=date_to_str(expire_at), - ) - ) - return resources, crawl_ids, pages_optimized async def get_collection_names(self, uuids: List[UUID]): From d064c530114a16355c3d5824c041e613c41bded6 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Mon, 24 Mar 2025 14:21:25 -0700 Subject: [PATCH 12/13] rename --- backend/btrixcloud/basecrawls.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/backend/btrixcloud/basecrawls.py b/backend/btrixcloud/basecrawls.py index a5e24f4be9..4c2fe7cb30 100644 --- a/backend/btrixcloud/basecrawls.py +++ b/backend/btrixcloud/basecrawls.py @@ -491,9 +491,7 @@ async def resolve_signed_urls( async def async_gen(): yield {"presigned": presigned, "files": files_dict, "_id": crawl_id} - out_files, _ = await self.process_presigned_files( - async_gen(), org, force_update - ) + out_files, _ = await self.bulk_presigned_files(async_gen(), org, force_update) return out_files @@ -516,9 +514,9 @@ async def get_presigned_files( ] ) - return await self.process_presigned_files(cursor, org) + return await self.bulk_presigned_files(cursor, org) - async def process_presigned_files( + async def bulk_presigned_files( self, cursor: AsyncIterable[dict[str, Any]], org: Organization, From 697c3863d3b7c8172f219c446b9a8bd1d41fd4a9 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Tue, 25 Mar 2025 00:22:02 -0700 Subject: [PATCH 13/13] tweaks --- backend/btrixcloud/basecrawls.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/btrixcloud/basecrawls.py b/backend/btrixcloud/basecrawls.py index 4c2fe7cb30..c23cb90bd3 100644 --- a/backend/btrixcloud/basecrawls.py +++ b/backend/btrixcloud/basecrawls.py @@ -475,7 +475,6 @@ async def resolve_signed_urls( ) -> List[CrawlFileOut]: """Regenerate 
presigned URLs for files as necessary""" if not files: - print("no files") return [] out_files = [] @@ -488,6 +487,7 @@ async def resolve_signed_urls( files_dict = [file.dict() for file in files] + # need an async generator to call bulk_presigned_files async def async_gen(): yield {"presigned": presigned, "files": files_dict, "_id": crawl_id}
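
Note on the overall approach: patches 01 and 11 replace the old per-file presign loop with a single MongoDB aggregation that joins each crawl's `files` array against the `presigned_urls` cache collection via `$lookup`, reuses any URL that is already cached, and collects only the remainder for signing. The following is a condensed, standalone sketch of that merge step, not the PR's code: the collection names and pipeline mirror the diffs, but the simplified document shapes, the `cached_and_unsigned()` helper, and the placeholder connection string are assumptions made for illustration.

    import asyncio
    import os

    from motor.motor_asyncio import AsyncIOMotorClient


    async def cached_and_unsigned(db, crawl_match: dict):
        """Split crawl files into (already presigned, still needing signing)."""
        cursor = db.crawls.aggregate([
            {"$match": crawl_match},
            {"$project": {"files": 1, "version": 1}},
            # one round trip fetches every cached presign for these crawls' files
            {"$lookup": {
                "from": "presigned_urls",
                "localField": "files.filename",
                "foreignField": "_id",
                "as": "presigned",
            }},
        ])

        reuse, to_sign = [], []
        async for crawl in cursor:
            by_name = {f["filename"]: f for f in crawl["files"]}
            for cached in crawl["presigned"]:
                file = by_name.pop(cached["_id"], None)
                if file:
                    reuse.append(
                        {**file, "path": cached["url"], "signedAt": cached["signedAt"]}
                    )
            # anything left in the mapping has no cached URL and must be signed
            to_sign.extend(by_name.values())
        return reuse, to_sign


    async def main():
        # hypothetical connection string; substitute the deployment's real URI
        uri = os.environ.get("MONGO_URI", "mongodb://localhost:27017")
        db = AsyncIOMotorClient(uri).btrix
        reuse, to_sign = await cached_and_unsigned(db, {"oid": "example-org-id"})
        print(f"{len(reuse)} cached, {len(to_sign)} need signing")


    if __name__ == "__main__":
        asyncio.run(main())
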
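Note on the signing side: patches 02, 03, 08, and 10 converge on presigning the unsigned remainder concurrently, but in fixed-size batches (`PRESIGN_BATCH_SIZE`, default 8) rather than all at once, and writing each result back to the `presigned_urls` cache. Below is a minimal sketch of that batching pattern only; `sign_one()` stands in for the S3 client's presign call and is an assumption, not the storage layer's actual API, and the cache write (a single unordered `insert_many` in the PR) is left as a comment.

    import asyncio
    import os
    from datetime import datetime, timezone


    async def sign_one(filename: str) -> str:
        """Placeholder for the real presign call (the S3 client's generate_presigned_url)."""
        await asyncio.sleep(0.01)  # simulate network latency
        return f"https://storage.example.com/{filename}?sig=stub"


    async def sign_in_batches(filenames: list[str], batch_size: int | None = None) -> list[dict]:
        """Presign files in fixed-size concurrent batches and return cache records."""
        batch_size = batch_size or int(os.environ.get("PRESIGN_BATCH_SIZE", 8))
        signed_at = datetime.now(timezone.utc)
        records = []
        for i in range(0, len(filenames), batch_size):
            batch = filenames[i : i + batch_size]
            urls = await asyncio.gather(*(sign_one(name) for name in batch))
            records.extend(
                {"_id": name, "url": url, "signedAt": signed_at}
                for name, url in zip(batch, urls)
            )
        # in the PR, these records are persisted with one unordered insert_many
        return records


    if __name__ == "__main__":
        out = asyncio.run(sign_in_batches([f"crawl-{n}.wacz" for n in range(20)]))
        print(len(out), "urls signed")
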