diff --git a/README.md b/README.md index d611373..4c15174 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ stac-updater new-service # Build AWS resources to update collection stac-updater update-collection --root https://stac.com/landsat-8-l1/catalog.json \ --path {landsat:path}/{landsat:row} \ - --row {date}/{id} + --filename {date}/{id} # Modify kickoff event source to s3:ObjectCreated stac-updater modify-kickoff --type s3 --bucket_name stac-updater-kickoff @@ -88,7 +88,7 @@ It took 86 total invocations to process the 200 STAC Items. ![es-logging-2](docs/images/es-logging-summary.png) ## Update Dynamic Catalog -STAC Items which are successfully ingested into a static collection may also by ingested into a deployed instance of [sat-api](https://github.com/sat-utils/sat-api). +STAC Items which are successfully ingested into a static collection may also by ingested into a deployed instance of [sat-api](https://github.com/sat-utils/sat-api)28. ``` stac-updater update-dynamic-collection --arn arn:aws:lambda:::function:sat-api-ingest @@ -96,7 +96,20 @@ stac-updater update-dynamic-collection --arn arn:aws:lambda:: ![update-sat-api](docs/images/update-sat-api.png) +## Thumbnails +You may backfill thumbnails as items are inserted into the STAC catalog using a deployed instance of [stac-thumbnails](https://github.com/geospatial-jeff/stac-thumbnails). +``` +stac-updater build-thumbnails +``` + +![update-sat-api](docs/images/build-thumbnails.png) + +This plugin is currently designed for a specific use-case and expects the following conditions: +- The underlying asset is referenced at `stac_item['assets']['data']['href']`. +- The underlying asset is a three band image (RGB). + +More generic support for thumbnails will be added at a later date if there is interest. # TODOS - Add support for [staccato](https://github.com/boundlessgeo/staccato). diff --git a/docs/images/build-thumbnails.png b/docs/images/build-thumbnails.png new file mode 100644 index 0000000..4cc6a6b Binary files /dev/null and b/docs/images/build-thumbnails.png differ diff --git a/setup.py b/setup.py index 49217e9..e93a2f8 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ requirements = [line.rstrip() for line in reqs] setup(name="STAC Updater", - version='0.3', + version='0.3.1', author='Jeff Albrecht', author_email='geospatialjeff@gmail.com', packages=find_packages(exclude=['package']), diff --git a/stac_updater/cli.py b/stac_updater/cli.py index f151fbc..db2162f 100644 --- a/stac_updater/cli.py +++ b/stac_updater/cli.py @@ -26,9 +26,12 @@ def new_service(): @click.option('--root', '-r', type=str, required=True, help="URL of collection.") @click.option('--long-poll/--short-poll', default=False, help="Enable long polling.") @click.option('--concurrency', type=int, default=1, help="Sets lambda concurrency limit when polling the queue.") +@click.option('--timeout', type=int, default=15, help="Sets lambda timeout.") +@click.option('--visibility_timeout', type=int, default=60, help="Sets visibility timeout of messages consumed by the queue.") @click.option('--path', type=str, help="Pattern used by sat-stac to build sub-catalogs.") @click.option('--filename', type=str, help="Pattern used by sat-stac to build item name.") -def update_collection(root, long_poll, concurrency, path, filename): +@click.option('--backfill_extent/--no_backfill', default=False, help="Enable backfilling of collection spatial/temporal extent.") +def update_collection(root, long_poll, concurrency, timeout, visibility_timeout, path, filename, backfill_extent): # Create a SQS queue for the collection # Subscribe SQS queue to SNS topic with filter policy on collection name # Configure lambda function and attach to SQS queue (use ENV variables to pass state) @@ -42,8 +45,8 @@ def update_collection(root, long_poll, concurrency, path, filename): with open(sls_config_path, 'r') as f: # Using unsafe load to preserve type. sls_config = yaml.unsafe_load(f) - - aws_resources = resources.update_collection(name, root, filter_rule, long_poll, concurrency, path, filename) + aws_resources = resources.update_collection(name, root, filter_rule, long_poll, concurrency, + timeout, visibility_timeout, path, filename, backfill_extent) sls_config['resources']['Resources'].update(aws_resources['resources']) sls_config['functions'].update(aws_resources['functions']) @@ -73,6 +76,7 @@ def update_dynamic_catalog(arn): 'satApiIngestSubscription': sns_subscription, 'satApiIngestPolicy': policy }) + sns_subscription.update({'DependsOn': 'stacUpdaterNotifications'}) with open(sls_config_path, 'w') as outf: yaml.dump(sls_config, outf, indent=1) @@ -150,6 +154,43 @@ def add_logging(es_host): with open(sls_config_path, 'w') as outf: yaml.dump(sls_config, outf, indent=1) +@stac_updater.command(name='build-thumbnails', short_help="Generate thumbnails when ingesting items.") +def build_thumbnails(): + # Deploy the stac-thumbnail service + # Subscribe notification SNS topic to stac-thumbnail SQS queue + queue_name = 'newThumbnailQueue' + + with open(sls_config_path, 'r') as f: + sls_config = yaml.unsafe_load(f) + + # Build notification topic if it doesn't already exist + if notification_topic_name not in sls_config['resources']['Resources']: + sls_config['resources']['Resources'].update({ + notification_topic_name: resources.sns_topic(notification_topic_name) + }) + sls_config['provider']['environment'].update({ + 'NOTIFICATION_TOPIC': notification_topic_name + }) + + # Create filter policies based on input collections + subscription, policy = resources.subscribe_sqs_to_sns(queue_name, notification_topic_name) + + # Use remote references instead of local (queue is defined in separate service). + policy['Properties']['PolicyDocument']['Statement'][0].update({ + 'Resource': "arn:aws:sqs:#{AWS::Region}:#{AWS::AccountId}:" + queue_name + }) + policy['Properties']['Queues'][0] = 'https://sqs.#{AWS::Region}.amazonaws.com/#{AWS::AccountId}/' + queue_name + subscription['Properties'].update({'Endpoint': "arn:aws:sqs:#{AWS::Region}:#{AWS::AccountId}:" + queue_name}) + subscription.update({'DependsOn': 'stacUpdaterNotifications'}) + + sls_config['resources']['Resources'].update({ + 'thumbnailSnsSub': subscription, + 'thumbnailSqsPolicy': policy, + }) + + with open(sls_config_path, 'w') as outf: + yaml.dump(sls_config, outf, indent=1) + @stac_updater.command(name='deploy', short_help="deploy service to aws.") def deploy(): subprocess.call("docker build . -t stac-updater:latest", shell=True) diff --git a/stac_updater/handler.py b/stac_updater/handler.py index 5b20cf4..a80cf6c 100644 --- a/stac_updater/handler.py +++ b/stac_updater/handler.py @@ -2,6 +2,7 @@ import json import base64 import gzip +from datetime import datetime import boto3 from satstac import Collection, Item @@ -50,41 +51,69 @@ def kickoff(event, context): ) def update_collection(event, context): - collection_root = os.getenv('COLLECTION_ROOT') - path = os.getenv('PATH') - filename = os.getenv('FILENAME') - - item_count = len(event['Records']) - stac_links = [] - - for record in event['Records']: - stac_item = json.loads(record['body']) - - print(stac_item) - - col = Collection.open(collection_root) - collection_name = col.id - kwargs = {'item': Item(stac_item)} - if path: - kwargs.update({'path': '$' + '/$'.join(path.split('/'))}) - if filename: - kwargs.update({'filename': '$' + '/$'.join(filename.split('/'))}) - print(kwargs) - col.add_item(**kwargs) - col.save() - - stac_links.append(kwargs['item'].links('self')[0]) - - # Send message to SNS Topic if enabled - if NOTIFICATION_TOPIC: - kwargs = utils.stac_to_sns(kwargs['item'].data) - kwargs.update({ - 'TopicArn': f"arn:aws:sns:{REGION}:{ACCOUNT_ID}:{NOTIFICATION_TOPIC}" - }) - sns_client.publish(**kwargs) - - - print(f"LOGS CollectionName: {collection_name}\tItemCount: {item_count}\tItemLinks: {stac_links}") + try: + collection_root = os.getenv('COLLECTION_ROOT') + path = os.getenv('PATH') + filename = os.getenv('FILENAME') + backfill_extent = os.getenv('BACKFILL_EXTENT') + + item_count = len(event['Records']) + stac_links = [] + + for record in event['Records']: + stac_item = json.loads(record['body']) + print(stac_item) + + col = Collection.open(collection_root) + collection_name = col.id + kwargs = {'item': Item(stac_item)} + if path: + kwargs.update({'path': '$' + '/$'.join(path.split('/'))}) + if filename: + kwargs.update({'filename': '$' + '/$'.join(filename.split('/'))}) + + # Update spatial and temporal extent of collection + if backfill_extent: + if 'spatial' in col.data['extent']: + if stac_item['bbox'][0] < col.data['extent']['spatial'][0]: + col.data['extent']['spatial'][0] = stac_item['bbox'][0] + if stac_item['bbox'][1] < col.data['extent']['spatial'][1]: + col.data['extent']['spatial'][1] = stac_item['bbox'][1] + if stac_item['bbox'][2] > col.data['extent']['spatial'][2]: + col.data['extent']['spatial'][2] = stac_item['bbox'][2] + if stac_item['bbox'][3] > col.data['extent']['spatial'][3]: + col.data['extent']['spatial'][3] = stac_item['bbox'][3] + else: + col.data['extent'].update({'spatial': stac_item['bbox']}) + + if 'temporal' in col.data['extent']: + item_dt = utils.load_datetime(stac_item['properties']['datetime']) + min_dt = utils.load_datetime(col.data['extent']['temporal'][0]) + max_dt = utils.load_datetime(col.data['extent']['temporal'][1]) + if item_dt < min_dt: + col.data['extent']['temporal'][0] = stac_item['properties']['datetime'] + if item_dt > max_dt: + col.data['extent']['temporal'][1] = stac_item['properties']['datetime'] + else: + col.data['extent'].update({'temporal': [stac_item['properties']['datetime'], stac_item['properties']['datetime']]}) + + col.add_item(**kwargs) + col.save() + + stac_links.append(kwargs['item'].links('self')[0]) + + # Send message to SNS Topic if enabled + if NOTIFICATION_TOPIC: + kwargs = utils.stac_to_sns(kwargs['item'].data) + kwargs.update({ + 'TopicArn': f"arn:aws:sns:{REGION}:{ACCOUNT_ID}:{NOTIFICATION_TOPIC}" + }) + sns_client.publish(**kwargs) + + + print(f"LOGS CollectionName: {collection_name}\tItemCount: {item_count}\tItemLinks: {stac_links}") + except: + raise def es_log_ingest(event, context): diff --git a/stac_updater/resources.py b/stac_updater/resources.py index 756eb7e..895e92c 100644 --- a/stac_updater/resources.py +++ b/stac_updater/resources.py @@ -67,7 +67,6 @@ def subscribe_lambda_to_sns(lambda_arn, topic_name): "FunctionName": lambda_arn, "Principal": "sns.amazonaws.com", "SourceArn": {"Ref": topic_name}, - } } @@ -85,7 +84,7 @@ def subscribe_lambda_to_sns(lambda_arn, topic_name): return subscription, lambda_policy -def sqs_queue(queue_name, dlq_name=None, maxRetry=3, long_poll=False): +def sqs_queue(queue_name, dlq_name=None, maxRetry=3, long_poll=False, timeout=None): resource = { "Type": "AWS::SQS::Queue", "Properties": { @@ -108,6 +107,9 @@ def sqs_queue(queue_name, dlq_name=None, maxRetry=3, long_poll=False): if long_poll: resource['Properties'].update({'ReceiveMessageWaitTimeSeconds': 20}) + if timeout: + resource['Properties'].update({'VisibilityTimeout': timeout}) + return resource def sns_topic(topic_name): @@ -119,24 +121,19 @@ def sns_topic(topic_name): } return resource -def lambda_sqs_trigger(func_name, queue_name, catalog_root, concurrency): +def lambda_sqs_trigger(func_name, queue_name): func = { "handler": f"stac_updater.handler.{func_name}", - "environment": { - 'COLLECTION_ROOT': catalog_root - }, "events": [ { "sqs": { "arn": "arn:aws:sqs:#{}:#{}:{}".format("{AWS::Region}", "{AWS::AccountId}", - queue_name), + queue_name) } } - ], - "reservedConcurrency": concurrency + ] } - return func def lambda_s3_trigger(func_name, bucket_name): @@ -195,17 +192,25 @@ def lambda_invoke(func_name): } return func -def update_collection(name, root, filter_rule, long_poll, concurrency, path, filename): - dlq_name = f"{name}Dlq" - queue_name = f"{name}Queue" - sns_sub_name = f"{name}SnsSub" - sqs_policy_name = f"{name}SqsPolicy" +def update_collection(name, root, filter_rule, long_poll, concurrency, timeout, vis_timeout, path, filename, backfill_extent): + dlq_name = f"{name}Dlq"[:43] + queue_name = f"{name}Queue"[:43] + sns_sub_name = f"{name}SnsSub"[:43] + sqs_policy_name = f"{name}SqsPolicy"[:43] lambda_name = "update_collection" dlq = sqs_queue(dlq_name) - queue = sqs_queue(queue_name, dlq_name=dlq_name, maxRetry=3, long_poll=long_poll) + queue = sqs_queue(queue_name, dlq_name=dlq_name, maxRetry=3, long_poll=long_poll, timeout=vis_timeout) sns_subscription, sqs_policy = subscribe_sqs_to_sns(queue_name, 'newStacItemTopic', filter_rule) - lambda_updater = lambda_sqs_trigger(lambda_name, queue_name, root, concurrency) + lambda_updater = lambda_sqs_trigger(lambda_name, queue_name) + + lambda_updater.update({ + 'environment': { + 'COLLECTION_ROOT': root + }, + "reservedConcurrency": concurrency, + "timeout": timeout + }) if path: lambda_updater['environment'].update({ @@ -217,6 +222,11 @@ def update_collection(name, root, filter_rule, long_poll, concurrency, path, fil 'FILENAME': filename }) + if backfill_extent: + lambda_updater['environment'].update({ + 'BACKFILL_EXTENT': backfill_extent + }) + return { 'resources': { dlq_name: dlq, @@ -225,6 +235,6 @@ def update_collection(name, root, filter_rule, long_poll, concurrency, path, fil sqs_policy_name: sqs_policy }, 'functions': { - f"{name}_{lambda_name}": lambda_updater + f"{name}_{lambda_name}"[:45]: lambda_updater } - } + } \ No newline at end of file diff --git a/stac_updater/utils.py b/stac_updater/utils.py index 298c270..69817e3 100644 --- a/stac_updater/utils.py +++ b/stac_updater/utils.py @@ -1,3 +1,4 @@ +from datetime import datetime import json def stac_to_sns(stac_item): @@ -36,3 +37,8 @@ def stac_to_sns(stac_item): "MessageAttributes": attributes } +def load_datetime(date_str): + try: + return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S.%fZ") + except: + return datetime.strptime(date_str, "%Y-%m-%d") \ No newline at end of file