Skip to content

Commit 964cfbc

Browse files
authored
Multiple celery queues (#7226)
* Separate Celery queue for malware tasks
* Remove malware queue and routes for now
1 parent 654c873 commit 964cfbc

File tree

4 files changed

+38
-23
lines changed

4 files changed

+38
-23
lines changed

Procfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
release: bin/release
22
web: bin/start-web python -m gunicorn.app.wsgiapp -c gunicorn.conf.py warehouse.wsgi:application
33
web-uploads: bin/start-web python -m gunicorn.app.wsgiapp -c gunicorn-uploads.conf.py warehouse.wsgi:application
4-
worker: bin/start-worker celery -A warehouse worker -l info --max-tasks-per-child 32
4+
worker: bin/start-worker celery -A warehouse worker -Q default -l info --max-tasks-per-child 32
55
worker-beat: bin/start-worker celery -A warehouse beat -S redbeat.RedBeatScheduler -l info

dev/environment

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ WAREHOUSE_TOKEN=insecuretoken
77

88
AWS_ACCESS_KEY_ID=foo
99
AWS_SECRET_ACCESS_KEY=foo
10-
BROKER_URL=sqs://localstack:4576/warehouse-dev?region=us-east-1
10+
BROKER_URL=sqs://localstack:4576/?region=us-east-1&queue_name_prefix=warehouse-dev
1111

1212
DATABASE_URL=postgresql://postgres@db/warehouse
1313

tests/unit/test_tasks.py

+27-17
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import transaction
1818

1919
from celery import Celery, Task
20+
from kombu import Queue
2021
from pyramid import scripting
2122
from pyramid_retry import RetryableException
2223

@@ -372,63 +373,69 @@ def test_make_celery_app():
372373

373374

374375
@pytest.mark.parametrize(
375-
("env", "ssl", "broker_url", "expected_url", "queue_name", "transport_options"),
376+
("env", "ssl", "broker_url", "expected_url", "transport_options"),
376377
[
377378
(
378379
Environment.development,
379380
False,
380381
"amqp://guest@rabbitmq:5672//",
381382
"amqp://guest@rabbitmq:5672//",
382-
"celery",
383383
{},
384384
),
385385
(
386386
Environment.production,
387387
True,
388388
"amqp://guest@rabbitmq:5672//",
389389
"amqp://guest@rabbitmq:5672//",
390-
"celery",
391390
{},
392391
),
393-
(Environment.development, False, "sqs://", "sqs://", "celery", {}),
394-
(Environment.production, True, "sqs://", "sqs://", "celery", {}),
395-
(Environment.development, False, "sqs:///my-queue", "sqs://", "my-queue", {}),
396-
(Environment.production, True, "sqs:///my-queue", "sqs://", "my-queue", {}),
392+
(Environment.development, False, "sqs://", "sqs://", {}),
393+
(Environment.production, True, "sqs://", "sqs://", {}),
394+
(
395+
Environment.development,
396+
False,
397+
"sqs://?queue_name_prefix=warehouse",
398+
"sqs://",
399+
{"queue_name_prefix": "warehouse-"},
400+
),
401+
(
402+
Environment.production,
403+
True,
404+
"sqs://?queue_name_prefix=warehouse",
405+
"sqs://",
406+
{"queue_name_prefix": "warehouse-"},
407+
),
397408
(
398409
Environment.development,
399410
False,
400411
"sqs://?region=us-east-2",
401412
"sqs://",
402-
"celery",
403413
{"region": "us-east-2"},
404414
),
405415
(
406416
Environment.production,
407417
True,
408418
"sqs://?region=us-east-2",
409419
"sqs://",
410-
"celery",
411420
{"region": "us-east-2"},
412421
),
413422
(
414423
Environment.development,
415424
False,
416-
"sqs:///my-queue?region=us-east-2",
425+
"sqs:///?region=us-east-2&queue_name_prefix=warehouse",
417426
"sqs://",
418-
"my-queue",
419-
{"region": "us-east-2"},
427+
{"region": "us-east-2", "queue_name_prefix": "warehouse-"},
420428
),
421429
(
422430
Environment.production,
423431
True,
424-
"sqs:///my-queue?region=us-east-2",
432+
"sqs:///?region=us-east-2&queue_name_prefix=warehouse",
425433
"sqs://",
426-
"my-queue",
427-
{"region": "us-east-2"},
434+
{"region": "us-east-2", "queue_name_prefix": "warehouse-"},
428435
),
429436
],
430437
)
431-
def test_includeme(env, ssl, broker_url, expected_url, queue_name, transport_options):
438+
def test_includeme(env, ssl, broker_url, expected_url, transport_options):
432439
registry_dict = {}
433440
config = pretend.stub(
434441
action=pretend.call_recorder(lambda *a, **kw: None),
@@ -456,10 +463,13 @@ def test_includeme(env, ssl, broker_url, expected_url, queue_name, transport_opt
456463
"broker_url": expected_url,
457464
"broker_use_ssl": ssl,
458465
"worker_disable_rate_limits": True,
459-
"task_default_queue": queue_name,
466+
"task_default_queue": "default",
467+
"task_default_routing_key": "task.default",
460468
"task_serializer": "json",
461469
"accept_content": ["json", "msgpack"],
462470
"task_queue_ha_policy": "all",
471+
"task_queues": (Queue("default", routing_key="task.#"),),
472+
"task_routes": ([]),
463473
"REDBEAT_REDIS_URL": (config.registry.settings["celery.scheduler_url"]),
464474
}.items():
465475
assert app.conf[key] == value

warehouse/tasks.py

+9-4
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import transaction
2222
import venusian
2323

24+
from kombu import Queue
2425
from pyramid.threadlocal import get_current_request
2526

2627
from warehouse.config import Environment
@@ -158,7 +159,6 @@ def add_task():
158159
def includeme(config):
159160
s = config.registry.settings
160161

161-
queue_name = "celery"
162162
broker_transport_options = {}
163163

164164
broker_url = s["celery.broker_url"]
@@ -169,8 +169,10 @@ def includeme(config):
169169
# so we'll just remove them from here.
170170
broker_url = urllib.parse.urlunparse(parsed_url[:2] + ("", "", "", ""))
171171

172-
if parsed_url.path:
173-
queue_name = parsed_url.path[1:]
172+
if "queue_name_prefix" in parsed_query:
173+
broker_transport_options["queue_name_prefix"] = (
174+
parsed_query["queue_name_prefix"][0] + "-"
175+
)
174176

175177
if "region" in parsed_query:
176178
broker_transport_options["region"] = parsed_query["region"][0]
@@ -183,8 +185,11 @@ def includeme(config):
183185
broker_url=broker_url,
184186
broker_use_ssl=s["warehouse.env"] == Environment.production,
185187
broker_transport_options=broker_transport_options,
186-
task_default_queue=queue_name,
188+
task_default_queue="default",
189+
task_default_routing_key="task.default",
187190
task_queue_ha_policy="all",
191+
task_queues=(Queue("default", routing_key="task.#"),),
192+
task_routes=([]),
188193
task_serializer="json",
189194
worker_disable_rate_limits=True,
190195
REDBEAT_REDIS_URL=s["celery.scheduler_url"],

0 commit comments

Comments (0)