Skip to content

Commit 160c1c0

Browse files
author
Ryan P
authored
Support encoding with fastavro (confluentinc#492)
1 parent df649a4 commit 160c1c0

File tree

1 file changed

+14
-20
lines changed

1 file changed

+14
-20
lines changed

confluent_kafka/avro/serializer/message_serializer.py

+14-20
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,7 @@
3939

4040
HAS_FAST = False
4141
try:
42-
from fastavro import schemaless_reader
42+
from fastavro import schemaless_reader, schemaless_writer
4343

4444
HAS_FAST = True
4545
except ImportError:
@@ -75,9 +75,13 @@ def __init__(self, registry_client, reader_key_schema=None, reader_value_schema=
7575
self.reader_key_schema = reader_key_schema
7676
self.reader_value_schema = reader_value_schema
7777

78-
'''
79-
80-
'''
78+
# Encoder support
79+
def _get_encoder_func(self, writer_schema):
80+
if HAS_FAST:
81+
schema = writer_schema.to_json()
82+
return lambda record, fp: schemaless_writer(fp, schema, record)
83+
writer = avro.io.DatumWriter(writer_schema)
84+
return lambda record, fp: writer.write(record, avro.io.BinaryEncoder(fp))
8185

8286
def encode_record_with_schema(self, topic, schema, record, is_key=False):
8387
"""
@@ -104,7 +108,7 @@ def encode_record_with_schema(self, topic, schema, record, is_key=False):
104108
raise serialize_err(message)
105109

106110
# cache writer
107-
self.id_to_writers[schema_id] = avro.io.DatumWriter(schema)
111+
self.id_to_writers[schema_id] = self._get_encoder_func(schema)
108112

109113
return self.encode_record_with_schema_id(schema_id, record, is_key=is_key)
110114

@@ -128,29 +132,19 @@ def encode_record_with_schema_id(self, schema_id, record, is_key=False):
128132
schema = self.registry_client.get_by_id(schema_id)
129133
if not schema:
130134
raise serialize_err("Schema does not exist")
131-
self.id_to_writers[schema_id] = avro.io.DatumWriter(schema)
135+
self.id_to_writers[schema_id] = self._get_encoder_func(schema)
132136
except ClientError:
133137
exc_type, exc_value, exc_traceback = sys.exc_info()
134138
raise serialize_err(repr(traceback.format_exception(exc_type, exc_value, exc_traceback)))
135139

136140
# get the writer
137141
writer = self.id_to_writers[schema_id]
138142
with ContextStringIO() as outf:
139-
# write the header
140-
# magic byte
141-
142-
outf.write(struct.pack('b', MAGIC_BYTE))
143-
144-
# write the schema ID in network byte order (big end)
145-
146-
outf.write(struct.pack('>I', schema_id))
143+
# Write the magic byte and schema ID in network byte order (big endian)
144+
outf.write(struct.pack('>bI', MAGIC_BYTE, schema_id))
147145

148-
# write the record to the rest of it
149-
# Create an encoder that we'll write to
150-
encoder = avro.io.BinaryEncoder(outf)
151-
# write the magic byte
152-
# write the object in 'obj' as Avro to the fake file...
153-
writer.write(record, encoder)
146+
# write the record to the rest of the buffer
147+
writer(record, outf)
154148

155149
return outf.getvalue()
156150

0 commit comments

Comments (0)