Skip to content

Commit 2eab7a7

Browse files
committed
State: add get_bytes / set_bytes methods
1 parent a487c04 commit 2eab7a7

File tree

4 files changed

+191
-30
lines changed

4 files changed

+191
-30
lines changed

docs/advanced/stateful-processing.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,3 +158,9 @@ for the state to become slightly out of sync with a topic in between shutdowns a
158158
While the impact of this is generally minimal and only for a small amount of messages, be aware this could cause side effects where the same message may be reprocessed differently, if it depended on certain state conditionals.
159159

160160
"Exactly Once" delivery guarantees avoid this. You can learn more about delivery/processing guarantees [here](https://quix.io/docs/quix-streams/configuration.html?h=#processing-guarantees).
161+
162+
## Serialization
163+
164+
By default, the keys and values are serialized to JSON for storage. If you need to change the serialization format, you can do so using the `rocksdb_options` parameter when creating the `Application` object. This change will apply to all state stores created by the application and existing state will be un-readable.
165+
166+
You can also handle the serialization and deserialization yourself by using the [`State.get_bytes`](../api-reference/state.md#stateget_bytes) and [`State.set_bytes`](../api-reference/state.md#stateset_bytes) methods. This allows you to store any type of values in the state store, as long as you can convert it to bytes and back.

quixstreams/state/base/state.py

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import logging
22
from abc import ABC, abstractmethod
3-
from typing import TYPE_CHECKING, Generic, Optional, TypeVar, overload
3+
from typing import TYPE_CHECKING, Generic, Literal, Optional, TypeVar, overload
44

55
if TYPE_CHECKING:
66
from .transaction import PartitionTransaction
@@ -20,7 +20,7 @@ class State(ABC, Generic[K, V]):
2020
"""
2121

2222
@overload
23-
def get(self, key: K) -> Optional[V]: ...
23+
def get(self, key: K, default: Literal[None] = None) -> Optional[V]: ...
2424

2525
@overload
2626
def get(self, key: K, default: V) -> V: ...
@@ -36,8 +36,32 @@ def get(self, key: K, default: Optional[V] = None) -> Optional[V]:
3636
"""
3737
...
3838

39+
@overload
40+
def get_bytes(self, key: K, default: Literal[None] = None) -> Optional[bytes]: ...
41+
42+
@overload
43+
def get_bytes(self, key: K, default: bytes) -> bytes: ...
44+
45+
def get_bytes(self, key: K, default: Optional[bytes] = None) -> Optional[bytes]:
46+
"""
47+
Get the value for key if key is present in the state, else default
48+
49+
:param key: key
50+
:param default: default value to return if the key is not found
51+
:return: value as bytes or None if the key is not found and `default` is not provided
52+
"""
53+
54+
@abstractmethod
55+
def set(self, key: K, value: V) -> None:
56+
"""
57+
Set value for the key.
58+
:param key: key
59+
:param value: value
60+
"""
61+
...
62+
3963
@abstractmethod
40-
def set(self, key: K, value: V):
64+
def set_bytes(self, key: K, value: bytes) -> None:
4165
"""
4266
Set value for the key.
4367
:param key: key
@@ -81,7 +105,7 @@ def __init__(self, prefix: bytes, transaction: "PartitionTransaction"):
81105
self._transaction = transaction
82106

83107
@overload
84-
def get(self, key: K) -> Optional[V]: ...
108+
def get(self, key: K, default: Literal[None] = None) -> Optional[V]: ...
85109

86110
@overload
87111
def get(self, key: K, default: V) -> V: ...
@@ -96,14 +120,40 @@ def get(self, key: K, default: Optional[V] = None) -> Optional[V]:
96120
"""
97121
return self._transaction.get(key=key, prefix=self._prefix, default=default)
98122

99-
def set(self, key: K, value: V):
123+
@overload
124+
def get_bytes(self, key: K, default: Literal[None] = None) -> Optional[bytes]: ...
125+
126+
@overload
127+
def get_bytes(self, key: K, default: bytes) -> bytes: ...
128+
129+
def get_bytes(self, key: K, default: Optional[bytes] = None) -> Optional[bytes]:
130+
"""
131+
Get the bytes value for key if key is present in the state, else default
132+
133+
:param key: key
134+
:param default: default value to return if the key is not found
135+
:return: value or None if the key is not found and `default` is not provided
136+
"""
137+
return self._transaction.get_bytes(
138+
key=key, prefix=self._prefix, default=default
139+
)
140+
141+
def set(self, key: K, value: V) -> None:
100142
"""
101143
Set value for the key.
102144
:param key: key
103145
:param value: value
104146
"""
105147
return self._transaction.set(key=key, value=value, prefix=self._prefix)
106148

149+
def set_bytes(self, key: K, value: bytes) -> None:
150+
"""
151+
Set value for the key.
152+
:param key: key
153+
:param value: value
154+
"""
155+
return self._transaction.set_bytes(key=key, value=value, prefix=self._prefix)
156+
107157
def delete(self, key: K):
108158
"""
109159
Delete value for the key.

quixstreams/state/base/transaction.py

Lines changed: 77 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
Any,
99
Dict,
1010
Generic,
11+
Literal,
1112
Optional,
1213
Set,
1314
Tuple,
@@ -301,7 +302,6 @@ def get(
301302
@overload
302303
def get(self, key: K, prefix: bytes, default: V, cf_name: str = "default") -> V: ...
303304

304-
@validate_transaction_status(PartitionTransactionStatus.STARTED)
305305
def get(
306306
self,
307307
key: K,
@@ -320,25 +320,71 @@ def get(
320320
:param cf_name: column family name
321321
:return: value or None if the key is not found and `default` is not provided
322322
"""
323+
324+
data = self._get_bytes(key, prefix, cf_name)
325+
if data is Marker.DELETED or data is Marker.UNDEFINED:
326+
return default
327+
328+
return self._deserialize_value(data)
329+
330+
@overload
331+
def get_bytes(
332+
self,
333+
key: K,
334+
prefix: bytes,
335+
default: Literal[None] = None,
336+
cf_name: str = "default",
337+
) -> Optional[bytes]: ...
338+
339+
@overload
340+
def get_bytes(
341+
self, key: K, prefix: bytes, default: bytes, cf_name: str = "default"
342+
) -> bytes: ...
343+
344+
def get_bytes(
345+
self,
346+
key: K,
347+
prefix: bytes,
348+
default: Optional[bytes] = None,
349+
cf_name: str = "default",
350+
) -> Optional[bytes]:
351+
"""
352+
Get a key from the store.
353+
354+
It returns `None` if the key is not found and `default` is not provided.
355+
356+
:param key: key
357+
:param prefix: a key prefix
358+
:param default: default value to return if the key is not found
359+
:param cf_name: column family name
360+
:return: value as bytes or None if the key is not found and `default` is not provided
361+
"""
362+
data = self._get_bytes(key, prefix, cf_name)
363+
if data is Marker.DELETED or data is Marker.UNDEFINED:
364+
return default
365+
366+
return data
367+
368+
@validate_transaction_status(PartitionTransactionStatus.STARTED)
369+
def _get_bytes(
370+
self,
371+
key: K,
372+
prefix: bytes,
373+
cf_name: str = "default",
374+
) -> Union[bytes, Literal[Marker.DELETED, Marker.UNDEFINED]]:
323375
key_serialized = self._serialize_key(key, prefix=prefix)
324376

325377
cached = self._update_cache.get(
326378
key=key_serialized, prefix=prefix, cf_name=cf_name
327379
)
328-
if cached is Marker.DELETED:
329-
return default
330380

331-
if cached is not Marker.UNDEFINED:
332-
return self._deserialize_value(cached)
333-
334-
stored = self._partition.get(key_serialized, cf_name)
335-
if stored is Marker.UNDEFINED:
336-
return default
381+
if cached is Marker.UNDEFINED:
382+
return self._partition.get(key_serialized, cf_name)
337383

338-
return self._deserialize_value(stored)
384+
return cached
339385

340386
@validate_transaction_status(PartitionTransactionStatus.STARTED)
341-
def set(self, key: K, value: V, prefix: bytes, cf_name: str = "default"):
387+
def set(self, key: K, value: V, prefix: bytes, cf_name: str = "default") -> None:
342388
"""
343389
Set value for the key.
344390
:param key: key
@@ -348,11 +394,29 @@ def set(self, key: K, value: V, prefix: bytes, cf_name: str = "default"):
348394
"""
349395

350396
try:
351-
key_serialized = self._serialize_key(key, prefix=prefix)
352397
value_serialized = self._serialize_value(value)
398+
except Exception:
399+
self._status = PartitionTransactionStatus.FAILED
400+
raise
401+
402+
self.set_bytes(key, value_serialized, prefix, cf_name=cf_name)
403+
404+
@validate_transaction_status(PartitionTransactionStatus.STARTED)
405+
def set_bytes(
406+
self, key: K, value: bytes, prefix: bytes, cf_name: str = "default"
407+
) -> None:
408+
"""
409+
Set bytes value for the key.
410+
:param key: key
411+
:param prefix: a key prefix
412+
:param value: value
413+
:param cf_name: column family name
414+
"""
415+
try:
416+
key_serialized = self._serialize_key(key, prefix=prefix)
353417
self._update_cache.set(
354418
key=key_serialized,
355-
value=value_serialized,
419+
value=value,
356420
prefix=prefix,
357421
cf_name=cf_name,
358422
)

tests/test_quixstreams/test_state/test_transaction.py

Lines changed: 53 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,15 @@
3737
[123, 456],
3838
]
3939

40+
TEST_BYTES_VALUES = [
41+
b"null",
42+
b'"string"',
43+
b"123",
44+
b"123.123",
45+
b'{"key":"value","mapping":{"key":"value"}}',
46+
b"[123, 456]",
47+
]
48+
4049
TEST_PREFIXES = [
4150
b"some_bytes",
4251
"string",
@@ -89,29 +98,40 @@ def test_delete_key_doesnt_exist(self, store_partition):
8998
tx.delete("key", prefix=prefix)
9099

91100
@pytest.mark.parametrize(
92-
"key",
93-
TEST_KEYS,
94-
)
95-
@pytest.mark.parametrize(
96-
"value",
97-
TEST_VALUES,
101+
"key,value,bytes_value",
102+
zip(TEST_KEYS, TEST_VALUES, TEST_BYTES_VALUES),
98103
)
99-
def test_get_key_exists_cached(self, key, value, store_partition):
104+
def test_get_key_exists_cached(self, key, value, bytes_value, store_partition):
100105
prefix = b"__key__"
101106
with store_partition.begin() as tx:
102107
tx.set(key, value, prefix=prefix)
103108
stored = tx.get(key, prefix=prefix)
104109
assert stored == value
105110

111+
stored_bytes = tx.get_bytes(key, prefix=prefix)
112+
assert stored_bytes == bytes_value
113+
106114
@pytest.mark.parametrize(
107-
"key",
108-
TEST_KEYS,
115+
"key,value,bytes_value",
116+
zip(TEST_KEYS, TEST_VALUES, TEST_BYTES_VALUES),
109117
)
118+
def test_bytes_get_key_exists_cached(
119+
self, key, value, bytes_value, store_partition
120+
):
121+
prefix = b"__key__"
122+
with store_partition.begin() as tx:
123+
tx.set_bytes(key, bytes_value, prefix=prefix)
124+
stored = tx.get(key, prefix=prefix)
125+
assert stored == value
126+
127+
stored_bytes = tx.get_bytes(key, prefix=prefix)
128+
assert stored_bytes == bytes_value
129+
110130
@pytest.mark.parametrize(
111-
"value",
112-
TEST_VALUES,
131+
"key,value,bytes_value",
132+
zip(TEST_KEYS, TEST_VALUES, TEST_BYTES_VALUES),
113133
)
114-
def test_get_key_exists_no_cache(self, key, value, store_partition):
134+
def test_get_key_exists_no_cache(self, key, value, bytes_value, store_partition):
115135
prefix = b"__key__"
116136
with store_partition.begin() as tx:
117137
tx.set(key, value, prefix=prefix)
@@ -120,6 +140,27 @@ def test_get_key_exists_no_cache(self, key, value, store_partition):
120140
stored = tx.get(key, prefix=prefix)
121141
assert stored == value
122142

143+
stored_bytes = tx.get_bytes(key, prefix=prefix)
144+
assert stored_bytes == bytes_value
145+
146+
@pytest.mark.parametrize(
147+
"key,value,bytes_value",
148+
zip(TEST_KEYS, TEST_VALUES, TEST_BYTES_VALUES),
149+
)
150+
def test_bytes_get_key_exists_no_cache(
151+
self, key, value, bytes_value, store_partition
152+
):
153+
prefix = b"__key__"
154+
with store_partition.begin() as tx:
155+
tx.set_bytes(key, bytes_value, prefix=prefix)
156+
157+
with store_partition.begin() as tx:
158+
stored = tx.get(key, prefix=prefix)
159+
assert stored == value
160+
161+
stored_bytes = tx.get_bytes(key, prefix=prefix)
162+
assert stored_bytes == bytes_value
163+
123164
def test_get_key_doesnt_exist_default(self, store_partition):
124165
prefix = b"__key__"
125166
with store_partition.begin() as tx:

0 commit comments

Comments
 (0)