Skip to content

Commit e7ec855

Browse files
committed
Add read_stream and read_category special methods
1 parent cd6c6f6 commit e7ec855

File tree

4 files changed

+121
-15
lines changed

4 files changed

+121
-15
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,3 +145,6 @@ poetry.lock
145145

146146
# Postgres data
147147
pgdata/
148+
149+
# Project-specific
150+
database/

message_db/client.py

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from __future__ import annotations
22

3-
from typing import Dict
3+
from typing import Dict, List
44
from uuid import uuid4
55

66
from psycopg2.extensions import connection
@@ -98,7 +98,7 @@ def write(
9898

9999
return position
100100

101-
def write_batch(self, stream_name, data, expected_version: int = None):
101+
def write_batch(self, stream_name, data, expected_version: int = None) -> None:
102102
conn = self.connection_pool.get_connection()
103103

104104
try:
@@ -117,14 +117,21 @@ def write_batch(self, stream_name, data, expected_version: int = None):
117117

118118
return expected_version
119119

120-
def read(self, stream_name, position=0, no_of_messages=1000):
120+
def read(
121+
self,
122+
stream_name: str,
123+
sql: str = None,
124+
position: int = 0,
125+
no_of_messages: int = 1000,
126+
) -> List[Dict]:
121127
conn = self.connection_pool.get_connection()
122128
cursor = conn.cursor(cursor_factory=RealDictCursor)
123129

124-
if "-" in stream_name:
125-
sql = "SELECT * FROM get_stream_messages(%(stream_name)s, %(position)s, %(batch_size)s);"
126-
else:
127-
sql = "SELECT * FROM get_category_messages(%(stream_name)s, %(position)s, %(batch_size)s);"
130+
if not sql:
131+
if "-" in stream_name:
132+
sql = "SELECT * FROM get_stream_messages(%(stream_name)s, %(position)s, %(batch_size)s);"
133+
else:
134+
sql = "SELECT * FROM get_category_messages(%(stream_name)s, %(position)s, %(batch_size)s);"
128135

129136
cursor.execute(
130137
sql,
@@ -142,6 +149,28 @@ def read(self, stream_name, position=0, no_of_messages=1000):
142149

143150
return messages
144151

152+
def read_stream(self, stream_name, position=0, no_of_messages=1000) -> List[Dict]:
153+
if "-" not in stream_name:
154+
raise ValueError(f"{stream_name} is not a stream")
155+
156+
sql = "SELECT * FROM get_stream_messages(%(stream_name)s, %(position)s, %(batch_size)s);"
157+
158+
return self.read(
159+
stream_name, sql=sql, position=position, no_of_messages=no_of_messages
160+
)
161+
162+
def read_category(
163+
self, category_name, position=0, no_of_messages=1000
164+
) -> List[Dict]:
165+
if "-" in category_name:
166+
raise ValueError(f"{category_name} is not a category")
167+
168+
sql = "SELECT * FROM get_category_messages(%(stream_name)s, %(position)s, %(batch_size)s);"
169+
170+
return self.read(
171+
category_name, sql=sql, position=position, no_of_messages=no_of_messages
172+
)
173+
145174
def read_last_message(self, stream_name):
146175
conn = self.connection_pool.get_connection()
147176
cursor = conn.cursor(cursor_factory=RealDictCursor)

pyproject.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ version = "0.1.0"
44
description = "The Python interface to the MessageDB Event Store and Message Store"
55
authors = ["Subhash Bhushan <[email protected]>"]
66

7+
packages = [
8+
{ include = "message_db" },
9+
]
10+
711
[tool.poetry.dependencies]
812
python = "^3.7"
913
psycopg2 = "^2.9.2"

tests/test_client.py

Lines changed: 78 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@ class TestMessageWrite:
2828
def test_write_to_store(self, client):
2929
client.write("testStream-123", "Event1", {"foo": "bar"})
3030

31-
messages = client.read("testStream-123", 0, 100)
31+
messages = client.read("testStream-123")
3232
assert len(messages) == 1
3333

3434
def test_writing_message_with_metadata(self, client):
3535
client.write("testStream-123", "Event1", {"foo": "bar"}, {"trace_id": "baz"})
3636

37-
messages = client.read("testStream-123", 0, 100)
37+
messages = client.read("testStream-123")
3838
assert messages[0]["metadata"] == json.dumps({"trace_id": "baz"})
3939

4040
def test_that_write_returns_position_of_message_written(self, client):
@@ -84,19 +84,19 @@ def test_that_write_fails_on_expected_version_mismatch(self, client):
8484
)
8585

8686

87-
class TestEventIO:
87+
class TestRead:
8888
def test_read_stream_from_store(self, client):
8989
client.write("testStream-123", "Event1", {"foo": "bar"})
9090

91-
messages = client.read("testStream-123", 0, 100)
91+
messages = client.read("testStream-123")
9292
assert messages is not None
9393
assert messages[0]["data"] == json.dumps({"foo": "bar"})
9494

9595
def test_read_multiple_stream_messages_from_store(self, client):
9696
for i in range(5):
9797
client.write("testStream-123", "Event1", {f"foo{i}": f"bar{i}"})
9898

99-
messages = client.read("testStream-123", 0, 100)
99+
messages = client.read("testStream-123")
100100

101101
assert messages is not None
102102
assert len(messages) == 5
@@ -106,7 +106,7 @@ def test_read_paginated_stream_messages_from_store(self, client):
106106
for i in range(5):
107107
client.write("testStream-123", "Event1", {f"foo{i}": f"bar{i}"})
108108

109-
messages = client.read("testStream-123", 0, 3)
109+
messages = client.read("testStream-123", no_of_messages=3)
110110

111111
assert messages is not None
112112
assert len(messages) == 3
@@ -126,7 +126,7 @@ def test_read_specific_stream_message(self, client):
126126
for i in range(5):
127127
client.write("testStream-456", "Event1", {"foo": f"baz{i}"})
128128

129-
messages = client.read("testStream-456", 0, 100)
129+
messages = client.read("testStream-456")
130130

131131
assert len(messages) == 5
132132
assert messages[4]["data"] == json.dumps({"foo": "baz4"})
@@ -135,7 +135,77 @@ def test_read_category_messages(self, client):
135135
for i in range(5):
136136
client.write("testStream-123", "Event1", {"foo": f"bar{i}"})
137137

138-
messages = client.read("testStream", 0, 100)
138+
messages = client.read("testStream")
139139

140140
assert len(messages) == 5
141141
assert messages[4]["data"] == json.dumps({"foo": "bar4"})
142+
143+
144+
class TestReadStream:
145+
def test_reading_a_category_throws_error(self, client):
146+
with pytest.raises(ValueError) as exc:
147+
client.read_stream("testStream")
148+
149+
assert exc.value.args[0] == "testStream is not a stream"
150+
151+
def test_read_stream_from_store(self, client):
152+
client.write("testStream-123", "Event1", {"foo": "bar"})
153+
154+
messages = client.read_stream("testStream-123")
155+
assert messages is not None
156+
assert messages[0]["data"] == json.dumps({"foo": "bar"})
157+
158+
def test_read_multiple_stream_messages_from_store(self, client):
159+
for i in range(5):
160+
client.write("testStream-123", "Event1", {f"foo{i}": f"bar{i}"})
161+
162+
messages = client.read_stream("testStream-123")
163+
164+
assert messages is not None
165+
assert len(messages) == 5
166+
assert messages[4]["data"] == json.dumps({"foo4": "bar4"})
167+
168+
def test_read_paginated_stream_messages_from_store(self, client):
169+
for i in range(5):
170+
client.write("testStream-123", "Event1", {f"foo{i}": f"bar{i}"})
171+
172+
messages = client.read_stream("testStream-123", no_of_messages=3)
173+
174+
assert messages is not None
175+
assert len(messages) == 3
176+
assert messages[2]["data"] == json.dumps({"foo2": "bar2"})
177+
178+
179+
class TestReadCategory:
180+
def test_reading_a_stream_throws_error(self, client):
181+
with pytest.raises(ValueError) as exc:
182+
client.read_category("testStream-123")
183+
184+
assert exc.value.args[0] == "testStream-123 is not a category"
185+
186+
def test_read_category_messages_from_store(self, client):
187+
client.write("testStream-123", "Event1", {"foo": "bar"})
188+
189+
messages = client.read_category("testStream")
190+
assert messages is not None
191+
assert messages[0]["data"] == json.dumps({"foo": "bar"})
192+
193+
def test_read_multiple_category_messages_from_store(self, client):
194+
for i in range(5):
195+
client.write("testStream-123", "Event1", {f"foo{i}": f"bar{i}"})
196+
197+
messages = client.read_category("testStream")
198+
199+
assert messages is not None
200+
assert len(messages) == 5
201+
assert messages[4]["data"] == json.dumps({"foo4": "bar4"})
202+
203+
def test_read_paginated_category_messages_from_store(self, client):
204+
for i in range(5):
205+
client.write("testStream-123", "Event1", {f"foo{i}": f"bar{i}"})
206+
207+
messages = client.read_category("testStream", no_of_messages=3)
208+
209+
assert messages is not None
210+
assert len(messages) == 3
211+
assert messages[2]["data"] == json.dumps({"foo2": "bar2"})

0 commit comments

Comments
 (0)