Skip to content

Commit 406a2bd

Browse files
roopahc authored and edenhill committed
Producer client for handling avro schemas (confluentinc#40, @roopahc, @criccomini)
1 parent d02872b commit 406a2bd

21 files changed

+1684
-10
lines changed

.travis.yml

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
language: python
2+
sudo: required
23
python:
34
- "2.7"
45
- "3.4"
56
before_install:
67
- bash tools/bootstrap-librdkafka.sh v0.9.2 tmp-build
8+
- pip install --upgrade pip
79
- pip install pytest-timeout
810
install:
9-
- pip install -v --global-option=build_ext --global-option="-Itmp-build/include/" --global-option="-Ltmp-build/lib" .
11+
- pip install -v --global-option=build_ext --global-option="-Itmp-build/include/" --global-option="-Ltmp-build/lib" . .[avro]
1012
env:
1113
- LD_LIBRARY_PATH=$PWD/tmp-build/lib
12-
script: py.test -v --timeout 20 --ignore=tmp-build
14+
script: py.test -v --timeout 20 --ignore=tmp-build --import-mode append

README.md

+20
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,20 @@ while running:
4646
c.close()
4747
```
4848

49+
**AvroProducer**
50+
```
51+
from confluent_kafka import avro
52+
from confluent_kafka.avro import AvroProducer
53+
54+
value_schema = avro.load('ValueSchema.avsc')
55+
key_schema = avro.load('KeySchema.avsc')
56+
value = {"name": "Value"}
57+
key = {"name": "Key"}
58+
59+
avroProducer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2', 'schema.registry.url': 'http://schema_registry_host:port'}, default_key_schema=key_schema, default_value_schema=value_schema)
60+
avroProducer.produce(topic='my_topic', value=value, key=key)
61+
```
62+
4963
See [examples](examples) for more examples.
5064

5165

@@ -85,12 +99,18 @@ Install
8599
**Install from PyPi:**
86100

87101
$ pip install confluent-kafka
102+
103+
# for AvroProducer
104+
$ pip install confluent-kafka[avro]
88105

89106

90107
**Install from source / tarball:**
91108

92109
$ pip install .
93110

111+
# for AvroProducer
112+
$ pip install .[avro]
113+
94114

95115
Build
96116
=====

confluent_kafka/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
__all__ = ['cimpl','kafkatest']
1+
__all__ = ['cimpl', 'avro', 'kafkatest']
22
from .cimpl import *

confluent_kafka/avro/__init__.py

+114
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
"""
2+
Avro schema registry module: Deals with encoding and decoding of messages with avro schemas
3+
4+
"""
5+
import sys
6+
7+
from confluent_kafka import Producer
8+
9+
VALID_LEVELS = ['NONE', 'FULL', 'FORWARD', 'BACKWARD']
10+
11+
12+
def loads(schema_str):
    """
    Parse an Avro schema from its JSON string form.

    The parser entry point is looked up on the ``schema`` module itself rather
    than being chosen from the interpreter version: the lower-case ``parse``
    is preferred, and the ``Parse`` spelling is used only when ``parse`` is
    not available.

    :param schema_str: Avro schema definition as a string
    :returns: whatever the avro schema parser returns for ``schema_str``
    """
    # NOTE(review): `schema` is bound by the guarded `from avro import schema`
    # elsewhere in this module; if that import failed, this raises NameError.
    # NOTE(review): some avro distributions for Python 3 are understood to
    # expose only `Parse` and others only `parse` - confirm against the avro
    # release in use. Feature detection covers both.
    parse = getattr(schema, 'parse', None)
    if parse is None:
        parse = schema.Parse
    return parse(schema_str)
18+
19+
20+
def load(fp):
    """
    Parse an Avro schema stored in a file.

    :param fp: path of the schema file (a path, despite the name: it is
               passed straight to ``open``)
    :returns: the result of ``loads`` applied to the file's contents
    """
    # Function-scope import so this block does not depend on a new
    # module-level import.
    import io

    # Decode explicitly as UTF-8. A plain open() on Python 3 uses the locale's
    # preferred encoding, so the same schema file could parse on one machine
    # and fail (or be mis-decoded) on another. io.open behaves the same on
    # Python 2 and Python 3.
    with io.open(fp, encoding='utf-8') as f:
        return loads(f.read())
24+
25+
26+
# avro.schema.RecordSchema and avro.schema.PrimitiveSchema classes are not hashable. Hence defining __hash__ for them explicitly as a quick fix
27+
def _hash_func(self):
    """ Hash an object through its string form, so equal strings give equal hashes """
    as_text = str(self)
    return hash(as_text)
29+
30+
31+
try:
    from avro import schema
except ImportError:
    # avro is only installed with the optional "avro" extra; without it there
    # is nothing to patch.
    pass
else:
    # Give both schema classes the string-based hash defined in this module.
    schema.RecordSchema.__hash__ = _hash_func
    schema.PrimitiveSchema.__hash__ = _hash_func
38+
39+
40+
class ClientError(Exception):
    """ Error raised by Schema Registry clients; carries a message and an optional HTTP code """

    def __init__(self, message, http_code=None):
        # Both attributes are set before the base initialiser runs, because
        # __str__ (used to build the base-class argument) reads self.message.
        self.http_code = http_code
        self.message = message
        text = self.__str__()
        super(ClientError, self).__init__(text)

    def __str__(self):
        # The string form is the message exactly as it was given.
        return self.message

    def __repr__(self):
        # Only the message is shown; http_code is not part of the repr.
        return "ClientError(error={error})".format(error=self.message)
53+
54+
55+
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
56+
from confluent_kafka.avro.serializer import SerializerError
57+
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
58+
59+
60+
class AvroProducer(Producer):
    """
    Kafka Producer client which does avro schema encoding to messages.
    Handles schema registration, Message serialization.

    Constructor takes below parameters

    :param config: dict object with config parameters; must contain the url
                   for the schema registry under 'schema.registry.url'. All
                   other entries are handed to the Producer constructor.
    :param default_key_schema: Optional avro schema for key
    :param default_value_schema: Optional avro schema for value
    :raises ValueError: if 'schema.registry.url' is not present in config
    """

    def __init__(self, config, default_key_schema=None,
                 default_value_schema=None):
        if 'schema.registry.url' not in config:
            raise ValueError("Missing parameter: schema.registry.url")

        # Work on a copy: the registry url has to be removed before the
        # config reaches Producer, but the caller's dict must stay intact so
        # it can be reused (e.g. to build a second AvroProducer).
        producer_config = dict(config)
        schema_registry_url = producer_config.pop('schema.registry.url')

        super(AvroProducer, self).__init__(producer_config)
        self._serializer = MessageSerializer(CachedSchemaRegistryClient(url=schema_registry_url))
        self._key_schema = default_key_schema
        self._value_schema = default_value_schema

    def produce(self, **kwargs):
        """
        Sends message to kafka by encoding with specified avro schema

        Only keyword arguments are accepted. Keywords not listed below are
        forwarded unchanged to Producer.produce.

        :param topic: topic name (required)
        :param value: object to encode as the message value
        :param value_schema: Avro schema for value; defaults to the schema
                             given to the constructor
        :param key: object to encode as the message key
        :param key_schema: Avro schema for key; defaults to the schema given
                           to the constructor
        :raises ClientError: if topic is missing or empty
        :raises SerializerError: if a value/key is given but no schema is
                                 available for it
        """
        # Per-call schemas take precedence over the constructor defaults.
        key_schema = kwargs.pop('key_schema', self._key_schema)
        value_schema = kwargs.pop('value_schema', self._value_schema)

        topic = kwargs.pop('topic', None)
        if not topic:
            raise ClientError("Topic name not specified.")

        value = kwargs.pop('value', None)
        key = kwargs.pop('key', None)

        # Compare against None rather than relying on truthiness, so falsy
        # but valid payloads ({}, 0, "") are still Avro-encoded instead of
        # being passed to the underlying producer unencoded.
        if value is not None:
            if not value_schema:
                raise SerializerError("Avro schema required for value")
            value = self._serializer.encode_record_with_schema(topic, value_schema, value)

        if key is not None:
            if not key_schema:
                raise SerializerError("Avro schema required for key")
            # NOTE(review): the trailing True presumably marks this record as
            # a key - confirm against MessageSerializer.
            key = self._serializer.encode_record_with_schema(topic, key_schema, key, True)

        super(AvroProducer, self).produce(topic, value, key, **kwargs)

0 commit comments

Comments
 (0)