Warning
This Python package is currently in beta and will likely change. It is not yet ready for production use.
In addition to supporting OpenAI API-compatible agents, Sentient Chat supports a custom, open-source event system for agent responses. These events can be rendered in Sentient Chat to provide a richer user experience. This is particularly useful when streaming responses from an AI agent: you can show the agent's work while the response is being generated, rather than making the user wait for the final response. This Python package provides an agent framework that can be used to build agents that serve Sentient Chat events.
Examples of agents that use this framework/package can be found here. A client for testing agents built with this framework is available here.
pip install sentient-agent-framework
The simplest way to use this framework is to import and use the AbstractAgent class and the DefaultServer class.
The AbstractAgent class is lightweight and extensible. To use it, simply subclass it and implement the assist() method. Use the ResponseHandler object passed to the assist() method to emit events to the client.
The DefaultServer class is designed to be used with the AbstractAgent class. A concrete implementation of the AbstractAgent class is passed into the DefaultServer constructor. The DefaultServer provides an SSE server with an /assist endpoint and automatically streams events emitted in the assist() method to the client.
Note
The snippet below comes from the Sentient Agent Framework Examples repository.
import logging
import os
from dotenv import load_dotenv
from src.search_agent.providers.model_provider import ModelProvider
from src.search_agent.providers.search_provider import SearchProvider
from sentient_agent_framework import (
    AbstractAgent,
    DefaultServer,
    Session,
    Query,
    ResponseHandler)
from typing import AsyncIterator


load_dotenv()

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class SearchAgent(AbstractAgent):
    def __init__(
            self,
            name: str
    ):
        super().__init__(name)

        model_api_key = os.getenv("MODEL_API_KEY")
        if not model_api_key:
            raise ValueError("MODEL_API_KEY is not set")
        self._model_provider = ModelProvider(api_key=model_api_key)

        search_api_key = os.getenv("TAVILY_API_KEY")
        if not search_api_key:
            raise ValueError("TAVILY_API_KEY is not set")
        self._search_provider = SearchProvider(api_key=search_api_key)

    # Implement the assist method as required by the AbstractAgent class
    async def assist(
            self,
            session: Session,
            query: Query,
            response_handler: ResponseHandler
    ):
        """Search the internet for information."""
        # Search for information
        await response_handler.emit_text_block(
            "SEARCH", "Searching internet for results..."
        )
        search_results = await self._search_provider.search(query.prompt)
        if len(search_results["results"]) > 0:
            # Use response handler to emit JSON to the client
            await response_handler.emit_json(
                "SOURCES", {"results": search_results["results"]}
            )
        if len(search_results["images"]) > 0:
            # Use response handler to emit JSON to the client
            await response_handler.emit_json(
                "IMAGES", {"images": search_results["images"]}
            )

        # Process search results
        # Use response handler to create a text stream to stream the final
        # response to the client
        final_response_stream = response_handler.create_text_stream(
            "FINAL_RESPONSE"
        )
        async for chunk in self.__process_search_results(search_results["results"]):
            # Use the text stream to emit chunks of the final response to the client
            await final_response_stream.emit_chunk(chunk)
        # Mark the text stream as complete
        await final_response_stream.complete()
        # Mark the response as complete
        await response_handler.complete()

    async def __process_search_results(
            self,
            search_results: dict
    ) -> AsyncIterator[str]:
        """Process the search results."""
        process_search_results_query = f"Summarise the following search results: {search_results}"
        async for chunk in self._model_provider.query_stream(process_search_results_query):
            yield chunk


if __name__ == "__main__":
    # Create an instance of a SearchAgent
    agent = SearchAgent(name="Search Agent")
    # Create a server to handle requests to the agent
    server = DefaultServer(agent)
    # Run the server
    server.run()
Whether using the AbstractAgent or the DefaultResponseHandler, a ResponseHandler is created for every agent query and is used to emit events to the client.
Text events are used to send single, complete messages to the client:
await response_handler.emit_text_block(
    "PLAN", "Rephrasing user query..."
)
JSON events are used to send JSON objects to the client:
await response_handler.emit_json(
    "SOURCES", {"results": search_results["results"]}
)
Error events are used to send error messages to the client:
await response_handler.emit_error(
    "ERROR", {"message": "An error occurred"}
)
At the end of a response, response_handler.complete() is called to signal the end of the response (this will emit a DoneEvent using the Hook):
await response_handler.complete()
To stream a longer response one chunk at a time, use the response_handler.create_text_stream method. This returns a StreamEventEmitter that can be used to stream text to the client using the emit_chunk method:
final_response_stream = response_handler.create_text_stream(
    "FINAL_RESPONSE"
)
async for chunk in self.__process_search_results(search_results["results"]):
    await final_response_stream.emit_chunk(chunk)
At the end of the stream, final_response_stream.complete() is called to signal the end of the stream (this will emit a TextChunkEvent with is_complete=True):
await final_response_stream.complete()