
Bug: Can't use async concurrency with websocket connection. #164

Closed
2 tasks done
Anton-2 opened this issue Feb 25, 2025 · 9 comments
Labels
bug Something isn't working

Comments

@Anton-2
Contributor

Anton-2 commented Feb 25, 2025

Describe the bug

When using AsyncSurreal with a WebSocket (ws:// URL), any attempt to run concurrent async requests fails with "cannot call recv while another coroutine is already running recv or recv_streaming".

This does not happen with the HTTP client in v1.0.3.

Probably related to #101 and maybe #160 (async issues).

What is the status of this? Should we use the HTTP connection for async, or is this a bug? I remember the HTTP connection having restrictions, and WebSocket being the recommended way to use SurrealDB, so this seems strange to me.

#101 is closed with a message saying that the concurrency issues are solved (#101 (comment)), but how can we "get 100% control over the async coroutine"?

I really want to use SurrealDB, but for that, async support in Python should be rock solid.

Steps to reproduce

Here is a demo program (adapt it for your auth, namespace and database).
Change the connection scheme (http to ws in AsyncSurreal("http://localhost:8000/rpc")) to go from working to failing.

import asyncio
from surrealdb import AsyncSurreal

async def main(n):
    async with AsyncSurreal("http://localhost:8000/rpc") as db:
        await db.signin({"username": 'root', "password": 'root'})
        await db.use("test", "test")
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(db.query("select $p**2 as ret from {}", dict(p=num))) for num in range(n)]
    return [t.result()[0]["ret"] for t in tasks]

if __name__ == "__main__":
    print(asyncio.run(main(5)))

Expected behaviour

Working (with http://): [0, 1, 4, 9, 16]
Not working (with ws://): websockets.exceptions.ConcurrencyError: cannot call recv while another coroutine is already running recv or recv_streaming

SurrealDB version

surreal version 2.2.1 for macos on aarch64

surrealdb.py version

surrealdb.py 1.0.3 for macOS on aarch64 using python 3.13.1

Contact Details

No response

Is there an existing issue for this?

  • I have searched the existing issues

Code of Conduct

  • I agree to follow this project's Code of Conduct
@Anton-2 Anton-2 added the bug Something isn't working label Feb 25, 2025
@maxwellflitton
Contributor

@Anton-2 thanks for the bug report, I'll run through this code example in a debugger to see what's going on. I'm currently working on a serialization-of-None error, then a RecordId check error, but after those this will be next.

@maxwellflitton
Contributor

Hey @Anton-2, I've now prepared a pull request. I put an async lock on the recv function and it's working. You can see the pull request here. However, this only has support for Python 3.11 onwards; I'll work on checking the Python version before enabling the lock.

@Anton-2
Contributor Author

Anton-2 commented Feb 26, 2025

Hi @maxwellflitton, great to see that you're looking into this!

I fear a lock will destroy performance when a query takes some time to return from the server. All queries issued after the slow one will sit waiting, whereas dispatching them to SurrealDB immediately would let the server work on them concurrently.

And I don't see how it will work with live queries...

From what I've seen of the WS protocol, ideally you issue each request with a unique ID and wait for the response (which may arrive out of order). The response is correlated with the request ID.

I've got a POC working: https://gist.github.com/Anton-2/86370b562c7c33ebe455c5da7e236c82

A query:

  • dispatches each request with a unique id
  • creates a future and associates it with the request id
  • waits for the future to be done before returning results

And we have a receive task taking every response, finding the associated future and setting its result.

It's a bit rough:

  • needs a look at live queries
  • needs a rework of the message.id part (no need to set a constant one and overwrite it later, no need to use a UUID, ...)
  • there are two paths for connect (calling .connect or using a context manager): I've only handled the context manager, and they should be unified (the context manager should just call connect and close)

I could try to make a real PR tomorrow, but feel free to use it to build a better solution if you're already on the subject!

@maxwellflitton
Contributor

@Anton-2 I like how you're thinking. On my pull request, you can see that I have moved the creation of the request id into the RequestMessage here, so we can extract the id of the request message. The lock still makes it pass the tests; now that we have a stable implementation, we can move on to more optimized approaches, running the tests again and again as we iterate. I also need to check whether the lock is blocking. If the lock is non-blocking, it won't be much of a performance cost, since we can only have one coroutine reading the buffer from the socket at a time.

@Anton-2
Contributor Author

Anton-2 commented Feb 26, 2025

PR #166 adds a delay to some of the requests sent concurrently during the async test. With the current lock approach it fails, as query 2 will return before query 1, and so query 1 will respond with query 2's results...

You can trivially fix this by moving the await self.socket.send(message.WS_CBOR_DESCRIPTOR) line inside the lock.

However, I still think the lock will kill concurrency, and that a single receive task dispatching responses to requests is the way to go. I'll try to create a PR for this tomorrow.

@maxwellflitton
Contributor

I'm going to do some further testing, but I don't think that locks will affect performance too much, as they are not blocking. We can test a lock with the following code:

import asyncio

async def worker(name, lock):
    print(f"{name}: Waiting for lock...")
    async with lock:  # This suspends the coroutine instead of blocking the loop
        print(f"{name}: Acquired lock!")
        await asyncio.sleep(2)  # Simulate work while holding the lock
        print(f"{name}: Releasing lock!")

async def other_task():
    """A task that runs independently while others wait for the lock."""
    for i in range(5):
        print(f"Other Task: Running iteration {i}")
        await asyncio.sleep(0.5)  # This keeps running while workers wait for lock

async def main():
    lock = asyncio.Lock()
    
    # Launch multiple worker tasks that need the lock
    workers = [worker(f"Task-{i}", lock) for i in range(3)]
    
    # Run workers along with an independent task
    await asyncio.gather(*workers, other_task())

asyncio.run(main())

Running this code will give us the statements below:

Task-0: Waiting for lock...
Task-0: Acquired lock!
Task-1: Waiting for lock...
Task-2: Waiting for lock...
Other Task: Running iteration 0
Other Task: Running iteration 1
Other Task: Running iteration 2
Other Task: Running iteration 3
Task-0: Releasing lock!
Task-1: Acquired lock!
Other Task: Running iteration 4
Task-1: Releasing lock!
Task-2: Acquired lock!
Task-2: Releasing lock!

This is single-threaded, and other tasks keep running while we're waiting for the lock. We can also never have two coroutines trying to hit recv at the same time, otherwise we get an error. So the cost of a non-blocking lock should be very minimal. We also have to be safe: if a user copies the handle and there isn't a lock around recv, then we also run the risk of erroring out. I've now implemented the following:

    async def _send(
        self, message: RequestMessage, process: str, bypass: bool = False
    ) -> dict:
        await self.connect()
        assert (
            self.socket is not None
        )  # will always not be None as the self.connect ensures there's a connection

        query_id = message.id
        await self.socket.send(message.WS_CBOR_DESCRIPTOR)

        async with self.lock:
            response = decode(await self.socket.recv())
        self.ref[response["id"]] = response

        # wait for ID to be returned
        while self.ref.get(query_id) is None:
            await asyncio.sleep(0)  # The await simply yields to the executor to avoid deadlocks

        # set the response and clean up
        response = self.ref[query_id]
        del self.ref[query_id]

        if bypass is False:
            self.check_response_for_error(response, process)
        return response

This ensures that the following test you provided passes:

    async def test_batch(self):
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(self.connection.query("RETURN sleep(duration::from::millis($d)) or $p**2", dict(d=10 if num%2 else 0, p=num))) for num in range(5)]

        outcome = [t.result() for t in tasks]
        self.assertEqual([0, 1, 4, 9, 16], outcome)
        await self.connection.socket.close()

Also, spawning tasks is not free: it requires extra overhead in terms of memory and CPU. I'm willing to do some tests, but I think starting with the lock reduces the complexity and potentially the latency. Getting the running loop and spawning tasks may be more overhead than acquiring a non-blocking lock.

@Anton-2
Contributor Author

Anton-2 commented Feb 27, 2025

Yes, I should have been more precise in my wording of 'performance'... I agree that the lock mechanism itself is lightweight; that is not what concerns me.

The problem is that you are serialising queries, and can't send another one to SurrealDB while the previous query is executing.

For example, in a web server scenario: user A triggers a 'long' query and waits, say, 2 seconds for it to complete. If user B triggers another query, even a very quick one, she won't see her results before those 2 seconds have passed, because the connection is still waiting for query A to return.

SurrealDB can handle many concurrent queries, and if you send query A and query B, even on the same websocket, you'll receive the results for B quickly and for A after two seconds.

My latest PR #167 enables this; let's discuss it further there if you want!

edit: I see that you address this with the new system in the previous comment. But this is sort of "busy waiting" here:

      # wait for ID to be returned
      while self.ref.get(query_id) is None:
          await asyncio.sleep(0)  # The await simply yields to the executor to avoid deadlocks

... and it won't be able to handle live queries returning changes.

@maxwellflitton
Contributor

@Anton-2 thanks, your solution is now merged and it passes the tests. Closing this issue now; I really appreciate your collaboration on this.

@Anton-2
Contributor Author

Anton-2 commented Mar 3, 2025

Great, thanks! It was a pleasure for me too.
