Handling of null ctx in ProtobufDeserializer #1939

Closed
0x26res opened this issue Mar 12, 2025 · 2 comments · Fixed by #1974
Comments

@0x26res
0x26res commented Mar 12, 2025

The ctx argument in ProtobufDeserializer is marked as Optional.

def __call__(self, data: bytes, ctx: Optional[SerializationContext] = None) -> Optional[Message]:

In previous versions of the library, calling ProtobufDeserializer.__call__ with ctx=None worked. With the new version it fails with an AttributeError, around here

I think this was introduced by this recent change #1852

There is a workaround: passing a subject.name.strategy that ignores the context:

ProtobufDeserializer(xxx,  {"subject.name.strategy": lambda *_: None})
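The same workaround can be written with a named function instead of a lambda, which makes the intent a bit easier to read. This is only a sketch: `ignore_context_strategy` and `conf` are names made up for illustration, and it assumes the strategy is called with the context and the record name, as the `lambda *_: None` above suggests.

```python
from typing import Any, Optional


def ignore_context_strategy(ctx: Any, record_name: Optional[str]) -> None:
    """Hypothetical strategy: never touches ctx, so ctx=None is safe."""
    return None


# Config dict to pass as the deserializer's conf argument
# (assumption: same keys as in the workaround above).
conf = {
    "use.deprecated.format": False,
    "subject.name.strategy": ignore_context_strategy,
}
```

Because the strategy never reads an attribute of `ctx`, the `AttributeError` on `ctx.topic` cannot be triggered from it.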

I would argue that the code should be able to handle a null ctx without having to change the config.

This applies in particular to a ProtobufDeserializer that doesn't have a schema registry set (and is using schema on read): in that case the subject is not used, and therefore the ctx is not needed.
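To make the suggestion concrete, here is a rough sketch of the kind of guard I have in mind. It is not the library's code: `StubContext`, `topic_strategy` and `resolve_subject` are hypothetical stand-ins, and raising `ValueError` when a registry is configured but no context is given is just one possible design choice.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class StubContext:
    """Hypothetical stand-in for SerializationContext (topic + field)."""

    topic: str
    field: str


def topic_strategy(ctx: StubContext, record_name: Optional[str]) -> str:
    """Stand-in strategy that derives the subject from topic and field."""
    return f"{ctx.topic}-{ctx.field}"


def resolve_subject(
    ctx: Optional[StubContext],
    record_name: Optional[str],
    strategy: Callable[[StubContext, Optional[str]], Optional[str]],
    has_registry: bool,
) -> Optional[str]:
    # Schema on read: without a registry the subject is never looked up,
    # so the strategy (and therefore ctx) is not needed at all.
    if not has_registry:
        return None
    # With a registry, fail with an explicit message rather than an
    # AttributeError on ctx.topic.
    if ctx is None:
        raise ValueError("ctx is required when a schema registry is configured")
    return strategy(ctx, record_name)
```

With this shape, `resolve_subject(None, "DoubleValue", topic_strategy, has_registry=False)` returns `None` without ever touching the context.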

Steps to reproduce:

import dataclasses

import pytest
from confluent_kafka.schema_registry import RegisteredSchema, Schema
from confluent_kafka.schema_registry.protobuf import (
    ProtobufDeserializer,
    ProtobufSerializer,
)
from confluent_kafka.serialization import MessageField, SerializationContext
from google.protobuf.wrappers_pb2 import DoubleValue


@dataclasses.dataclass
class InMemorySchemaRegistryClient:
    """In memory schema registry, for test"""

    schemas: dict = dataclasses.field(default_factory=dict)

    def register_schema(self, subject_name, schema, *_, **__) -> int:
        try:
            return self.schemas[schema].schema_id
        except KeyError:
            schema_id = len(self.schemas)
            self.schemas[schema] = RegisteredSchema(
                schema_id=schema_id,
                schema=Schema(schema, "PROTOBUF", []),
                subject=subject_name,
                version=1,
            )
            return schema_id

    # noinspection PyUnusedLocal
    def lookup_schema(self, subject_name, schema):
        return self.schemas.get(schema, None)


def test_end_to_end_kafka():
    context = SerializationContext("test-topic-1", MessageField.VALUE)
    serializer = ProtobufSerializer(
        DoubleValue, InMemorySchemaRegistryClient(), {"use.deprecated.format": False}
    )
    deserializer = ProtobufDeserializer(DoubleValue, {"use.deprecated.format": False})

    msg_in = DoubleValue(value=3.14)
    kafka_data = serializer(msg_in, context)
    assert isinstance(kafka_data, bytes)
    proto_data = msg_in.SerializeToString()

    assert kafka_data[-len(proto_data) :] == proto_data
    assert len(kafka_data) - len(proto_data) == 6
    deserializer._msg_class().ParseFromString(proto_data)

    with pytest.raises(
        AttributeError, match="'NoneType' object has no attribute 'topic'"
    ):
        deserializer(kafka_data, ctx=None)

    msg_out = deserializer(
        kafka_data, ctx=SerializationContext("test-topic-1", MessageField.VALUE)
    )

    deserializer_no_subject_name = ProtobufDeserializer(
        DoubleValue,
        {"use.deprecated.format": False, "subject.name.strategy": lambda *_: None},
    )
    msg_out2 = deserializer_no_subject_name(kafka_data, ctx=None)
    assert msg_out == msg_out2

    assert isinstance(msg_out, DoubleValue)
    assert msg_out == msg_in

    kafka_data_back = serializer(msg_out, context)

    assert kafka_data_back == kafka_data


if __name__ == "__main__":
    test_end_to_end_kafka()
@and-ratajski
Contributor

AvroDeserializer behaves in the same way, unfortunately... Since this behavior wasn't present in previous releases, shouldn't it be treated as a breaking change?

I mean, changing a parameter (moving it from optional to required) is a breaking change...

@0x26res
Author

0x26res commented Apr 24, 2025

I mean, changing a parameter (moving it from optional to required) is a breaking change...

I think it was optional in name only. If you were to provide a None context, you'd get an exception in the current release.
