Skip to content

v0.3.1 #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ stac-updater new-service
# Build AWS resources to update collection
stac-updater update-collection --root https://stac.com/landsat-8-l1/catalog.json \
--path {landsat:path}/{landsat:row} \
--row {date}/{id}
--filename {date}/{id}

# Modify kickoff event source to s3:ObjectCreated
stac-updater modify-kickoff --type s3 --bucket_name stac-updater-kickoff
Expand Down Expand Up @@ -88,15 +88,28 @@ It took 86 total invocations to process the 200 STAC Items.
![es-logging-2](docs/images/es-logging-summary.png)

## Update Dynamic Catalog
STAC Items which are successfully ingested into a static collection may also by ingested into a deployed instance of [sat-api](https://github.com/sat-utils/sat-api).
STAC Items which are successfully ingested into a static collection may also be ingested into a deployed instance of [sat-api](https://github.com/sat-utils/sat-api).

```
stac-updater update-dynamic-collection --arn arn:aws:lambda:<region>:<accountid>:function:sat-api-ingest
```

![update-sat-api](docs/images/update-sat-api.png)

## Thumbnails
You may backfill thumbnails as items are inserted into the STAC catalog using a deployed instance of [stac-thumbnails](https://github.com/geospatial-jeff/stac-thumbnails).

```
stac-updater build-thumbnails
```

![update-sat-api](docs/images/build-thumbnails.png)

This plugin is currently designed for a specific use-case and expects the following conditions:
- The underlying asset is referenced at `stac_item['assets']['data']['href']`.
- The underlying asset is a three band image (RGB).

More generic support for thumbnails will be added at a later date if there is interest.

# TODOS
- Add support for [staccato](https://github.com/boundlessgeo/staccato).
Binary file added docs/images/build-thumbnails.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
requirements = [line.rstrip() for line in reqs]

setup(name="STAC Updater",
version='0.3',
version='0.3.1',
author='Jeff Albrecht',
author_email='[email protected]',
packages=find_packages(exclude=['package']),
Expand Down
47 changes: 44 additions & 3 deletions stac_updater/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ def new_service():
@click.option('--root', '-r', type=str, required=True, help="URL of collection.")
@click.option('--long-poll/--short-poll', default=False, help="Enable long polling.")
@click.option('--concurrency', type=int, default=1, help="Sets lambda concurrency limit when polling the queue.")
@click.option('--timeout', type=int, default=15, help="Sets lambda timeout.")
@click.option('--visibility_timeout', type=int, default=60, help="Sets visibility timeout of messages consumed by the queue.")
@click.option('--path', type=str, help="Pattern used by sat-stac to build sub-catalogs.")
@click.option('--filename', type=str, help="Pattern used by sat-stac to build item name.")
def update_collection(root, long_poll, concurrency, path, filename):
@click.option('--backfill_extent/--no_backfill', default=False, help="Enable backfilling of collection spatial/temporal extent.")
def update_collection(root, long_poll, concurrency, timeout, visibility_timeout, path, filename, backfill_extent):
# Create a SQS queue for the collection
# Subscribe SQS queue to SNS topic with filter policy on collection name
# Configure lambda function and attach to SQS queue (use ENV variables to pass state)
Expand All @@ -42,8 +45,8 @@ def update_collection(root, long_poll, concurrency, path, filename):
with open(sls_config_path, 'r') as f:
# Using unsafe load to preserve type.
sls_config = yaml.unsafe_load(f)

aws_resources = resources.update_collection(name, root, filter_rule, long_poll, concurrency, path, filename)
aws_resources = resources.update_collection(name, root, filter_rule, long_poll, concurrency,
timeout, visibility_timeout, path, filename, backfill_extent)
sls_config['resources']['Resources'].update(aws_resources['resources'])
sls_config['functions'].update(aws_resources['functions'])

Expand Down Expand Up @@ -73,6 +76,7 @@ def update_dynamic_catalog(arn):
'satApiIngestSubscription': sns_subscription,
'satApiIngestPolicy': policy
})
sns_subscription.update({'DependsOn': 'stacUpdaterNotifications'})

with open(sls_config_path, 'w') as outf:
yaml.dump(sls_config, outf, indent=1)
Expand Down Expand Up @@ -150,6 +154,43 @@ def add_logging(es_host):
with open(sls_config_path, 'w') as outf:
yaml.dump(sls_config, outf, indent=1)

@stac_updater.command(name='build-thumbnails', short_help="Generate thumbnails when ingesting items.")
def build_thumbnails():
    """Subscribe the stac-thumbnails service's SQS queue to the notification topic.

    Loads the serverless config, ensures the notification SNS topic is
    declared, attaches a cross-service subscription and queue policy for the
    thumbnail queue, then writes the config back out.
    """
    thumbnail_queue = 'newThumbnailQueue'
    thumbnail_queue_arn = "arn:aws:sqs:#{AWS::Region}:#{AWS::AccountId}:" + thumbnail_queue

    with open(sls_config_path, 'r') as f:
        # Unsafe load preserves the custom YAML types used by serverless.
        sls_config = yaml.unsafe_load(f)

    resource_defs = sls_config['resources']['Resources']

    # Declare the notification topic on first use and expose it to the
    # handlers via the provider environment.
    if notification_topic_name not in resource_defs:
        resource_defs[notification_topic_name] = resources.sns_topic(notification_topic_name)
        sls_config['provider']['environment']['NOTIFICATION_TOPIC'] = notification_topic_name

    subscription, policy = resources.subscribe_sqs_to_sns(thumbnail_queue, notification_topic_name)

    # The thumbnail queue lives in a separate service, so swap the local
    # CloudFormation references for remote pseudo-parameter references.
    policy['Properties']['PolicyDocument']['Statement'][0].update({
        'Resource': thumbnail_queue_arn
    })
    policy['Properties']['Queues'][0] = 'https://sqs.#{AWS::Region}.amazonaws.com/#{AWS::AccountId}/' + thumbnail_queue
    subscription['Properties'].update({'Endpoint': thumbnail_queue_arn})
    subscription.update({'DependsOn': 'stacUpdaterNotifications'})

    resource_defs.update({
        'thumbnailSnsSub': subscription,
        'thumbnailSqsPolicy': policy,
    })

    with open(sls_config_path, 'w') as outf:
        yaml.dump(sls_config, outf, indent=1)

@stac_updater.command(name='deploy', short_help="deploy service to aws.")
def deploy():
subprocess.call("docker build . -t stac-updater:latest", shell=True)
Expand Down
99 changes: 64 additions & 35 deletions stac_updater/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import base64
import gzip
from datetime import datetime

import boto3
from satstac import Collection, Item
Expand Down Expand Up @@ -50,41 +51,69 @@ def kickoff(event, context):
)

def update_collection(event, context):
    """Ingest queued STAC Items into the static collection.

    SQS-triggered Lambda handler: each record body is a STAC Item (JSON).
    Every item is added to the collection rooted at ``COLLECTION_ROOT``,
    optionally growing the collection's spatial/temporal extent, and is then
    republished on the notification SNS topic when one is configured.

    Args:
        event: SQS event; ``event['Records'][i]['body']`` is a STAC Item JSON string.
        context: Lambda context (unused).
    """
    collection_root = os.getenv('COLLECTION_ROOT')
    # NOTE(review): 'PATH' collides with the executable search path that the
    # Lambda runtime also sets — confirm the deploy really overrides it with
    # the sat-stac sub-catalog pattern.
    path = os.getenv('PATH')
    filename = os.getenv('FILENAME')
    backfill_extent = os.getenv('BACKFILL_EXTENT')

    item_count = len(event['Records'])
    stac_links = []

    for record in event['Records']:
        stac_item = json.loads(record['body'])
        print(stac_item)

        col = Collection.open(collection_root)
        collection_name = col.id
        kwargs = {'item': Item(stac_item)}
        if path:
            # Convert 'a/b' into sat-stac's '$a/$b' substitution pattern.
            kwargs.update({'path': '$' + '/$'.join(path.split('/'))})
        if filename:
            kwargs.update({'filename': '$' + '/$'.join(filename.split('/'))})

        # Optionally grow the collection extent so it encloses the new item.
        if backfill_extent:
            _backfill_extent(col, stac_item)

        col.add_item(**kwargs)
        col.save()

        stac_links.append(kwargs['item'].links('self')[0])

        # Fan the item out to the notification topic if one is configured.
        if NOTIFICATION_TOPIC:
            kwargs = utils.stac_to_sns(kwargs['item'].data)
            kwargs.update({
                'TopicArn': f"arn:aws:sns:{REGION}:{ACCOUNT_ID}:{NOTIFICATION_TOPIC}"
            })
            sns_client.publish(**kwargs)

    print(f"LOGS CollectionName: {collection_name}\tItemCount: {item_count}\tItemLinks: {stac_links}")


def _backfill_extent(col, stac_item):
    """Expand ``col``'s extent in place to include ``stac_item``'s bbox and datetime."""
    # STAC bbox order is [west, south, east, north]; grow the extent outward.
    if 'spatial' in col.data['extent']:
        if stac_item['bbox'][0] < col.data['extent']['spatial'][0]:
            col.data['extent']['spatial'][0] = stac_item['bbox'][0]
        if stac_item['bbox'][1] < col.data['extent']['spatial'][1]:
            col.data['extent']['spatial'][1] = stac_item['bbox'][1]
        if stac_item['bbox'][2] > col.data['extent']['spatial'][2]:
            col.data['extent']['spatial'][2] = stac_item['bbox'][2]
        if stac_item['bbox'][3] > col.data['extent']['spatial'][3]:
            col.data['extent']['spatial'][3] = stac_item['bbox'][3]
    else:
        col.data['extent'].update({'spatial': stac_item['bbox']})

    # Temporal extent is stored as a [start, end] pair of datetime strings.
    if 'temporal' in col.data['extent']:
        item_dt = utils.load_datetime(stac_item['properties']['datetime'])
        min_dt = utils.load_datetime(col.data['extent']['temporal'][0])
        max_dt = utils.load_datetime(col.data['extent']['temporal'][1])
        if item_dt < min_dt:
            col.data['extent']['temporal'][0] = stac_item['properties']['datetime']
        if item_dt > max_dt:
            col.data['extent']['temporal'][1] = stac_item['properties']['datetime']
    else:
        col.data['extent'].update({'temporal': [stac_item['properties']['datetime'], stac_item['properties']['datetime']]})


def es_log_ingest(event, context):
Expand Down
48 changes: 29 additions & 19 deletions stac_updater/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ def subscribe_lambda_to_sns(lambda_arn, topic_name):
"FunctionName": lambda_arn,
"Principal": "sns.amazonaws.com",
"SourceArn": {"Ref": topic_name},

}
}

Expand All @@ -85,7 +84,7 @@ def subscribe_lambda_to_sns(lambda_arn, topic_name):
return subscription, lambda_policy


def sqs_queue(queue_name, dlq_name=None, maxRetry=3, long_poll=False):
def sqs_queue(queue_name, dlq_name=None, maxRetry=3, long_poll=False, timeout=None):
resource = {
"Type": "AWS::SQS::Queue",
"Properties": {
Expand All @@ -108,6 +107,9 @@ def sqs_queue(queue_name, dlq_name=None, maxRetry=3, long_poll=False):
if long_poll:
resource['Properties'].update({'ReceiveMessageWaitTimeSeconds': 20})

if timeout:
resource['Properties'].update({'VisibilityTimeout': timeout})

return resource

def sns_topic(topic_name):
Expand All @@ -119,24 +121,19 @@ def sns_topic(topic_name):
}
return resource

def lambda_sqs_trigger(func_name, queue_name):
    """Build a serverless-framework function definition triggered by an SQS queue.

    Args:
        func_name: Name of the handler function inside ``stac_updater.handler``.
        queue_name: Logical name of the SQS queue that triggers the lambda.

    Returns:
        dict: Function definition for the ``functions`` section of
        serverless.yml. Callers add ``environment``, ``timeout``, and
        ``reservedConcurrency`` themselves.
    """
    # '#{AWS::Region}' / '#{AWS::AccountId}' are pseudo-parameter references
    # resolved at deploy time; .format is used so the literal braces survive.
    func = {
        "handler": f"stac_updater.handler.{func_name}",
        "events": [
            {
                "sqs": {
                    "arn": "arn:aws:sqs:#{}:#{}:{}".format("{AWS::Region}",
                                                           "{AWS::AccountId}",
                                                           queue_name)
                }
            }
        ]
    }

    return func

def lambda_s3_trigger(func_name, bucket_name):
Expand Down Expand Up @@ -195,17 +192,25 @@ def lambda_invoke(func_name):
}
return func

def update_collection(name, root, filter_rule, long_poll, concurrency, path, filename):
dlq_name = f"{name}Dlq"
queue_name = f"{name}Queue"
sns_sub_name = f"{name}SnsSub"
sqs_policy_name = f"{name}SqsPolicy"
def update_collection(name, root, filter_rule, long_poll, concurrency, timeout, vis_timeout, path, filename, backfill_extent):
dlq_name = f"{name}Dlq"[:43]
queue_name = f"{name}Queue"[:43]
sns_sub_name = f"{name}SnsSub"[:43]
sqs_policy_name = f"{name}SqsPolicy"[:43]
lambda_name = "update_collection"

dlq = sqs_queue(dlq_name)
queue = sqs_queue(queue_name, dlq_name=dlq_name, maxRetry=3, long_poll=long_poll)
queue = sqs_queue(queue_name, dlq_name=dlq_name, maxRetry=3, long_poll=long_poll, timeout=vis_timeout)
sns_subscription, sqs_policy = subscribe_sqs_to_sns(queue_name, 'newStacItemTopic', filter_rule)
lambda_updater = lambda_sqs_trigger(lambda_name, queue_name, root, concurrency)
lambda_updater = lambda_sqs_trigger(lambda_name, queue_name)

lambda_updater.update({
'environment': {
'COLLECTION_ROOT': root
},
"reservedConcurrency": concurrency,
"timeout": timeout
})

if path:
lambda_updater['environment'].update({
Expand All @@ -217,6 +222,11 @@ def update_collection(name, root, filter_rule, long_poll, concurrency, path, fil
'FILENAME': filename
})

if backfill_extent:
lambda_updater['environment'].update({
'BACKFILL_EXTENT': backfill_extent
})

return {
'resources': {
dlq_name: dlq,
Expand All @@ -225,6 +235,6 @@ def update_collection(name, root, filter_rule, long_poll, concurrency, path, fil
sqs_policy_name: sqs_policy
},
'functions': {
f"{name}_{lambda_name}": lambda_updater
f"{name}_{lambda_name}"[:45]: lambda_updater
}
}
}
6 changes: 6 additions & 0 deletions stac_updater/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
import json

def stac_to_sns(stac_item):
Expand Down Expand Up @@ -36,3 +37,8 @@ def stac_to_sns(stac_item):
"MessageAttributes": attributes
}

def load_datetime(date_str):
    """Parse a STAC datetime string into a naive datetime.

    Accepts ISO-8601 UTC timestamps with fractional seconds
    ('2019-01-01T12:00:00.000Z'), without fractional seconds
    ('2019-01-01T12:00:00Z'), or a bare date ('2019-01-01').

    Args:
        date_str: Datetime string from a STAC Item or Collection extent.

    Returns:
        datetime.datetime parsed from ``date_str``.

    Raises:
        ValueError: If ``date_str`` matches none of the known formats.
    """
    # Catch only ValueError — a bare except would also swallow
    # KeyboardInterrupt/SystemExit and mask genuine bugs.
    for fmt in ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ"):
        try:
            return datetime.strptime(date_str, fmt)
        except ValueError:
            continue
    # Final fallback: bare date; let ValueError propagate for unknown formats.
    return datetime.strptime(date_str, "%Y-%m-%d")