Skip to content

Integrate large payload support for SQS #2116

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions kombu/asynchronous/aws/ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ class BotoCoreError(Exception):
get_response = _void()
get_cert_path = _void()

try:
import sqs_extended_client
except ImportError:
sqs_extended_client = None


__all__ = (
'exceptions', 'AWSRequest', 'get_response', 'get_cert_path',
Expand Down
5 changes: 5 additions & 0 deletions kombu/asynchronous/aws/sqs/ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,8 @@
import boto3
except ImportError:
boto3 = None

try:
import sqs_extended_client
except ImportError:
sqs_extended_client = None
55 changes: 53 additions & 2 deletions kombu/transport/SQS.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
from __future__ import annotations

import base64
import json
import socket
import string
import uuid
Expand All @@ -144,7 +145,7 @@
from vine import ensure_promise, promise, transform

from kombu.asynchronous import get_event_loop
from kombu.asynchronous.aws.ext import boto3, exceptions
from kombu.asynchronous.aws.ext import boto3, exceptions, sqs_extended_client
from kombu.asynchronous.aws.sqs.connection import AsyncSQSConnection
from kombu.asynchronous.aws.sqs.message import AsyncMessage
from kombu.log import get_logger
Expand Down Expand Up @@ -498,6 +499,24 @@
message['ReceiptHandle'],
)
else:

if (
sqs_extended_client and
isinstance(payload, list)
and payload[0] == sqs_extended_client.client.MESSAGE_POINTER_CLASS
):
# Used the sqs_extended_client, so we need to fetch the file from S3 and use that as the payload
s3_details = payload[1]
s3_bucket_name, s3_key = s3_details["s3BucketName"], s3_details["s3Key"]

Check warning on line 510 in kombu/transport/SQS.py

View check run for this annotation

Codecov / codecov/patch

kombu/transport/SQS.py#L509-L510

Added lines #L509 - L510 were not covered by tests

s3_client = self.s3()
response = s3_client.get_object(Bucket=s3_bucket_name, Key=s3_key)

Check warning on line 513 in kombu/transport/SQS.py

View check run for this annotation

Codecov / codecov/patch

kombu/transport/SQS.py#L512-L513

Added lines #L512 - L513 were not covered by tests

# The message body is under a wrapper class called StreamingBody
streaming_body = response["Body"]
body = self._optional_b64_decode(streaming_body.read())
payload = json.loads(body)

Check warning on line 518 in kombu/transport/SQS.py

View check run for this annotation

Codecov / codecov/patch

kombu/transport/SQS.py#L516-L518

Added lines #L516 - L518 were not covered by tests

try:
properties = payload['properties']
delivery_info = payload['properties']['delivery_info']
Expand Down Expand Up @@ -713,6 +732,32 @@
# if "can't set attribute" not in str(exc):
# raise

def new_s3_client(
self, region, access_key_id, secret_access_key, session_token=None
):
session = boto3.session.Session(

Check warning on line 738 in kombu/transport/SQS.py

View check run for this annotation

Codecov / codecov/patch

kombu/transport/SQS.py#L738

Added line #L738 was not covered by tests
region_name=region,
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key,
aws_session_token=session_token,
)
is_secure = self.is_secure if self.is_secure is not None else True
client_kwargs = {"use_ssl": is_secure}

Check warning on line 745 in kombu/transport/SQS.py

View check run for this annotation

Codecov / codecov/patch

kombu/transport/SQS.py#L744-L745

Added lines #L744 - L745 were not covered by tests

if self.endpoint_url is not None:
client_kwargs["endpoint_url"] = self.endpoint_url

Check warning on line 748 in kombu/transport/SQS.py

View check run for this annotation

Codecov / codecov/patch

kombu/transport/SQS.py#L748

Added line #L748 was not covered by tests

client = session.client("s3", **client_kwargs)

Check warning on line 750 in kombu/transport/SQS.py

View check run for this annotation

Codecov / codecov/patch

kombu/transport/SQS.py#L750

Added line #L750 was not covered by tests

return client

Check warning on line 752 in kombu/transport/SQS.py

View check run for this annotation

Codecov / codecov/patch

kombu/transport/SQS.py#L752

Added line #L752 was not covered by tests

def s3(self):
return self.new_s3_client(

Check warning on line 755 in kombu/transport/SQS.py

View check run for this annotation

Codecov / codecov/patch

kombu/transport/SQS.py#L755

Added line #L755 was not covered by tests
region=self.region,
access_key_id=self.conninfo.userid,
secret_access_key=self.conninfo.password,
)

def new_sqs_client(self, region, access_key_id,
secret_access_key, session_token=None):
session = boto3.session.Session(
Expand All @@ -729,7 +774,13 @@
client_kwargs['endpoint_url'] = self.endpoint_url
client_config = self.transport_options.get('client-config') or {}
config = Config(**client_config)
return session.client('sqs', config=config, **client_kwargs)
client = session.client('sqs', config=config, **client_kwargs)

Check warning on line 777 in kombu/transport/SQS.py

View check run for this annotation

Codecov / codecov/patch

kombu/transport/SQS.py#L777

Added line #L777 was not covered by tests

if self.transport_options.get('large_payload_bucket') and sqs_extended_client:
client.large_payload_support = self.transport_options.get('large_payload_bucket')
client.use_legacy_attribute = False

Check warning on line 781 in kombu/transport/SQS.py

View check run for this annotation

Codecov / codecov/patch

kombu/transport/SQS.py#L780-L781

Added lines #L780 - L781 were not covered by tests

return client

Check warning on line 783 in kombu/transport/SQS.py

View check run for this annotation

Codecov / codecov/patch

kombu/transport/SQS.py#L783

Added line #L783 was not covered by tests

def sqs(self, queue=None):
if queue is not None and self.predefined_queues:
Expand Down
1 change: 1 addition & 0 deletions requirements/extras/sqs.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
boto3>=1.26.143
urllib3>=1.26.16
amazon-sqs-extended-client>=1.0.1
75 changes: 75 additions & 0 deletions t/unit/transport/test_SQS.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
from __future__ import annotations

import base64
import json
import os
import random
import string
from datetime import datetime, timedelta
from io import BytesIO
from queue import Empty
from unittest.mock import Mock, patch

Expand All @@ -19,6 +21,7 @@
from kombu import Connection, Exchange, Queue, messaging

boto3 = pytest.importorskip('boto3')
sqs_extended_client = pytest.importorskip('sqs_extended_client')

from botocore.exceptions import ClientError # noqa

Expand Down Expand Up @@ -364,6 +367,51 @@ def test_optional_b64_decode(self):
assert self.channel._optional_b64_decode(raw) == raw
assert self.channel._optional_b64_decode(b"test123") == b"test123"

@patch('boto3.session.Session')
def test_new_s3_client_with_is_secure_false(self, mock_session):
self.channel.is_secure = False
self.channel.endpoint_url = None

self.channel.new_s3_client(
region='us-west-2',
access_key_id='test_access_key',
secret_access_key='test_secret_key'
)

# assert isinstance(client, boto3.client('s3').__class__)
mock_session.assert_called_once_with(
region_name='us-west-2',
aws_access_key_id='test_access_key',
aws_secret_access_key='test_secret_key',
aws_session_token=None
)
mock_session().client.assert_called_once_with(
's3', use_ssl=False
)

@patch('boto3.session.Session')
def test_new_s3_client_with_custom_endpoint(self, mock_session):
mock_client = Mock()
mock_session.return_value.client.return_value = mock_client

self.channel.is_secure = True
self.channel.endpoint_url = 'https://custom-endpoint.com'

result = self.channel.new_s3_client('us-west-2', 'access_key', 'secret_key')

mock_session.assert_called_once_with(
region_name='us-west-2',
aws_access_key_id='access_key',
aws_secret_access_key='secret_key',
aws_session_token=None
)
mock_session.return_value.client.assert_called_once_with(
's3',
use_ssl=True,
endpoint_url='https://custom-endpoint.com'
)
assert result == mock_client

def test_messages_to_python(self):
from kombu.asynchronous.aws.sqs.message import Message

Expand Down Expand Up @@ -1035,3 +1083,30 @@ def test_message_attribute(self):
assert message == output_message.payload
# It's not propagated to the properties
assert 'message_attributes' not in output_message.properties

def test_message_to_python_with_sqs_extended_client(self):
message = [
sqs_extended_client.client.MESSAGE_POINTER_CLASS,
{'s3BucketName': 's3://large-payload-bucket', 's3Key': 'payload.json'}
]

# Get the messages now
with patch('kombu.transport.SQS.Channel.s3') as s3_mock:
s3_client = Mock(
get_object=Mock(
return_value={'Body': BytesIO(json.dumps({"my_key": "Hello, World!"}).encode()), })
)
s3_mock.return_value = s3_client

result = self.channel._message_to_python(
{'Body': json.dumps(message), 'ReceiptHandle': 'handle'}, self.queue_name,
'test',
)

assert s3_client.get_object.called

# Make sure they're payload-style objects
assert 'properties' in result

# Data from s3 is loaded into the return payload
assert 'my_key' in result