Skip to content

Commit 3bad36e

Browse files
prakhar1144timabbott
authored andcommitted
queue: Rename queue_json_publish to queue_json_publish_rollback_unsafe.
This commit renames the 'queue_json_publish' function to 'queue_json_publish_rollback_unsafe' to reflect the fact that it doesn't wait for the db transaction (within which it gets called, if any) to commit and sends event irrespective of commit or rollback. In most of the cases we don't want to send event in the case of rollbacks, so the caller should be aware that calling the function directly is rollback unsafe. Fixes part of zulip#30489.
1 parent 1847086 commit 3bad36e

24 files changed

+63
-53
lines changed

docs/subsystems/queuing.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,13 @@ manually.
5757
### Publishing events into a queue
5858

5959
You can publish events to a RabbitMQ queue using the
60-
`queue_json_publish` function defined in `zerver/lib/queue.py`.
60+
`queue_event_on_commit` function defined in `zerver/lib/queue.py`.
6161

6262
An interesting challenge with queue processors is what should happen
6363
when queued events in Zulip's backend tests. Our current solution is
64-
that in the tests, `queue_json_publish` will (by default) simple call
64+
that in the tests, `queue_event_on_commit` will (by default) simple call
6565
the `consume` method for the relevant queue processor. However,
66-
`queue_json_publish` also supports being passed a function that should
66+
`queue_event_on_commit` also supports being passed a function that should
6767
be called in the tests instead of the queue processor's `consume`
6868
method. Where possible, we prefer the model of calling `consume` in
6969
tests since that's more predictable and automatically covers the queue

zerver/actions/user_activity.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from datetime import datetime
22

3-
from zerver.lib.queue import queue_json_publish
3+
from zerver.lib.queue import queue_json_publish_rollback_unsafe
44
from zerver.lib.timestamp import datetime_to_timestamp
55
from zerver.models import UserActivityInterval, UserProfile
66

@@ -31,4 +31,4 @@ def do_update_user_activity_interval(user_profile: UserProfile, log_time: dateti
3131

3232
def update_user_activity_interval(user_profile: UserProfile, log_time: datetime) -> None:
3333
event = {"user_profile_id": user_profile.id, "time": datetime_to_timestamp(log_time)}
34-
queue_json_publish("user_activity_interval", event)
34+
queue_json_publish_rollback_unsafe("user_activity_interval", event)

zerver/decorator.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
UserDeactivatedError,
4444
WebhookError,
4545
)
46-
from zerver.lib.queue import queue_json_publish
46+
from zerver.lib.queue import queue_json_publish_rollback_unsafe
4747
from zerver.lib.rate_limiter import is_local_addr, rate_limit_request_by_ip, rate_limit_user
4848
from zerver.lib.request import RequestNotes
4949
from zerver.lib.response import json_method_not_allowed
@@ -91,7 +91,7 @@ def update_user_activity(
9191
"time": datetime_to_timestamp(timezone_now()),
9292
"client_id": request_notes.client.id,
9393
}
94-
queue_json_publish("user_activity", event, lambda event: None)
94+
queue_json_publish_rollback_unsafe("user_activity", event, lambda event: None)
9595

9696

9797
# Based on django.views.decorators.http.require_http_methods

zerver/lib/digest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from zerver.lib.email_notifications import build_message_list
1818
from zerver.lib.logging_util import log_to_file
1919
from zerver.lib.message import get_last_message_id
20-
from zerver.lib.queue import queue_json_publish
20+
from zerver.lib.queue import queue_json_publish_rollback_unsafe
2121
from zerver.lib.send_email import FromAddress, send_future_email
2222
from zerver.lib.url_encoding import stream_narrow_url
2323
from zerver.models import (
@@ -92,7 +92,7 @@ def teaser_data(self, user: UserProfile, stream_id_map: dict[int, Stream]) -> di
9292
def queue_digest_user_ids(user_ids: list[int], cutoff: datetime) -> None:
9393
# Convert cutoff to epoch seconds for transit.
9494
event = {"user_ids": user_ids, "cutoff": cutoff.strftime("%s")}
95-
queue_json_publish("digest_emails", event)
95+
queue_json_publish_rollback_unsafe("digest_emails", event)
9696

9797

9898
def enqueue_emails(cutoff: datetime) -> None:

zerver/lib/email_mirror.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from zerver.lib.email_notifications import convert_html_to_markdown
2626
from zerver.lib.exceptions import JsonableError, RateLimitedError
2727
from zerver.lib.message import normalize_body, truncate_content, truncate_topic
28-
from zerver.lib.queue import queue_json_publish
28+
from zerver.lib.queue import queue_json_publish_rollback_unsafe
2929
from zerver.lib.rate_limiter import RateLimitedObject
3030
from zerver.lib.send_email import FromAddress
3131
from zerver.lib.streams import access_stream_for_send_message
@@ -540,7 +540,7 @@ def mirror_email_message(rcpt_to: str, msg_base64: str) -> dict[str, str]:
540540
"msg": f"5.1.1 Bad destination mailbox address: {e}",
541541
}
542542

543-
queue_json_publish(
543+
queue_json_publish_rollback_unsafe(
544544
"email_mirror",
545545
{
546546
"rcpt_to": rcpt_to,

zerver/lib/queue.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,11 @@ def set_queue_client(queue_client: SimpleQueueClient | TornadoQueueClient) -> No
434434
thread_data.queue_client = queue_client
435435

436436

437-
def queue_json_publish(
437+
# One should generally use `queue_event_on_commit` unless there's a strong
438+
# reason to use `queue_json_publish_rollback_unsafe` directly, as it doesn't
439+
# wait for the db transaction (within which it gets called, if any) to commit
440+
# and sends event irrespective of commit or rollback.
441+
def queue_json_publish_rollback_unsafe(
438442
queue_name: str,
439443
event: dict[str, Any],
440444
processor: Callable[[Any], None] | None = None,
@@ -452,7 +456,7 @@ def queue_json_publish(
452456

453457

454458
def queue_event_on_commit(queue_name: str, event: dict[str, Any]) -> None:
455-
transaction.on_commit(lambda: queue_json_publish(queue_name, event))
459+
transaction.on_commit(lambda: queue_json_publish_rollback_unsafe(queue_name, event))
456460

457461

458462
def retry_event(
@@ -464,4 +468,4 @@ def retry_event(
464468
if event["failed_tries"] > MAX_REQUEST_RETRIES:
465469
failure_processor(event)
466470
else:
467-
queue_json_publish(queue_name, event)
471+
queue_json_publish_rollback_unsafe(queue_name, event)

zerver/management/commands/enqueue_file.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from typing_extensions import override
77

88
from zerver.lib.management import ZulipBaseCommand
9-
from zerver.lib.queue import queue_json_publish
9+
from zerver.lib.queue import queue_json_publish_rollback_unsafe
1010

1111

1212
def error(*args: Any) -> None:
@@ -28,7 +28,7 @@ def enqueue_file(queue_name: str, f: IO[str]) -> None:
2828

2929
# This is designed to use the `error` method rather than
3030
# the call_consume_in_tests flow.
31-
queue_json_publish(queue_name, data, error)
31+
queue_json_publish_rollback_unsafe(queue_name, data, error)
3232

3333

3434
class Command(ZulipBaseCommand):

zerver/migrations/0209_user_profile_no_empty_password.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from django.utils.timezone import now as timezone_now
1212

1313
from zerver.lib.cache import cache_delete, user_profile_by_api_key_cache_key
14-
from zerver.lib.queue import queue_json_publish
14+
from zerver.lib.queue import queue_json_publish_rollback_unsafe
1515
from zerver.lib.utils import generate_api_key
1616

1717

@@ -223,7 +223,7 @@ def reset_user_api_key(user_profile: Any) -> None:
223223
# we can just write to the queue processor that handles sending
224224
# those notices to the push notifications bouncer service.
225225
event = {"type": "clear_push_device_tokens", "user_profile_id": user_profile.id}
226-
queue_json_publish("deferred_work", event)
226+
queue_json_publish_rollback_unsafe("deferred_work", event)
227227

228228

229229
class Migration(migrations.Migration):

zerver/migrations/0387_reupload_realmemoji_again.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from django.db.backends.base.schema import BaseDatabaseSchemaEditor
44
from django.db.migrations.state import StateApps
55

6-
from zerver.lib.queue import queue_json_publish
6+
from zerver.lib.queue import queue_json_publish_rollback_unsafe
77

88

99
def reupload_realm_emoji(apps: StateApps, schema_editor: BaseDatabaseSchemaEditor) -> None:
@@ -34,7 +34,7 @@ def reupload_realm_emoji(apps: StateApps, schema_editor: BaseDatabaseSchemaEdito
3434
"type": "reupload_realm_emoji",
3535
"realm_id": realm_id,
3636
}
37-
queue_json_publish("deferred_work", event)
37+
queue_json_publish_rollback_unsafe("deferred_work", event)
3838

3939

4040
class Migration(migrations.Migration):

zerver/signals.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from django.utils.translation import gettext as _
1010

1111
from confirmation.models import one_click_unsubscribe_link
12-
from zerver.lib.queue import queue_json_publish
12+
from zerver.lib.queue import queue_json_publish_rollback_unsafe
1313
from zerver.lib.send_email import FromAddress
1414
from zerver.lib.timezone import canonicalize_timezone
1515
from zerver.models import UserProfile
@@ -110,7 +110,7 @@ def email_on_new_login(sender: Any, user: UserProfile, request: Any, **kwargs: A
110110
"from_address": FromAddress.NOREPLY,
111111
"context": context,
112112
}
113-
queue_json_publish("email_senders", email_dict)
113+
queue_json_publish_rollback_unsafe("email_senders", email_dict)
114114

115115

116116
@receiver(user_logged_out)

zerver/tests/test_email_mirror.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1577,7 +1577,7 @@ def check_queue_json_publish(
15771577
"secret": settings.SHARED_SECRET,
15781578
}
15791579

1580-
with mock_queue_publish("zerver.lib.email_mirror.queue_json_publish") as m:
1580+
with mock_queue_publish("zerver.lib.email_mirror.queue_json_publish_rollback_unsafe") as m:
15811581
m.side_effect = check_queue_json_publish
15821582
return self.client_post("/api/internal/email_mirror_message", post_data)
15831583

zerver/tests/test_event_queue.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,13 @@ def test_maybe_enqueue_notifications(self) -> None:
4141
)
4242

4343
with mock_queue_publish(
44-
"zerver.tornado.event_queue.queue_json_publish"
44+
"zerver.tornado.event_queue.queue_json_publish_rollback_unsafe"
4545
) as mock_queue_json_publish:
4646
notified = maybe_enqueue_notifications(**params)
4747
mock_queue_json_publish.assert_not_called()
4848

4949
with mock_queue_publish(
50-
"zerver.tornado.event_queue.queue_json_publish"
50+
"zerver.tornado.event_queue.queue_json_publish_rollback_unsafe"
5151
) as mock_queue_json_publish:
5252
params["user_notifications_data"] = self.create_user_notifications_data_object(
5353
user_id=1, dm_push_notify=True, dm_email_notify=True
@@ -63,7 +63,7 @@ def test_maybe_enqueue_notifications(self) -> None:
6363
self.assertTrue(notified["push_notified"])
6464

6565
with mock_queue_publish(
66-
"zerver.tornado.event_queue.queue_json_publish"
66+
"zerver.tornado.event_queue.queue_json_publish_rollback_unsafe"
6767
) as mock_queue_json_publish:
6868
params = self.get_maybe_enqueue_notifications_parameters(
6969
message_id=1,

zerver/tests/test_link_embed.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from zerver.actions.message_delete import do_delete_messages
1515
from zerver.lib.cache import cache_delete, cache_get, preview_url_cache_key
1616
from zerver.lib.camo import get_camo_url
17-
from zerver.lib.queue import queue_json_publish
17+
from zerver.lib.queue import queue_json_publish_rollback_unsafe
1818
from zerver.lib.test_classes import ZulipTestCase
1919
from zerver.lib.test_helpers import mock_queue_publish
2020
from zerver.lib.url_preview.oembed import get_oembed_data, strip_cdata
@@ -457,9 +457,9 @@ def wrapped_queue_event_on_commit(*args: Any, **kwargs: Any) -> None:
457457
self.assertTrue(responses.assert_call_count(edited_url, 0))
458458

459459
with self.settings(TEST_SUITE=False), self.assertLogs(level="INFO") as info_logs:
460-
# Now proceed with the original queue_json_publish and call the
461-
# up-to-date event for edited_url.
462-
queue_json_publish(*args, **kwargs)
460+
# Now proceed with the original queue_json_publish_rollback_unsafe
461+
# and call the up-to-date event for edited_url.
462+
queue_json_publish_rollback_unsafe(*args, **kwargs)
463463
msg = Message.objects.select_related("sender").get(id=msg_id)
464464
assert msg.rendered_content is not None
465465
self.assertIn(

zerver/tests/test_message_delete.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,7 @@ def test_delete_event_sent_after_transaction_commits(self) -> None:
500500

501501
with (
502502
self.capture_send_event_calls(expected_num_events=1),
503-
mock.patch("zerver.tornado.django_api.queue_json_publish") as m,
503+
mock.patch("zerver.tornado.django_api.queue_json_publish_rollback_unsafe") as m,
504504
):
505505
m.side_effect = AssertionError(
506506
"Events should be sent only after the transaction commits."

zerver/tests/test_message_edit_notifications.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ def fake_publish(queue_name: str, event: Mapping[str, Any] | str, *args: Any) ->
133133
)
134134

135135
with mock_queue_publish(
136-
"zerver.tornado.event_queue.queue_json_publish", side_effect=fake_publish
136+
"zerver.tornado.event_queue.queue_json_publish_rollback_unsafe",
137+
side_effect=fake_publish,
137138
) as m:
138139
maybe_enqueue_notifications(**enqueue_kwargs)
139140

zerver/tests/test_queue.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
SimpleQueueClient,
1111
TornadoQueueClient,
1212
get_queue_client,
13-
queue_json_publish,
13+
queue_json_publish_rollback_unsafe,
1414
)
1515
from zerver.lib.test_classes import ZulipTestCase
1616

@@ -43,7 +43,7 @@ def collect(events: list[dict[str, Any]]) -> None:
4343
output.append(events[0])
4444
queue_client.stop_consuming()
4545

46-
queue_json_publish("test_suite", {"event": "my_event"})
46+
queue_json_publish_rollback_unsafe("test_suite", {"event": "my_event"})
4747

4848
queue_client.start_json_consumer("test_suite", collect)
4949

@@ -67,7 +67,7 @@ def collect(events: list[dict[str, Any]]) -> None:
6767
raise Exception("Make me nack!")
6868
output.append(events[0])
6969

70-
queue_json_publish("test_suite", {"event": "my_event"})
70+
queue_json_publish_rollback_unsafe("test_suite", {"event": "my_event"})
7171

7272
try:
7373
queue_client.start_json_consumer("test_suite", collect)
@@ -97,7 +97,7 @@ def throw_connection_error_once(self_obj: Any, *args: Any, **kwargs: Any) -> Non
9797
mock.patch("zerver.lib.queue.SimpleQueueClient.publish", throw_connection_error_once),
9898
self.assertLogs("zulip.queue", level="WARN") as warn_logs,
9999
):
100-
queue_json_publish("test_suite", {"event": "my_event"})
100+
queue_json_publish_rollback_unsafe("test_suite", {"event": "my_event"})
101101
self.assertEqual(
102102
warn_logs.output,
103103
["WARNING:zulip.queue:Failed to send to rabbitmq, trying to reconnect and send again"],

zerver/tests/test_queue_worker.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -552,7 +552,8 @@ def generate_remove_notification() -> dict[str, Any]:
552552

553553
with (
554554
mock_queue_publish(
555-
"zerver.lib.queue.queue_json_publish", side_effect=fake_publish
555+
"zerver.lib.queue.queue_json_publish_rollback_unsafe",
556+
side_effect=fake_publish,
556557
),
557558
self.assertLogs(
558559
"zerver.worker.missedmessage_mobile_notifications", "WARNING"
@@ -694,7 +695,9 @@ def fake_publish(
694695
worker.setup()
695696
with (
696697
patch("zerver.lib.send_email.build_email", side_effect=EmailNotDeliveredError),
697-
mock_queue_publish("zerver.lib.queue.queue_json_publish", side_effect=fake_publish),
698+
mock_queue_publish(
699+
"zerver.lib.queue.queue_json_publish_rollback_unsafe", side_effect=fake_publish
700+
),
698701
self.assertLogs(level="ERROR") as m,
699702
):
700703
worker.start()

zerver/tests/test_reactions.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1056,7 +1056,7 @@ def test_add_event(self) -> None:
10561056
}
10571057
with (
10581058
self.capture_send_event_calls(expected_num_events=1) as events,
1059-
mock.patch("zerver.tornado.django_api.queue_json_publish") as m,
1059+
mock.patch("zerver.tornado.django_api.queue_json_publish_rollback_unsafe") as m,
10601060
):
10611061
m.side_effect = AssertionError(
10621062
"Events should be sent only after the transaction commits!"
@@ -1141,7 +1141,7 @@ def test_events_sent_after_transaction_commits(self) -> None:
11411141

11421142
with (
11431143
self.capture_send_event_calls(expected_num_events=1),
1144-
mock.patch("zerver.tornado.django_api.queue_json_publish") as m,
1144+
mock.patch("zerver.tornado.django_api.queue_json_publish_rollback_unsafe") as m,
11451145
):
11461146
m.side_effect = AssertionError(
11471147
"Events should be sent only after the transaction commits."

zerver/tests/test_realm_export.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from analytics.models import RealmCount
1010
from zerver.actions.user_settings import do_change_user_setting
1111
from zerver.lib.exceptions import JsonableError
12-
from zerver.lib.queue import queue_json_publish
12+
from zerver.lib.queue import queue_json_publish_rollback_unsafe
1313
from zerver.lib.test_classes import ZulipTestCase
1414
from zerver.lib.test_helpers import (
1515
HostRequestMock,
@@ -250,7 +250,7 @@ def test_export_failure(self) -> None:
250250
patch("zerver.lib.export.do_export_realm") as mock_export,
251251
self.assertLogs(level="INFO") as info_logs,
252252
):
253-
queue_json_publish(
253+
queue_json_publish_rollback_unsafe(
254254
"deferred_work",
255255
{
256256
"type": "realm_export",

zerver/tests/test_submessage.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ def test_submessage_event_sent_after_transaction_commits(self) -> None:
196196

197197
with (
198198
self.capture_send_event_calls(expected_num_events=1),
199-
mock.patch("zerver.tornado.django_api.queue_json_publish") as m,
199+
mock.patch("zerver.tornado.django_api.queue_json_publish_rollback_unsafe") as m,
200200
):
201201
m.side_effect = AssertionError(
202202
"Events should be sent only after the transaction commits."

zerver/tornado/django_api.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from urllib3.util import Retry
1515

1616
from zerver.lib.partial import partial
17-
from zerver.lib.queue import queue_json_publish
17+
from zerver.lib.queue import queue_json_publish_rollback_unsafe
1818
from zerver.models import Client, Realm, UserProfile
1919
from zerver.tornado.sharding import (
2020
get_realm_tornado_ports,
@@ -201,7 +201,7 @@ def send_event_rollback_unsafe(
201201
port_user_map[get_user_id_tornado_port(realm_ports, user_id)].append(user)
202202

203203
for port, port_users in port_user_map.items():
204-
queue_json_publish(
204+
queue_json_publish_rollback_unsafe(
205205
notify_tornado_queue_name(port),
206206
dict(event=event, users=port_users),
207207
partial(send_notification_http, port),

0 commit comments

Comments
 (0)