Skip to content

Commit 3499108

Browse files
committedFeb 27, 2025
adding a lock for async batching
1 parent 6ede6ba commit 3499108

File tree

2 files changed

+25
-3
lines changed

2 files changed

+25
-3
lines changed
 

‎src/surrealdb/connections/async_ws.py

+16-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
"""
22
A basic async connection to a SurrealDB instance.
33
"""
4-
54
import asyncio
65
import uuid
76
from asyncio import Queue
@@ -49,7 +48,8 @@ def __init__(
4948
self.id: str = str(uuid.uuid4())
5049
self.token: Optional[str] = None
5150
self.socket = None
52-
self.recv_lock = asyncio.Lock()
51+
self.lock = asyncio.Lock()
52+
self.ref = dict()
5353

5454
async def _send(
5555
self, message: RequestMessage, process: str, bypass: bool = False
@@ -58,9 +58,22 @@ async def _send(
5858
assert (
5959
self.socket is not None
6060
) # will always not be None as the self.connect ensures there's a connection
61+
62+
query_id = message.id
6163
await self.socket.send(message.WS_CBOR_DESCRIPTOR)
62-
async with self.recv_lock:
64+
65+
async with self.lock:
6366
response = decode(await self.socket.recv())
67+
self.ref[response["id"]] = response
68+
69+
# wait for ID to be returned
70+
while self.ref.get(query_id) is None:
71+
await asyncio.sleep(0) # The await simply yields to the executor to avoid deadlocks
72+
73+
# set the response and clean up
74+
response = self.ref[query_id]
75+
del self.ref[query_id]
76+
6477
if bypass is False:
6578
self.check_response_for_error(response, process)
6679
return response
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
2+
3+
# singleton
4+
5+
# dict for sockets
6+
7+
# socket with smart reference counter
8+
9+
#

0 commit comments

Comments
 (0)