Skip to content

Commit 888829a

Browse files
authored
Add support for comma-separated URLs (#1866)
* Add support for comma-separated URLs * Fix test * Fix test * Minor fix
1 parent 231083f commit 888829a

File tree

4 files changed

+71
-21
lines changed

4 files changed

+71
-21
lines changed

src/confluent_kafka/schema_registry/schema_registry_client.py

+67-17
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
cast, Optional, Union, Any, Tuple
3434

3535
from cachetools import TTLCache, LRUCache
36+
from httpx import Response
3637

3738
from .error import SchemaRegistryError
3839

@@ -70,9 +71,15 @@ def __init__(self, conf: dict):
7071
raise ValueError("Missing required configuration property url")
7172
if not isinstance(base_url, string_type):
7273
raise TypeError("url must be a str, not " + str(type(base_url)))
73-
if not base_url.startswith('http') and not base_url.startswith('mock'):
74-
raise ValueError("Invalid url {}".format(base_url))
75-
self.base_url = base_url.rstrip('/')
74+
base_urls = []
75+
for url in base_url.split(','):
76+
url = url.strip().rstrip('/')
77+
if not url.startswith('http') and not url.startswith('mock'):
78+
raise ValueError("Invalid url {}".format(url))
79+
base_urls.append(url)
80+
if not base_urls:
81+
raise ValueError("Missing required configuration property url")
82+
self.base_urls = base_urls
7683

7784
self.verify = True
7885
ca = conf_copy.pop('ssl.ca.location', None)
@@ -93,7 +100,7 @@ def __init__(self, conf: dict):
93100
raise ValueError("ssl.certificate.location required when"
94101
" configuring ssl.key.location")
95102

96-
parsed = urlparse(base_url)
103+
parsed = urlparse(self.base_urls[0])
97104
try:
98105
userinfo = (unquote(parsed.username), unquote(parsed.password))
99106
except (AttributeError, TypeError):
@@ -219,7 +226,7 @@ def send_request(
219226
query: dict = None
220227
) -> Any:
221228
"""
222-
Sends HTTP request to the SchemaRegistry.
229+
Sends HTTP request to the SchemaRegistry, trying each base URL in turn.
223230
224231
All unsuccessful attempts will raise a SchemaRegistryError with the
225232
response contents. In most cases this will be accompanied by a
@@ -250,21 +257,22 @@ def send_request(
250257
'Content-Type': "application/vnd.schemaregistry.v1+json"}
251258

252259
response = None
253-
for i in range(self.max_retries + 1):
254-
response = self.session.request(
255-
method, url="/".join([self.base_url, url]),
256-
headers=headers, data=body, params=query)
260+
for i, base_url in enumerate(self.base_urls):
261+
try:
262+
response = self.send_http_request(
263+
base_url, url, method, headers, body, query)
257264

258-
if (is_success(response.status_code)
259-
or not is_retriable(response.status_code)
260-
or i >= self.max_retries):
261-
break
265+
if is_success(response.status_code):
266+
return response.json()
262267

263-
time.sleep(full_jitter(self.retries_wait_ms, self.retries_max_wait_ms, i) / 1000)
268+
if not is_retriable(response.status_code) or i == len(self.base_urls) - 1:
269+
break
270+
except Exception as e:
271+
if i == len(self.base_urls) - 1:
272+
# Raise the exception since we have no more urls to try
273+
raise e
264274

265275
try:
266-
if 200 <= response.status_code <= 299:
267-
return response.json()
268276
raise SchemaRegistryError(response.status_code,
269277
response.json().get('error_code'),
270278
response.json().get('message'))
@@ -275,6 +283,48 @@ def send_request(
275283
"Unknown Schema Registry Error: "
276284
+ str(response.content))
277285

286+
def send_http_request(
287+
self, base_url: str, url: str, method: str, headers: dict,
288+
body: Optional[str] = None, query: dict = None
289+
) -> Response:
290+
"""
291+
Sends HTTP request to the SchemaRegistry.
292+
293+
All unsuccessful attempts will raise a SchemaRegistryError with the
294+
response contents. In most cases this will be accompanied by a
295+
Schema Registry supplied error code.
296+
297+
In the event the response is malformed an error_code of -1 will be used.
298+
299+
Args:
300+
base_url (str): Schema Registry base URL
301+
302+
url (str): Request path
303+
304+
method (str): HTTP method
305+
306+
headers (dict): Headers
307+
308+
body (str): Request content
309+
310+
query (dict): Query params to attach to the URL
311+
312+
Returns:
313+
Response: Schema Registry response content.
314+
"""
315+
for i in range(self.max_retries + 1):
316+
response = self.session.request(
317+
method, url="/".join([base_url, url]),
318+
headers=headers, data=body, params=query)
319+
320+
if is_success(response.status_code):
321+
return response
322+
323+
if not is_retriable(response.status_code) or i >= self.max_retries:
324+
return response
325+
326+
time.sleep(full_jitter(self.retries_wait_ms, self.retries_max_wait_ms, i) / 1000)
327+
278328

279329
def is_success(status_code: int) -> bool:
280330
return 200 <= status_code <= 299
@@ -495,7 +545,7 @@ class SchemaRegistryClient(object):
495545
+------------------------------+------+-------------------------------------------------+
496546
| Property name | type | Description |
497547
+==============================+======+=================================================+
498-
| ``url`` * | str | Schema Registry URL. |
548+
| ``url`` * | str | Comma-separated list of Schema Registry URLs. |
499549
+------------------------------+------+-------------------------------------------------+
500550
| | | Path to CA certificate file used |
501551
| ``ssl.ca.location`` | str | to verify the Schema Registry's |

src/confluent_kafka/schema_registry/serde.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ def _execute_rules(
311311

312312
for index in range(len(rules)):
313313
rule = rules[index]
314-
if rule.disabled:
314+
if self._is_disabled(rule):
315315
continue
316316
if rule.mode == RuleMode.WRITEREAD:
317317
if rule_mode != RuleMode.READ and rule_mode != RuleMode.WRITE:

tests/schema_registry/test_avro_serdes.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -632,13 +632,13 @@ def test_avro_cel_field_transform_disable():
632632
registry = RuleRegistry()
633633
registry.register_rule_executor(CelFieldExecutor())
634634
registry.register_override(RuleOverride("CEL_FIELD", None, None, True))
635-
ser = AvroSerializer(client, schema_str=None, conf=ser_conf)
635+
ser = AvroSerializer(client, schema_str=None, conf=ser_conf, rule_registry=registry)
636636
ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE)
637637
obj_bytes = ser(obj, ser_ctx)
638638

639639
deser = AvroDeserializer(client)
640640
newobj = deser(obj_bytes, ser_ctx)
641-
assert obj == newobj
641+
assert "hi" == newobj['stringField']
642642

643643

644644
def test_avro_cel_field_transform_complex():

tests/schema_registry/test_config.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def test_config_url_none():
5454
def test_config_url_trailing_slash():
5555
conf = {'url': 'http://SchemaRegistry:65534/'}
5656
test_client = SchemaRegistryClient(conf)
57-
assert test_client._rest_client.base_url == TEST_URL
57+
assert test_client._rest_client.base_urls == [TEST_URL]
5858

5959

6060
def test_config_ssl_key_no_certificate():

0 commit comments

Comments
 (0)