Skip to content

Use Data Section by Default instead of AMQPValue for the message creation #63

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
May 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ci/ubuntu/gha-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ set -o xtrace
script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
readonly script_dir
echo "[INFO] script_dir: '$script_dir'"
readonly rabbitmq_image=rabbitmq:4.1.0-beta.4-management-alpine
readonly rabbitmq_image=rabbitmq:4.1.0-management


readonly docker_name_prefix='rabbitmq-amqp-python-client'
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ __pycache__/
local*
.githooks/
.venv/
.ci/ubuntu/log/*
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,13 @@ rabbitmq-server:
rabbitmq-server-stop:
./.ci/ubuntu/gha-setup.sh stop

format:
poetry run isort --skip rabbitmq_amqp_python_client/qpid .
poetry run black rabbitmq_amqp_python_client/
poetry run black tests/
poetry run flake8 --exclude=venv,local_tests,docs/examples,rabbitmq_amqp_python_client/qpid --max-line-length=120 --ignore=E203,W503

test: format
poetry run pytest .
help:
cat Makefile
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from rabbitmq_amqp_python_client import Converter

# RabbitMQ AMQP 1.0 Python Client

This library is in early stages of development. It is meant to be used with RabbitMQ 4.0.
Expand Down Expand Up @@ -83,7 +85,7 @@ For example:

# publish messages
for i in range(messages_to_publish):
publisher.publish(Message(body="test"))
publisher.publish(Message(body=Converter.string_to_bytes("test")))

publisher.close()
```
Expand Down Expand Up @@ -149,7 +151,7 @@ You can check the [`ssl example`](./examples/tls/tls_example.py) to see how to e

The client supports oauth2 authentication.

You can check the [`oauth2 example`](./examples/oauth/oaut.py) to see how to establish and refresh a connection using an oauth2 token
You can check the [`oauth2 example`](examples/oauth/oAuth2.py) to see how to establish and refresh a connection using an oauth2 token

### Managing disconnections

Expand Down
2 changes: 1 addition & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ Client examples
- [Reconnection](./reconnection/reconnection_example.py) - Producer and Consumer example with reconnection
- [TLS](./tls/tls_example.py) - Producer and Consumer using a TLS connection
- [Streams](./streams/example_with_streams.py) - Example supporting stream capabilities
- [Oauth](./oauth/oauth.py) - Connection through Oauth token
- [Oauth](./oauth/oAuth2.py) - Connection through Oauth token
19 changes: 11 additions & 8 deletions examples/getting_started/getting_started.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
AddressHelper,
AMQPMessagingHandler,
Connection,
Converter,
Environment,
Event,
ExchangeSpecification,
Expand All @@ -24,7 +25,11 @@ def __init__(self):
self._count = 0

def on_amqp_message(self, event: Event):
print("received message: " + str(event.message.body))
print(
"received message: {} ".format(
Converter.bytes_to_string(event.message.body)
)
)

# accepting
self.delivery_context.accept(event)
Expand All @@ -43,13 +48,11 @@ def on_amqp_message(self, event: Event):
# in case of rejection with annotations added
# self.delivery_context.discard_with_annotations(event)

print("count " + str(self._count))

self._count = self._count + 1
print("count " + str(self._count))

if self._count == MESSAGES_TO_PUBLISH:
print("closing receiver")
# if you want you can add cleanup operations here
print("received all messages")

def on_connection_closed(self, event: Event):
# if you want you can add cleanup operations here
Expand Down Expand Up @@ -79,7 +82,6 @@ def create_connection(environment: Environment) -> Connection:


def main() -> None:

exchange_name = "test-exchange"
queue_name = "example-queue"
routing_key = "routing-key"
Expand Down Expand Up @@ -122,8 +124,9 @@ def main() -> None:

# publish 10 messages
for i in range(MESSAGES_TO_PUBLISH):
print("publishing")
status = publisher.publish(Message(body="test"))
status = publisher.publish(
Message(body=Converter.string_to_bytes("test message {} ".format(i)))
)
if status.remote_state == OutcomeState.ACCEPTED:
print("message accepted")
elif status.remote_state == OutcomeState.RELEASED:
Expand Down
21 changes: 11 additions & 10 deletions examples/oauth/oaut.py → examples/oauth/oAuth2.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
AddressHelper,
AMQPMessagingHandler,
Connection,
Converter,
Environment,
Event,
ExchangeSpecification,
Expand All @@ -30,7 +31,7 @@ def __init__(self):
self._count = 0

def on_amqp_message(self, event: Event):
print("received message: " + str(event.message.body))
print("received message: " + Converter.bytes_to_string(event.message.body))

# accepting
self.delivery_context.accept(event)
Expand Down Expand Up @@ -85,10 +86,9 @@ def create_connection(environment: Environment) -> Connection:


def main() -> None:

exchange_name = "test-exchange"
queue_name = "example-queue"
routing_key = "routing-key"
exchange_name = "oAuth2-test-exchange"
queue_name = "oAuth2-example-queue"
routing_key = "oAuth2-routing-key"

print("connection to amqp server")
oaut_token = token(
Expand Down Expand Up @@ -144,14 +144,15 @@ def main() -> None:

# publish 10 messages
for i in range(MESSAGES_TO_PUBLISH):
print("publishing")
status = publisher.publish(Message(body="test"))
status = publisher.publish(
Message(body=Converter.string_to_bytes("test_{}".format(i)))
)
if status.remote_state == OutcomeState.ACCEPTED:
print("message accepted")
print("message: test_{} accepted".format(i))
elif status.remote_state == OutcomeState.RELEASED:
print("message not routed")
print("message: test_{} not routed".format(i))
elif status.remote_state == OutcomeState.REJECTED:
print("message not rejected")
print("message: test_{} rejected".format(i))

publisher.close()

Expand Down
9 changes: 5 additions & 4 deletions examples/reconnection/reconnection_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
AMQPMessagingHandler,
Connection,
ConnectionClosed,
Converter,
Environment,
Event,
ExchangeSpecification,
Expand All @@ -15,7 +16,6 @@
# here we keep track of the objects we need to reconnect
MESSAGES_TO_PUBLISH = 50000


environment = Environment(
uri="amqp://guest:guest@localhost:5672/",
)
Expand All @@ -29,7 +29,9 @@ def __init__(self):

def on_message(self, event: Event):
if self._count % 1000 == 0:
print("received 100 message: " + str(event.message.body))
print(
"received 100 message: " + Converter.bytes_to_string(event.message.body)
)

# accepting
self.delivery_context.accept(event)
Expand Down Expand Up @@ -79,7 +81,6 @@ def create_connection() -> Connection:


def main() -> None:

exchange_name = "test-exchange"
queue_name = "example-queue"
routing_key = "routing-key"
Expand Down Expand Up @@ -128,7 +129,7 @@ def main() -> None:
print("published 1000 messages...")
try:
if publisher is not None:
publisher.publish(Message(body="test"))
publisher.publish(Message(body=Converter.string_to_bytes("test")))
except ConnectionClosed:
print("publisher closing exception, resubmitting")
# publisher = connection.publisher(addr)
Expand Down
10 changes: 6 additions & 4 deletions examples/streams/example_with_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
AMQPMessagingHandler,
Connection,
ConnectionClosed,
Converter,
Environment,
Event,
Message,
Expand All @@ -26,7 +27,7 @@ def on_amqp_message(self, event: Event):
# just messages with banana filters get received
print(
"received message from stream: "
+ str(event.message.body)
+ Converter.bytes_to_string(event.message.body)
+ " with offset: "
+ str(event.message.annotations["x-stream-offset"])
)
Expand Down Expand Up @@ -84,7 +85,7 @@ def create_connection(environment: Environment) -> Connection:


def main() -> None:
queue_name = "example-queue"
queue_name = "stream-example-queue"

print("connection to amqp server")
environment = Environment("amqp://guest:guest@localhost:5672/")
Expand Down Expand Up @@ -118,15 +119,16 @@ def main() -> None:
for i in range(MESSAGES_TO_PUBLISH):
publisher.publish(
Message(
body="apple: " + str(i), annotations={"x-stream-filter-value": "apple"}
Converter.string_to_bytes(body="apple: " + str(i)),
annotations={"x-stream-filter-value": "apple"},
)
)

# publish with a filter of banana
for i in range(MESSAGES_TO_PUBLISH):
publisher.publish(
Message(
body="banana: " + str(i),
body=Converter.string_to_bytes("banana: " + str(i)),
annotations={"x-stream-filter-value": "banana"},
)
)
Expand Down
22 changes: 13 additions & 9 deletions examples/tls/tls_example.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# type: ignore
import os
import sys
from traceback import print_exception

from rabbitmq_amqp_python_client import (
AddressHelper,
AMQPMessagingHandler,
Connection,
Converter,
CurrentUserStore,
Environment,
Event,
Expand Down Expand Up @@ -34,7 +36,7 @@ def __init__(self):
self._count = 0

def on_message(self, event: Event):
print("received message: " + str(event.message.body))
print("received message: " + Converter.bytes_to_string(event.message.body))

# accepting
self.delivery_context.accept(event)
Expand Down Expand Up @@ -79,15 +81,14 @@ def create_connection(environment: Environment) -> Connection:


def main() -> None:

exchange_name = "test-exchange"
queue_name = "example-queue"
routing_key = "routing-key"
exchange_name = "tls-test-exchange"
queue_name = "tls-example-queue"
routing_key = "tls-routing-key"
ca_p12_store = ".ci/certs/ca.p12"
ca_cert_file = ".ci/certs/ca_certificate.pem"
client_cert = ".ci/certs/client_certificate.pem"
client_key = ".ci/certs/client_key.pem"
client_p12_store = ".ci/certs/client.p12"
client_cert = ".ci/certs/client_localhost_certificate.pem"
client_key = ".ci/certs/client_localhost_key.pem"
client_p12_store = ".ci/certs/client_localhost.p12"
uri = "amqps://guest:guest@localhost:5671/"

if sys.platform == "win32":
Expand Down Expand Up @@ -138,6 +139,9 @@ def main() -> None:
"connection failed. working directory should be project root"
)
else:
print(" ca_cert_file exists: {}".format(os.path.isfile(ca_cert_file)))
print(" client_cert exists: {}".format(os.path.isfile(client_cert)))
print(" client_key exists: {}".format(os.path.isfile(client_key)))
environment = Environment(
uri,
ssl_context=PosixSslConfigurationContext(
Expand Down Expand Up @@ -187,7 +191,7 @@ def main() -> None:

# publish 10 messages
for i in range(messages_to_publish):
status = publisher.publish(Message(body="test"))
status = publisher.publish(Message(body=Converter.string_to_bytes("test")))
if status.ACCEPTED:
print("message accepted")
elif status.RELEASED:
Expand Down
Loading