diff --git a/community/recall/.env.Example b/community/recall/.env.Example new file mode 100644 index 00000000..20d3f4fb --- /dev/null +++ b/community/recall/.env.Example @@ -0,0 +1,8 @@ +RECALL_API_KEY= +GEMINI_API_KEY= +# Restack Cloud (Optional) + +# RESTACK_ENGINE_ID= +# RESTACK_ENGINE_API_KEY= +# RESTACK_ENGINE_ADDRESS= +# RESTACK_ENGINE_API_ADDRESS= diff --git a/community/recall/.gitignore b/community/recall/.gitignore new file mode 100644 index 00000000..cff65427 --- /dev/null +++ b/community/recall/.gitignore @@ -0,0 +1,6 @@ +__pycache__ +.pytest_cache +venv +.env +.vscode +poetry.lock diff --git a/community/recall/Dockerfile b/community/recall/Dockerfile new file mode 100644 index 00000000..7530be1f --- /dev/null +++ b/community/recall/Dockerfile @@ -0,0 +1,22 @@ +FROM python:3.12-slim + +WORKDIR /app + +RUN apt-get update && apt-get install -y + +RUN pip install poetry + +COPY pyproject.toml ./ + +COPY . . + +# Configure poetry to not create virtual environment +RUN poetry config virtualenvs.create false + +# Install dependencies +RUN poetry install --no-interaction --no-ansi + +# Expose port 80 +EXPOSE 80 + +CMD poetry run python -m src.services \ No newline at end of file diff --git a/community/recall/README.md b/community/recall/README.md new file mode 100644 index 00000000..0733a18d --- /dev/null +++ b/community/recall/README.md @@ -0,0 +1,162 @@ +# Restack AI - Recall Example + +This repository demonstrates how to build a production-ready AI backend using [Restack](https://docs.restack.io) and [Recall](https://docs.recall.ai). It combines Recall’s universal API for capturing meeting data in real-time with Restack’s framework to build resilient ai workflows to handle concurrency, retries, and scheduling at scale. + +## Overview + +This example shows how to reliably scale workflows on a local machine, capturing meeting audio, video, and metadata via Recall, then processing it using Restack. You can define concurrency limits, automatically retry failed steps, and focus on building robust logic without managing manual locks or queues. + +## Walkthrough Video + + +## Motivation + +When building AI meeting-related workflows, you want to handle real-time data ingestion (Recall) along with safe, scalable processing (Restack). Restack ensures steps that call LLM APIs or other services adhere to concurrency constraints, automatically queueing and retrying operations to maintain reliability. + +### Workflow Steps + +Below is an example of 50 workflows in parallel, each using Recall data and calling LLM functions that are rate-limited to 1 concurrent call per second. + +| Step | Workflow 1 | Workflow 2 | ... | Workflow 50 | +| ---- | ---------- | ---------- | --- | ----------- | +| 1 | Recall | Recall | ... | Recall | +| 2 | Recall | Recall | ... | Recall | +| 3 | LLM | LLM | ... | LLM | + +### Rate Limit Management + +When processing data from Recall in parallel, you might rely on LLM or other external services. Managing concurrency is crucial: + +1. **Task Queue**: Traditional approach using Celery or RabbitMQ. +2. **Rate Limiting Middleware**: Custom logic to hold requests in a queue. +3. **Semaphore or Locking**: Single shared lock to ensure serial processing. + +### With Restack + +Restack automates rate-limit management and concurrency controls: + +```python +client.start_service( + task_queue="llm", + functions=[llm_generate, llm_evaluate], + options=ServiceOptions( + rate_limit=1, + max_concurrent_function_runs=1 + ) +) +``` + +Combine your Recall steps (fetch meeting transcripts, metadata, etc.) with LLM calls, and Restack ensures each step is handled in order without manual synchronization. + +## On Restack UI + +You can see how long each workflow or step stayed in the queue and the execution details: + +![Parent Workflow](./ui-parent.png) + +For each child workflow, you can see how many retries occurred and how long each function took to execute: + +![Child Workflow](./ui-child.png) + +## Prerequisites + +- Python 3.10 or higher +- Poetry (for dependency management) +- Docker (for running Restack) +- Recall account and API key +- (Optional) Gemini LLM API key + +## Prepare Environment + +Create a `.env` file from `.env.Example`: + +``` +RECALL_API_KEY= +GEMINI_API_KEY= +... +``` + +## Start Restack + +```bash +docker run -d --pull always --name restack -p 5233:5233 -p 6233:6233 -p 7233:7233 ghcr.io/restackio/restack:main +``` + +## Start Python Shell + +```bash +poetry env use 3.10 && poetry shell +``` + +## Install Dependencies + +```bash +poetry install +poetry env info +``` + +## Development + +```bash +poetry run dev +``` + +This will start the Restack services locally, using your configured environment. + +## Run Workflows + +### From UI + +Access http://localhost:5233 to see your workflows. Click “Run” to start them. + +![Run workflows from UI](./ui-endpoints.png) + +### From API + +Use the generated endpoints for your workflows: + +`POST http://localhost:6233/api/workflows/ChildWorkflow` + +or + +`POST http://localhost:6233/api/workflows/ExampleWorkflow` + +### From CLI + +```bash +poetry run schedule +``` + +Triggers `ChildWorkflow`. + +```bash +poetry run scale +``` + +Triggers `ExampleWorkflow` 50 times in parallel. + +```bash +poetry run interval +``` + +Schedules `ChildWorkflow` every second. + +## Deploy on Restack Cloud + +Create an account at [https://console.restack.io](https://console.restack.io). You can deploy your workflows to Restack Cloud for automated scaling and monitoring. + +## Project Structure + +- `src/` + - `client.py`: Initializes Restack client + - `functions/`: Contains function definitions + - `workflows/`: Contains workflow definitions (including steps that leverage Recall data) + - `services.py`: Sets up Restack services +- `schedule_workflow.py`: Scheduling a single workflow +- `schedule_interval.py`: Scheduling a workflow repeatedly +- `schedule_scale.py`: Scheduling 50 workflows at once +- `.env.Example`: Environment variable template for Recall and Gemini keys + +# Conclusion + +With Recall providing real-time meeting data and Restack handling durable, concurrent workflows, you can build a powerful AI-backed system for processing, summarizing, and analyzing meetings at scale. This setup dramatically reduces operational overhead, allowing you to focus on delivering meaningful product features without worrying about rate limits or concurrency. diff --git a/community/recall/pyproject.toml b/community/recall/pyproject.toml new file mode 100644 index 00000000..e1250dd2 --- /dev/null +++ b/community/recall/pyproject.toml @@ -0,0 +1,34 @@ +# Project metadata +[tool.poetry] +name = "community_recall" +version = "0.0.1" +description = "A simple example to show how to build a resilient backend with Recall to transcribe meetings" +authors = [ + "Restack Team ", +] +readme = "README.md" +packages = [{include = "src"}] + +[tool.poetry.dependencies] +python = ">=3.10,<4.0" +restack-ai = "^0.0.52" +watchfiles = "^1.0.0" +google-generativeai = "0.8.3" +pydantic = "^2.10.5" +requests = "^2.32.3" + +[tool.poetry.dev-dependencies] +pytest = "6.2" # Optional: Add if you want to include tests in your example + +# Build system configuration +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" + +# CLI command configuration +[tool.poetry.scripts] +dev = "src.services:watch_services" +services = "src.services:run_services" +workflow = "schedule_workflow:run_schedule_workflow" +interval = "schedule_interval:run_schedule_interval" +scale = "schedule_scale:run_schedule_scale" diff --git a/community/recall/schedule_workflow.py b/community/recall/schedule_workflow.py new file mode 100644 index 00000000..fb2d3917 --- /dev/null +++ b/community/recall/schedule_workflow.py @@ -0,0 +1,26 @@ +import asyncio +import time +from restack_ai import Restack + +async def main(): + + client = Restack() + + workflow_id = f"{int(time.time() * 1000)}-ChildWorkflow" + run_id = await client.schedule_workflow( + workflow_name="ChildWorkflow", + workflow_id=workflow_id + ) + + await client.get_workflow_result( + workflow_id=workflow_id, + run_id=run_id + ) + + exit(0) + +def run_schedule_workflow(): + asyncio.run(main()) + +if __name__ == "__main__": + run_schedule_workflow() \ No newline at end of file diff --git a/community/recall/src/__init__.py b/community/recall/src/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/community/recall/src/client.py b/community/recall/src/client.py new file mode 100644 index 00000000..3bd8324a --- /dev/null +++ b/community/recall/src/client.py @@ -0,0 +1,21 @@ +import os +from restack_ai import Restack +from restack_ai.restack import CloudConnectionOptions +from dotenv import load_dotenv + +# Load environment variables from a .env file +load_dotenv() + + +engine_id = os.getenv("RESTACK_ENGINE_ID") +address = os.getenv("RESTACK_ENGINE_ADDRESS") +api_key = os.getenv("RESTACK_ENGINE_API_KEY") +api_address = os.getenv("RESTACK_ENGINE_API_ADDRESS") + +connection_options = CloudConnectionOptions( + engine_id=engine_id, + address=address, + api_key=api_key, + api_address=api_address +) +client = Restack(connection_options) diff --git a/community/recall/src/functions/__init__.py b/community/recall/src/functions/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/community/recall/src/functions/create_meet_bot.py b/community/recall/src/functions/create_meet_bot.py new file mode 100644 index 00000000..cb8fe03e --- /dev/null +++ b/community/recall/src/functions/create_meet_bot.py @@ -0,0 +1,43 @@ +from restack_ai.function import function, FunctionFailure, log +from pydantic import BaseModel +import requests +from typing import Optional +import os + +class CreateMeetBotInput(BaseModel): + meeting_url: str = "https://meet.google.com/jgv-jvev-jhe" + bot_name: Optional[str] = "Recall Bot" + transcription_options: Optional[dict] = { "provider": "meeting_captions" } + +@function.defn() +async def create_meet_bot(input: CreateMeetBotInput) -> dict: + try: + headers = { + "Authorization": f"Token {os.getenv('RECALL_API_KEY')}", + "Content-Type": "application/json" + } + + payload = { + "meeting_url": input.meeting_url, + "transcription_options": input.transcription_options, + "bot_name": input.bot_name, + "google_meet": { + "login_required": False + } + } + + response = requests.post( + "https://us-west-2.recall.ai/api/v1/bot", + headers=headers, + json=payload + ) + + response.raise_for_status() + return response.json() + + except requests.exceptions.RequestException as e: + log.error(f"Failed to create meet bot: {e}") + raise FunctionFailure(f"Failed to create meet bot: {e}", non_retryable=True) from e + except Exception as e: + log.error(f"Unexpected error creating meet bot: {e}") + raise FunctionFailure(f"Unexpected error: {e}", non_retryable=True) from e \ No newline at end of file diff --git a/community/recall/src/functions/get_bot_transcript.py b/community/recall/src/functions/get_bot_transcript.py new file mode 100644 index 00000000..3a4449d7 --- /dev/null +++ b/community/recall/src/functions/get_bot_transcript.py @@ -0,0 +1,31 @@ +from restack_ai.function import function, FunctionFailure, log +from pydantic import BaseModel +import requests +from typing import Optional +import os + +class GetBotTranscriptInput(BaseModel): + bot_id: str + +@function.defn() +async def get_bot_transcript(input: GetBotTranscriptInput) -> dict: + try: + headers = { + "Authorization": f"Token {os.getenv('RECALL_API_KEY')}", + "Content-Type": "application/json" + } + + response = requests.get( + f"https://us-west-2.recall.ai/api/v1/bot/{input.bot_id}/transcript/", + headers=headers + ) + + response.raise_for_status() + return {"segments": response.json()} + + except requests.exceptions.RequestException as e: + log.error(f"Failed to get bot transcript: {e}") + raise FunctionFailure(f"Failed to get bot transcript: {e}", non_retryable=True) from e + except Exception as e: + log.error(f"Unexpected error getting bot transcript: {e}") + raise FunctionFailure(f"Unexpected error: {e}", non_retryable=True) from e \ No newline at end of file diff --git a/community/recall/src/functions/list_bots.py b/community/recall/src/functions/list_bots.py new file mode 100644 index 00000000..4c62c605 --- /dev/null +++ b/community/recall/src/functions/list_bots.py @@ -0,0 +1,39 @@ +from restack_ai.function import function, FunctionFailure, log +from pydantic import BaseModel +import requests +from typing import Optional +import os + +class ListBotsInput(BaseModel): + limit: Optional[int] = None + offset: Optional[int] = None + +@function.defn() +async def list_bots(input: ListBotsInput) -> dict: + try: + headers = { + "Authorization": f"Token {os.getenv('RECALL_API_KEY')}", + "Content-Type": "application/json" + } + + params = {} + if input.limit is not None: + params["limit"] = input.limit + if input.offset is not None: + params["offset"] = input.offset + + response = requests.get( + "https://us-west-2.recall.ai/api/v1/bot/", + headers=headers, + params=params + ) + + response.raise_for_status() + return response.json() + + except requests.exceptions.RequestException as e: + log.error(f"Failed to list bots: {e}") + raise FunctionFailure(f"Failed to list bots: {e}", non_retryable=True) from e + except Exception as e: + log.error(f"Unexpected error listing bots: {e}") + raise FunctionFailure(f"Unexpected error: {e}", non_retryable=True) from e \ No newline at end of file diff --git a/community/recall/src/functions/retrieve_bot.py b/community/recall/src/functions/retrieve_bot.py new file mode 100644 index 00000000..23495031 --- /dev/null +++ b/community/recall/src/functions/retrieve_bot.py @@ -0,0 +1,31 @@ +from restack_ai.function import function, FunctionFailure, log +from pydantic import BaseModel +import requests +from typing import Optional +import os + +class RetrieveBotInput(BaseModel): + bot_id: str + +@function.defn() +async def retrieve_bot(input: RetrieveBotInput) -> dict: + try: + headers = { + "Authorization": f"Token {os.getenv('RECALL_API_KEY')}", + "Content-Type": "application/json" + } + + response = requests.get( + f"https://api.recall.ai/api/v1/bot/{input.bot_id}", + headers=headers + ) + + response.raise_for_status() + return response.json() + + except requests.exceptions.RequestException as e: + log.error(f"Failed to retrieve bot: {e}") + raise FunctionFailure(f"Failed to retrieve bot: {e}", non_retryable=True) from e + except Exception as e: + log.error(f"Unexpected error retrieving bot: {e}") + raise FunctionFailure(f"Unexpected error: {e}", non_retryable=True) from e \ No newline at end of file diff --git a/community/recall/src/functions/summarize_transcript.py b/community/recall/src/functions/summarize_transcript.py new file mode 100644 index 00000000..1a33bfeb --- /dev/null +++ b/community/recall/src/functions/summarize_transcript.py @@ -0,0 +1,24 @@ +from restack_ai.function import function, log +from dataclasses import dataclass +import google.generativeai as genai + +import os + +@dataclass +class SummarizeTranscriptInput: + transcript: str + +@function.defn() +async def summarize_transcript(input: SummarizeTranscriptInput) -> str: + try: + log.info("summarize_transcript function started", input=input) + genai.configure(api_key=os.environ.get("GEMINI_API_KEY")) + model = genai.GenerativeModel("gemini-1.5-flash") + + prompt = f"Please provide a concise summary of this meeting transcript: {input.transcript}" + response = model.generate_content(prompt) + log.info("summarize_transcript function completed", response=response.text) + return response.text + except Exception as e: + log.error("summarize_transcript function failed", error=e) + raise e diff --git a/community/recall/src/services.py b/community/recall/src/services.py new file mode 100644 index 00000000..41c97219 --- /dev/null +++ b/community/recall/src/services.py @@ -0,0 +1,53 @@ +import asyncio +import os +from watchfiles import run_process +import webbrowser + +from src.client import client +from restack_ai.restack import ServiceOptions + +from src.functions.create_meet_bot import create_meet_bot +from src.functions.get_bot_transcript import get_bot_transcript +from src.functions.retrieve_bot import retrieve_bot +from src.workflows.create_meet_bot import CreateMeetBotWorkflow +from src.workflows.summarize_meeting import SummarizeMeetingWorkflow +from src.functions.list_bots import list_bots +from src.functions.summarize_transcript import summarize_transcript + +async def main(): + + await asyncio.gather( + client.start_service( + functions=[create_meet_bot, get_bot_transcript, retrieve_bot, list_bots], + options=ServiceOptions( + max_concurrent_workflow_runs=1000 + ), + task_queue='recall', + ), + client.start_service( + workflows=[SummarizeMeetingWorkflow, CreateMeetBotWorkflow], + ), + client.start_service( + functions=[summarize_transcript], + task_queue='gemini', + options=ServiceOptions( + rate_limit=1, + max_concurrent_function_runs=5 + ), + ) + ) + +def run_services(): + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("Service interrupted by user. Exiting gracefully.") + +def watch_services(): + watch_path = os.getcwd() + print(f"Watching {watch_path} and its subdirectories for changes...") + webbrowser.open("http://localhost:5233") + run_process(watch_path, recursive=True, target=run_services) + +if __name__ == "__main__": + run_services() \ No newline at end of file diff --git a/community/recall/src/workflows/__init__.py b/community/recall/src/workflows/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/community/recall/src/workflows/create_meet_bot.py b/community/recall/src/workflows/create_meet_bot.py new file mode 100644 index 00000000..da40b50e --- /dev/null +++ b/community/recall/src/workflows/create_meet_bot.py @@ -0,0 +1,13 @@ +from pydantic import BaseModel +from restack_ai.workflow import workflow, import_functions, log + +with import_functions(): + from src.functions.create_meet_bot import create_meet_bot, CreateMeetBotInput + +@workflow.defn() +class CreateMeetBotWorkflow: + @workflow.run + async def run(self, input: CreateMeetBotInput): + log.info("CreateMeetBotWorkflow started") + bot = await workflow.step(create_meet_bot, input, task_queue='recall') + return bot \ No newline at end of file diff --git a/community/recall/src/workflows/summarize_meeting.py b/community/recall/src/workflows/summarize_meeting.py new file mode 100644 index 00000000..2d59a57d --- /dev/null +++ b/community/recall/src/workflows/summarize_meeting.py @@ -0,0 +1,55 @@ +from pydantic import BaseModel +from restack_ai.workflow import workflow, import_functions, log +import json + +with import_functions(): + from src.functions.get_bot_transcript import get_bot_transcript, GetBotTranscriptInput + from src.functions.list_bots import list_bots, ListBotsInput + from src.functions.summarize_transcript import summarize_transcript, SummarizeTranscriptInput + from src.workflows.create_meet_bot import CreateMeetBotWorkflow + from src.functions.create_meet_bot import CreateMeetBotInput + +class SummarizeMeetingInput(BaseModel): + meeting_url: str + +@workflow.defn() +class SummarizeMeetingWorkflow: + @workflow.run + async def run(self, input: SummarizeMeetingInput): + log.info("SummarizeMeetingWorkflow started") + bots = await workflow.step(list_bots, ListBotsInput(), task_queue='recall') + existing_bot = None + meeting_id = input.meeting_url.split('/')[-1] + log.info("Meeting ID: ", meeting_id=meeting_id) + + for bot_item in bots["results"]: + is_done = any(status["code"] == "done" for status in bot_item["status_changes"]) + if meeting_id == bot_item["meeting_url"]["meeting_id"] and is_done: + existing_bot = bot_item + break + + if existing_bot: + log.info("Using existing bot") + transcript = await workflow.step( + get_bot_transcript, + GetBotTranscriptInput(bot_id=existing_bot["id"]), + task_queue='recall' + ) + transcript_str = json.dumps(transcript) + summary = await workflow.step( + summarize_transcript, + SummarizeTranscriptInput(transcript=transcript_str), + task_queue='gemini' + ) + return { + "bot": existing_bot, + "transcript": transcript, + "summary": summary + } + else: + log.info("No existing bot found, create a new bot") + + return { + "message": "No existing bot found, create a new bot", + "meeting_url": input.meeting_url + } diff --git a/community/recall/ui-child.png b/community/recall/ui-child.png new file mode 100644 index 00000000..4fe050a7 Binary files /dev/null and b/community/recall/ui-child.png differ diff --git a/community/recall/ui-endpoints.png b/community/recall/ui-endpoints.png new file mode 100644 index 00000000..c2f753aa Binary files /dev/null and b/community/recall/ui-endpoints.png differ