Skip to content

Commit d866bfb

Browse files
Add replay-ability for transaction collection process
Previously, if a transaction history collection job failed to process for whatever reason (such as a service outage), it was incredibly difficult to run the processing for that specific time period again. This change leverages the fact that event bridge sends in the timestamp of the time in which the scheduled task was triggered. By going off of this time, rather than generating the timestamp in the lambda, developers can simply rerun the failed step function execution, and it will process all the transactions for that same time period. Due to the idempotent nature of the transaction pk/sk design, we will never see duplicate transaction records for the same transaction ids.
1 parent 18574e1 commit d866bfb

File tree

4 files changed

+82
-9
lines changed

4 files changed

+82
-9
lines changed

backend/compact-connect/lambdas/python/purchases/handlers/transaction_history.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import json
2-
from datetime import timedelta
2+
from datetime import datetime, timedelta
33

44
from aws_lambda_powertools.utilities.typing import LambdaContext
55
from cc_common.config import config, logger
@@ -25,6 +25,7 @@ def process_settled_transactions(event: dict, context: LambdaContext) -> dict:
2525
2626
:param event: Lambda event containing:
2727
- compact: The compact name
28+
- scheduledTime: The scheduled execution time from EventBridge for replay-ability
2829
- lastProcessedTransactionId: Optional last processed transaction ID
2930
- currentBatchId: Optional current batch ID being processed
3031
- processedBatchIds: Optional list of batch IDs that have been processed, this ensures we don't process the same
@@ -33,14 +34,16 @@ def process_settled_transactions(event: dict, context: LambdaContext) -> dict:
3334
:return: Dictionary indicating processing status and optional pagination info
3435
"""
3536
compact = event['compact']
37+
scheduled_time = event['scheduledTime']
3638
last_processed_transaction_id = event.get('lastProcessedTransactionId')
3739
current_batch_id = event.get('currentBatchId')
3840
processed_batch_ids = event.get('processedBatchIds', [])
3941

42+
# Use the scheduled time from EventBridge for replay-ability
4043
# By default, the authorize.net accounts batch settlements at 4:00pm Pacific Time.
4144
# This daily collector runs an hour later (5pm PST, which is 1am UTC) to collect
4245
# all settled transaction for the last 24 hours.
43-
end_time = config.current_standard_datetime.replace(hour=1, minute=0, second=0, microsecond=0)
46+
end_time = datetime.fromisoformat(scheduled_time).replace(hour=1, minute=0, second=0, microsecond=0)
4447
start_time = end_time - timedelta(days=1)
4548

4649
# Format timestamps for API call
@@ -81,6 +84,7 @@ def process_settled_transactions(event: dict, context: LambdaContext) -> dict:
8184
# Return appropriate response based on whether there are more transactions to process
8285
response = {
8386
'compact': compact, # Always include the compact name
87+
'scheduledTime': scheduled_time, # Preserve scheduled time for subsequent iterations
8488
'status': 'IN_PROGRESS' if not _all_transactions_processed(transaction_response) else 'COMPLETE',
8589
'processedBatchIds': transaction_response['processedBatchIds'],
8690
}

backend/compact-connect/lambdas/python/purchases/tests/function/test_handlers/test_transaction_history.py

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@
88

99
TEST_COMPACT = 'aslp'
1010
TEST_AUD_LICENSE_TYPE_ABBR = 'aud'
11-
MOCK_START_TIME = '2024-01-01T00:00:00Z'
12-
MOCK_END_TIME = '2024-01-02T00:00:00Z'
13-
MOCK_TRANSACTION_LIMIT = 500
1411

1512
# Test transaction data
1613
MOCK_TRANSACTION_ID = '12345'
@@ -28,6 +25,8 @@
2825
MOCK_CURRENT_BATCH_ID = 'mock_current_batch_id'
2926
MOCK_PROCESSED_BATCH_IDS = ['mock_processed_batch_id']
3027

28+
MOCK_SCHEDULED_TIME = '2024-01-01T01:00:00Z'
29+
3130

3231
def _generate_mock_transaction(
3332
transaction_id=MOCK_TRANSACTION_ID,
@@ -139,6 +138,7 @@ def _add_mock_privilege_update_to_database(
139138
def _when_testing_non_paginated_event(self, test_compact=TEST_COMPACT):
140139
return {
141140
'compact': test_compact,
141+
'scheduledTime': MOCK_SCHEDULED_TIME, # Mock EventBridge scheduled time
142142
'lastProcessedTransactionId': None,
143143
'currentBatchId': None,
144144
'processedBatchIds': None,
@@ -147,6 +147,7 @@ def _when_testing_non_paginated_event(self, test_compact=TEST_COMPACT):
147147
def _when_testing_paginated_event(self, test_compact=TEST_COMPACT):
148148
return {
149149
'compact': test_compact,
150+
'scheduledTime': MOCK_SCHEDULED_TIME, # Mock EventBridge scheduled time
150151
'lastProcessedTransactionId': MOCK_LAST_PROCESSED_TRANSACTION_ID,
151152
'currentBatchId': MOCK_CURRENT_BATCH_ID,
152153
'processedBatchIds': MOCK_PROCESSED_BATCH_IDS,
@@ -196,7 +197,15 @@ def test_process_settled_transactions_returns_complete_status(self, mock_purchas
196197
event = self._when_testing_non_paginated_event()
197198
resp = process_settled_transactions(event, self.mock_context)
198199

199-
self.assertEqual({'compact': 'aslp', 'processedBatchIds': [MOCK_BATCH_ID], 'status': 'COMPLETE'}, resp)
200+
self.assertEqual(
201+
{
202+
'compact': 'aslp',
203+
'scheduledTime': MOCK_SCHEDULED_TIME,
204+
'processedBatchIds': [MOCK_BATCH_ID],
205+
'status': 'COMPLETE',
206+
},
207+
resp,
208+
)
200209

201210
@patch('handlers.transaction_history.PurchaseClient')
202211
def test_process_settled_transactions_passes_pagination_values_into_purchase_client(
@@ -287,6 +296,42 @@ def test_process_settled_transactions_stores_transactions_in_dynamodb(self, mock
287296
stored_transactions['Items'],
288297
)
289298

299+
@patch('handlers.transaction_history.PurchaseClient')
300+
def test_process_settled_transactions_does_not_duplicate_identical_transaction_records(
301+
self, mock_purchase_client_constructor
302+
):
303+
"""Test that transactions are stored in DynamoDB."""
304+
from handlers.transaction_history import process_settled_transactions
305+
306+
# in this test, there is one transaction, and one privilege. These should map together using the default
307+
# transaction id and privilege id
308+
self._when_purchase_client_returns_transactions(mock_purchase_client_constructor)
309+
self._add_mock_privilege_to_database()
310+
311+
event = self._when_testing_non_paginated_event()
312+
313+
process_settled_transactions(event, self.mock_context)
314+
315+
# Verify transactions were stored in DynamoDB
316+
stored_transactions = self.config.transaction_history_table.query(
317+
KeyConditionExpression='pk = :pk',
318+
ExpressionAttributeValues={':pk': f'COMPACT#{TEST_COMPACT}#TRANSACTIONS#MONTH#2024-01'},
319+
)
320+
321+
self.assertEqual(1, len(stored_transactions['Items']))
322+
323+
# now run the lambda again with the same payload, the duplicate transaction record should overwrite the
324+
# existing one
325+
process_settled_transactions(event, self.mock_context)
326+
327+
# Verify transactions were stored in DynamoDB
328+
stored_transactions = self.config.transaction_history_table.query(
329+
KeyConditionExpression='pk = :pk',
330+
ExpressionAttributeValues={':pk': f'COMPACT#{TEST_COMPACT}#TRANSACTIONS#MONTH#2024-01'},
331+
)
332+
333+
self.assertEqual(1, len(stored_transactions['Items']))
334+
290335
@patch('handlers.transaction_history.PurchaseClient')
291336
def test_process_settled_transactions_returns_in_progress_status_with_pagination_values(
292337
self, mock_purchase_client_constructor
@@ -302,6 +347,7 @@ def test_process_settled_transactions_returns_in_progress_status_with_pagination
302347
self.assertEqual(
303348
{
304349
'compact': TEST_COMPACT,
350+
'scheduledTime': MOCK_SCHEDULED_TIME,
305351
'status': 'IN_PROGRESS',
306352
'lastProcessedTransactionId': MOCK_LAST_PROCESSED_TRANSACTION_ID,
307353
'currentBatchId': MOCK_CURRENT_BATCH_ID,
@@ -369,6 +415,7 @@ def test_process_settled_transactions_returns_batch_failure_status_after_process
369415
{
370416
'status': 'IN_PROGRESS',
371417
'compact': TEST_COMPACT,
418+
'scheduledTime': MOCK_SCHEDULED_TIME,
372419
'currentBatchId': MOCK_BATCH_ID,
373420
'lastProcessedTransactionId': mock_first_iteration_failed_transaction_id,
374421
'processedBatchIds': [],
@@ -391,6 +438,7 @@ def test_process_settled_transactions_returns_batch_failure_status_after_process
391438
{
392439
'status': 'BATCH_FAILURE',
393440
'compact': TEST_COMPACT,
441+
'scheduledTime': MOCK_SCHEDULED_TIME,
394442
'processedBatchIds': [MOCK_BATCH_ID],
395443
'batchFailureErrorMessage': json.dumps(
396444
{

backend/compact-connect/stacks/transaction_monitoring_stack/transaction_history_processing_workflow.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
LogLevel,
1919
LogOptions,
2020
Pass,
21-
Result,
2221
StateMachine,
2322
Succeed,
2423
TaskInput,
@@ -98,11 +97,16 @@ def __init__(
9897
)
9998

10099
# Create Step Function definition
101-
# set initial values for the process task to use
100+
# Transform EventBridge input to extract time and create payload for replay-ability
102101
self.initialize_state = Pass(
103102
self,
104103
f'{compact}-InitializeState',
105-
result=Result.from_object({'Payload': {'compact': compact, 'processedBatchIds': []}}),
104+
parameters={
105+
'compact': compact,
106+
'scheduledTime.$': '$.time', # Extract time from EventBridge event
107+
'processedBatchIds': [],
108+
},
109+
result_path='$.Payload',
106110
)
107111

108112
self.processor_task = LambdaInvoke(

backend/compact-connect/tests/app/test_transaction_monitoring.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,23 @@ def test_workflow_generates_expected_process_transaction_history_lambda_invoke_s
150150
),
151151
)
152152

153+
def test_workflow_generates_expected_initialize_step(self):
154+
aslp_transaction_history_proccessing_workflow = (
155+
self.app.prod_backend_pipeline_stack.prod_stage.transaction_monitoring_stack.compact_state_machines['aslp']
156+
)
157+
158+
self.assertEqual(
159+
{
160+
'Next': 'aslp-ProcessTransactionHistory',
161+
'Parameters': {'compact': 'aslp', 'processedBatchIds': [], 'scheduledTime.$': '$.time'},
162+
'ResultPath': '$.Payload',
163+
'Type': 'Pass',
164+
},
165+
self.remove_dynamic_tokens_numbers(
166+
aslp_transaction_history_proccessing_workflow.initialize_state.to_state_json()
167+
),
168+
)
169+
153170
def test_workflow_generates_expected_choice_step(self):
154171
aslp_transaction_history_proccessing_workflow = (
155172
self.app.prod_backend_pipeline_stack.prod_stage.transaction_monitoring_stack.compact_state_machines['aslp']

0 commit comments

Comments
 (0)