Skip to content

Commit 82e0204

Browse files
committed
don't send messages to the same queue or topic asynchronously and don't reuse service bus messages
1 parent bb557af commit 82e0204

File tree

1 file changed

+55
-54
lines changed

1 file changed

+55
-54
lines changed

tests/contrib/azure_servicebus/test_azure_servicebus_snapshot.py

Lines changed: 55 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -114,19 +114,6 @@ async def azure_servicebus_subscription_receiver_async(azure_servicebus_client_a
114114
yield subscription_receiver
115115

116116

117-
@pytest.fixture()
118-
def message_with_properties():
119-
return ServiceBusMessage(
120-
"test message with properties",
121-
application_properties=DEFAULT_APPLICATION_PROPERTIES,
122-
)
123-
124-
125-
@pytest.fixture()
126-
def message_without_properties():
127-
return ServiceBusMessage("test message without properties")
128-
129-
130117
@pytest.fixture()
131118
def trace_context_keys():
132119
return [
@@ -139,6 +126,40 @@ def trace_context_keys():
139126
]
140127

141128

129+
def make_messages():
130+
return [
131+
ServiceBusMessage("test message without properties"),
132+
ServiceBusMessage("test message without properties"),
133+
[
134+
ServiceBusMessage("test message without properties"),
135+
ServiceBusMessage(
136+
"test message with properties",
137+
application_properties=DEFAULT_APPLICATION_PROPERTIES,
138+
),
139+
],
140+
]
141+
142+
143+
async def send_messages_to_queue_async(queue_sender_async):
144+
for message in make_messages():
145+
await queue_sender_async.send_messages(message)
146+
147+
148+
async def send_messages_to_topic_async(topic_sender_async):
149+
for message in make_messages():
150+
await topic_sender_async.send_messages(message)
151+
152+
153+
async def schedule_messages_to_queue_async(queue_sender_async, schedule_time_utc):
154+
for message in make_messages():
155+
await queue_sender_async.schedule_messages(message, schedule_time_utc)
156+
157+
158+
async def schedule_messages_to_topic_async(topic_sender_async, schedule_time_utc):
159+
for message in make_messages():
160+
await topic_sender_async.schedule_messages(message, schedule_time_utc)
161+
162+
142163
def normalize_application_properties(
143164
application_properties: Optional[Dict[Union[str, bytes], Union[int, float, bytes, bool, str, uuid.UUID]]],
144165
):
@@ -188,24 +209,26 @@ def normalize_application_properties(
188209
return {{k.decode() if isinstance(k, bytes) else k: v for k, v in application_properties.items()}}
189210
190211
191-
message_with_properties = ServiceBusMessage(
192-
"test message with properties",
193-
application_properties={DEFAULT_APPLICATION_PROPERTIES},
194-
)
195-
message_without_properties = ServiceBusMessage("test message without properties")
212+
def make_messages():
213+
return [
214+
ServiceBusMessage("test message without properties"),
215+
ServiceBusMessage("test message without properties"),
216+
[
217+
ServiceBusMessage("test message without properties"),
218+
ServiceBusMessage(
219+
"test message with properties",
220+
application_properties={DEFAULT_APPLICATION_PROPERTIES},
221+
),
222+
],
223+
]
196224
197-
messages = [
198-
message_without_properties,
199-
message_without_properties,
200-
[message_without_properties, message_with_properties],
201-
]
202225
203226
with ServiceBusClient.from_connection_string(conn_str="{CONNECTION_STRING}") as servicebus_client:
204227
with servicebus_client.get_queue_sender(queue_name="{QUEUE_NAME}") as queue_sender:
205-
for message in messages:
228+
for message in make_messages():
206229
queue_sender.send_messages(message)
207230
with servicebus_client.get_topic_sender(topic_name="{TOPIC_NAME}") as topic_sender:
208-
for message in messages:
231+
for message in make_messages():
209232
topic_sender.send_messages(message)
210233
with servicebus_client.get_queue_receiver(
211234
queue_name="{QUEUE_NAME}", receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE
@@ -259,19 +282,11 @@ async def test_send_messages_async(
259282
azure_servicebus_queue_receiver_async: ServiceBusReceiverAsync,
260283
azure_servicebus_topic_sender_async: ServiceBusSenderAsync,
261284
azure_servicebus_subscription_receiver_async: ServiceBusReceiverAsync,
262-
message_without_properties: ServiceBusMessage,
263-
message_with_properties: ServiceBusMessage,
264285
trace_context_keys: List[str],
265286
):
266-
messages = [
267-
message_without_properties,
268-
message_without_properties,
269-
[message_without_properties, message_with_properties],
270-
]
271-
272287
await asyncio.gather(
273-
*[azure_servicebus_queue_sender_async.send_messages(message) for message in messages],
274-
*[azure_servicebus_topic_sender_async.send_messages(message) for message in messages],
288+
send_messages_to_queue_async(azure_servicebus_queue_sender_async),
289+
send_messages_to_topic_async(azure_servicebus_topic_sender_async),
275290
)
276291

277292
received_queue_messages, received_subscription_messages = await asyncio.gather(
@@ -301,20 +316,14 @@ def test_schedule_messages(
301316
azure_servicebus_queue_receiver: ServiceBusReceiver,
302317
azure_servicebus_topic_sender: ServiceBusSender,
303318
azure_servicebus_subscription_receiver: ServiceBusReceiver,
304-
message_without_properties: ServiceBusMessage,
305-
message_with_properties: ServiceBusMessage,
306319
trace_context_keys: List[str],
307320
):
308321
now = datetime.now(timezone.utc)
309322

310-
messages = [
311-
message_without_properties,
312-
message_without_properties,
313-
[message_without_properties, message_with_properties],
314-
]
315-
316-
for message in messages:
323+
for message in make_messages():
317324
azure_servicebus_queue_sender.schedule_messages(message, now)
325+
326+
for message in make_messages():
318327
azure_servicebus_topic_sender.schedule_messages(message, now)
319328

320329
received_queue_messages = azure_servicebus_queue_receiver.receive_messages(max_message_count=4, max_wait_time=5)
@@ -345,21 +354,13 @@ async def test_schedule_messages_async(
345354
azure_servicebus_queue_receiver_async: ServiceBusReceiverAsync,
346355
azure_servicebus_topic_sender_async: ServiceBusSenderAsync,
347356
azure_servicebus_subscription_receiver_async: ServiceBusReceiverAsync,
348-
message_without_properties: ServiceBusMessage,
349-
message_with_properties: ServiceBusMessage,
350357
trace_context_keys: List[str],
351358
):
352359
now = datetime.now(timezone.utc)
353360

354-
messages = [
355-
message_without_properties,
356-
message_without_properties,
357-
[message_without_properties, message_with_properties],
358-
]
359-
360361
await asyncio.gather(
361-
*[azure_servicebus_queue_sender_async.schedule_messages(message, now) for message in messages],
362-
*[azure_servicebus_topic_sender_async.schedule_messages(message, now) for message in messages],
362+
schedule_messages_to_queue_async(azure_servicebus_queue_sender_async, now),
363+
schedule_messages_to_topic_async(azure_servicebus_topic_sender_async, now),
363364
)
364365

365366
received_queue_messages, received_subscription_messages = await asyncio.gather(

0 commit comments

Comments
 (0)