Skip to content

Fix kombu crash when using sentry & setting parsing #1359

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Apr 24, 2025
14 changes: 8 additions & 6 deletions newrelic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,10 @@ def _process_configuration(section):
_process_setting(section, "azure_operator.enabled", "getboolean", None)
_process_setting(section, "package_reporting.enabled", "getboolean", None)
_process_setting(section, "instrumentation.graphql.capture_introspection_queries", "getboolean", None)
_process_setting(
section, "instrumentation.kombu.ignored_exchanges", "get", newrelic.core.config.parse_space_separated_into_list
)
_process_setting(section, "instrumentation.kombu.consumer.enabled", "getboolean", None)


# Loading of configuration from specified file and for specified
Expand Down Expand Up @@ -2847,12 +2851,10 @@ def _process_module_builtin_defaults():
_process_module_definition(
"kafka.coordinator.heartbeat", "newrelic.hooks.messagebroker_kafkapython", "instrument_kafka_heartbeat"
)
# Kombu instrumentation is causing crashes so until we figure out the root cause
# comment it out.
# _process_module_definition("kombu.messaging", "newrelic.hooks.messagebroker_kombu", "instrument_kombu_messaging")
# _process_module_definition(
# "kombu.serialization", "newrelic.hooks.messagebroker_kombu", "instrument_kombu_serializaion"
# )
_process_module_definition("kombu.messaging", "newrelic.hooks.messagebroker_kombu", "instrument_kombu_messaging")
_process_module_definition(
"kombu.serialization", "newrelic.hooks.messagebroker_kombu", "instrument_kombu_serializaion"
)
_process_module_definition("logging", "newrelic.hooks.logger_logging", "instrument_logging")

_process_module_definition("loguru", "newrelic.hooks.logger_loguru", "instrument_loguru")
Expand Down
54 changes: 53 additions & 1 deletion newrelic/hooks/messagebroker_kombu.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,65 @@
}


def bind_publish(
    body,
    routing_key=None,
    delivery_mode=None,
    mandatory=False,
    immediate=False,
    priority=0,
    content_type=None,
    content_encoding=None,
    serializer=None,
    headers=None,
    compression=None,
    exchange=None,
    retry=False,
    retry_policy=None,
    declare=None,
    expiration=None,
    timeout=None,
    confirm_timeout=None,
    **properties,
):
    """Normalize the arguments of kombu's ``Producer.publish`` into a dict.

    The signature deliberately mirrors ``kombu.messaging.Producer.publish``
    so that any combination of positional and keyword arguments a caller
    uses is captured under a stable, named key. Extra keyword arguments not
    in the known signature are collected under the ``"properties"`` key.

    Returns:
        dict: one entry per parameter name above, plus ``"properties"``
        holding any remaining keyword arguments.
    """
    # locals() at function entry contains exactly the bound parameters
    # (including the **properties dict), in declaration order — the same
    # mapping the explicit dict literal would produce.
    return dict(locals())


def wrap_Producer_publish(wrapped, instance, args, kwargs):
transaction = current_transaction()

if transaction is None:
return wrapped(*args, **kwargs)

bound_args = bind_args(wrapped, args, kwargs)
try:
bound_args = bind_publish(*args, **kwargs)
except Exception:
_logger.debug(
"Unable to bind arguments for kombu.messaging.Producer.publish. Report this issue to New Relic support.",
record_exception=True,
)
return wrapped(*args, **kwargs)

headers = bound_args["headers"]
headers = headers if headers else {}
value = bound_args["body"]
Expand Down
11 changes: 11 additions & 0 deletions tests/messagebroker_kombu/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,14 @@ def test():
producer.publish({"foo": object()}, exchange=exchange, routing_key="bar", declare=[queue])

test()


def test_producer_tries_to_parse_args(exchange, producer, queue, monkeypatch):
    # Passing the message both positionally and as body= gives publish()
    # two values for the same argument; the instrumentation must not
    # swallow the resulting TypeError — it should propagate to the caller.
    @background_task()
    def _run():
        with pytest.raises(TypeError):
            producer.publish(
                {"foo": object()},
                body={"foo": object()},
                exchange=exchange,
                routing_key="bar",
                declare=[queue],
            )

    _run()
5 changes: 3 additions & 2 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ envlist =
python-template_jinja2-py37-jinja2030103,
python-template_mako-{py37,py38,py39,py310,py311,py312,py313},
rabbitmq-messagebroker_pika-{py37,py38,py39,py310,py311,py312,py313,pypy310}-pikalatest,
;; Comment out Kombu until we can root cause the crash.
; rabbitmq-messagebroker_kombu-{py38,py39,py310,py311,py312,py313,pypy310}-kombulatest,
rabbitmq-messagebroker_kombu-{py38,py39,py310,py311,py312,py313,pypy310}-kombulatest,
rabbitmq-messagebroker_kombu-{py38,py39,py310,pypy310}-kombu050204,
redis-datastore_redis-{py37,py311,pypy310}-redis04,
redis-datastore_redis-{py37,py38,py39,py310,py311,py312,py313,pypy310}-redislatest,
rediscluster-datastore_rediscluster-{py37,py312,py313,pypy310}-redislatest,
Expand Down Expand Up @@ -418,6 +418,7 @@ deps =
messagebroker_confluentkafka-confluentkafka0106: confluent-kafka<1.7
messagebroker_kafkapython-kafkapythonnglatest: kafka-python-ng
messagebroker_kombu-kombulatest: kombu
messagebroker_kombu-kombu050204: kombu<5.3.0
# TODO: Pinned to 2.0 for now, fix tests later
messagebroker_kafkapython-kafkapythonlatest: kafka-python<2.1
template_genshi-genshilatest: genshi
Expand Down
Loading