
Feat: Add agent worker And Claude Support #66


Closed
wants to merge 34 commits
Commits
a1422dc
Feat: Add agent-worker
baryhuang Mar 30, 2025
88f6354
update config
baryhuang Mar 31, 2025
786ea96
add shutdown for agent_worker
baryhuang Mar 31, 2025
3db1830
cleanup
baryhuang Mar 31, 2025
bd9a232
add anthropic path
baryhuang Mar 31, 2025
f81653e
cleanup
baryhuang Mar 31, 2025
74206e0
cleanup
baryhuang Mar 31, 2025
4636082
cleanup
baryhuang Mar 31, 2025
e9756dc
cleanup
baryhuang Mar 31, 2025
e06429e
add support for returning images
baryhuang Apr 3, 2025
fd68e9d
add
baryhuang Apr 3, 2025
853771e
cleanup
baryhuang Apr 3, 2025
72ade83
cleanup
baryhuang Apr 3, 2025
76c4780
fix passing of system prompt
baryhuang Apr 3, 2025
6b75133
cleanup
baryhuang Apr 3, 2025
0424e87
cleanup
baryhuang Apr 3, 2025
a8e6e77
initial cleanup of file structure
baryhuang Apr 4, 2025
531698b
initial cleanup of openai vs claude
baryhuang Apr 4, 2025
36fdc10
break handlers into files
baryhuang Apr 4, 2025
8d9296f
cleanup anthropic handler to break down long loop
baryhuang Apr 4, 2025
79c88db
update to use non-beta anthropic api
baryhuang Apr 4, 2025
3390d01
cleanup anthropic chat completion and cleanup
baryhuang Apr 4, 2025
d47aaf8
enabled thinking support for Claude
baryhuang Apr 4, 2025
20ac56a
added basic thinking support for Claude
baryhuang Apr 4, 2025
3617411
add customer logger
baryhuang Apr 4, 2025
59b1c03
Update mcp_bridge/agent_worker/run.py
baryhuang Apr 4, 2025
1d7c771
add script entrypoint for agent runner
SecretiveShell Apr 4, 2025
8df7ab0
add more logs and fix the process tool loop with thinking blocks
baryhuang Apr 5, 2025
62029a6
switch to use Claude beta api with latest computer-use
baryhuang Apr 5, 2025
9669d40
added support for calling AWS Bedrock
baryhuang Apr 6, 2025
99d1dda
cleanup logggin
baryhuang Apr 6, 2025
b4ada00
add rate limit handling
baryhuang Apr 6, 2025
a1c6d42
re organized imports and typing
baryhuang Apr 6, 2025
f77af61
Update mcp_bridge/anthropic_clients/genericClient.py
baryhuang Apr 6, 2025
9 changes: 8 additions & 1 deletion .gitignore
@@ -165,4 +165,11 @@ cython_debug/
## custom
commands.md
compose.yml
config.json
config.json

screenshots/*
.DS_Store

.cursor
agent_worker_task.json
logs
7 changes: 7 additions & 0 deletions agent_worker_task.json.example
@@ -0,0 +1,7 @@
{
  "task": "Hi Olivia, tell me what's on your computer screen. You should include a response of 'the task has been completed' when the task is complete. Don't ask for what to do next.",
  "model": "claude-3-7-sonnet-20250219",
  "system_prompt": "You should include a response of 'the task has been completed' when the task is complete.\n\n You should include a response of 'the task has been completed' when you found yourself stuck in a loop with no progress.",
  "verbose": true,
  "max_iterations": 10
}
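
For orientation, a minimal sketch of how a task file in this shape could drive the AgentWorker defined in the next file. The constructor arguments mirror agent_worker.py below; the loading code itself is illustrative and is not the PR's actual entrypoint (mcp_bridge/agent_worker/run.py, which is not shown in this diff), and the verbose flag is ignored here.

# Illustrative only: load the example task file and run the worker.
# The real entrypoint added by this PR (mcp_bridge/agent_worker/run.py) is not
# shown here; only the AgentWorker API from agent_worker.py below is assumed.
import asyncio
import json

from mcp_bridge.agent_worker.agent_worker import AgentWorker

async def main() -> None:
    with open("agent_worker_task.json") as f:
        config = json.load(f)

    worker = AgentWorker(
        task=config["task"],
        model=config["model"],
        system_prompt=config.get("system_prompt"),
        max_iterations=config.get("max_iterations", 10),
    )
    try:
        await worker.run_agent_loop()
    finally:
        # Note: shutdown() as written below calls force_exit(0), which ends the process.
        await worker.shutdown()

if __name__ == "__main__":
    asyncio.run(main())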
168 changes: 168 additions & 0 deletions mcp_bridge/agent_worker/agent_worker.py
@@ -0,0 +1,168 @@
#!/usr/bin/env python
"""Agent worker module that provides standalone command-line agent execution"""

import asyncio
import os
from typing import Dict, List, Optional, Tuple

from loguru import logger

from mcp_bridge.agent_worker.utils import is_anthropic_model
from mcp_bridge.agent_worker.anthropic_handler import process_with_anthropic
from mcp_bridge.agent_worker.openai_handler import process_with_openai
from mcp_bridge.agent_worker.customer_logs import get_logger, CustomerMessageLogger
from mcp_bridge.mcp_clients.McpClientManager import ClientManager
from mcp_bridge.utils import force_exit
from lmos_openai_types import (
    ChatCompletionRequestMessage,
    ChatCompletionRequestSystemMessage,
    ChatCompletionRequestUserMessage,
)

class AgentWorker:
    """A standalone worker that processes tasks using MCP clients and LLM completions"""

    def __init__(
        self,
        task: str,
        model: str = "anthropic.claude-3-haiku-20240307-v1:0",
        system_prompt: Optional[str] = None,
        max_iterations: int = 10,
        session_id: Optional[str] = None,
    ):
        self.task = task
        self.model = model
        self.system_prompt = system_prompt or "You are a helpful assistant that completes tasks using available tools. Use the tools provided to you to help complete the user's task."
        self.messages: List[ChatCompletionRequestMessage] = []
        self.max_iterations = max_iterations
        self.thinking_blocks: List[Dict[str, object]] = []
        self.session_id = session_id
        # Initialize customer message logger
        self.customer_logger: CustomerMessageLogger = get_logger(initialize=True, session_id=self.session_id)
        self.customer_logger.log_system_event("initialization", {
            "task": task,
            "model": model,
            "max_iterations": max_iterations
        })

    async def initialize(self) -> None:
        """Initialize the MCP clients"""
        logger.info("Initializing MCP clients...")
        # Start the ClientManager to load all available MCP clients
        await ClientManager.initialize()

        # Wait a moment for clients to start up
        logger.info("Waiting for MCP clients to initialize...")
        await asyncio.sleep(2)

        # Check that at least one client is ready
        max_attempts = 3
        for attempt in range(max_attempts):
            clients = ClientManager.get_clients()
            ready_clients = [name for name, client in clients if client.session is not None]

            if ready_clients:
                logger.info(f"MCP clients ready: {', '.join(ready_clients)}")
                # Log available clients to customer log
                self.customer_logger.log_system_event("clients_ready", {
                    "clients": ready_clients
                })
                break

            logger.warning(f"No MCP clients ready yet, waiting (attempt {attempt+1}/{max_attempts})...")
            await asyncio.sleep(2)

        # Initialize the conversation with system and user messages
        self.messages = [
            ChatCompletionRequestSystemMessage(
                role="system",
                content=self.system_prompt
            ),
            ChatCompletionRequestUserMessage(
                role="user",
                content=self.task
            )
        ]

        # Log initial messages to customer log
        self.customer_logger.log_message("system", self.system_prompt)
        self.customer_logger.log_message("user", self.task)

    async def shutdown(self) -> None:
        """Shutdown all MCP clients"""
        logger.info("Shutting down MCP clients...")
        # Log shutdown event
        self.customer_logger.log_system_event("shutdown", {
            "summary": self.customer_logger.get_summary()
        })
        # Exit the program
        force_exit(0)

Comment on lines +98 to +100

Copilot AI Apr 10, 2025


Using force_exit here immediately terminates the process, which may bypass important cleanup steps; consider implementing a more graceful shutdown mechanism.

Suggested change
-        # Exit the program
-        force_exit(0)
+        # Stop the event loop
+        loop = asyncio.get_event_loop()
+        loop.stop()
+        loop.close()

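A cooperative alternative to both force_exit and stopping the event loop from inside a running coroutine might look roughly like the sketch below: finish client cleanup, cancel leftover tasks, and let the caller's asyncio.run() tear the loop down once run_agent_loop() returns. This is only a sketch, not part of the PR; ClientManager's cleanup API is not shown in this diff, so that step is left as a hypothetical placeholder comment.

    async def shutdown(self) -> None:
        """Sketch of a cooperative shutdown (illustrative; assumes the caller owns the event loop)"""
        logger.info("Shutting down MCP clients...")
        self.customer_logger.log_system_event("shutdown", {
            "summary": self.customer_logger.get_summary()
        })
        # Hypothetical: if ClientManager exposes an async cleanup method, await it here
        # (e.g. `await ClientManager.shutdown()`) instead of terminating the process.
        # Cancel any remaining background tasks so the caller's asyncio.run() can
        # close the loop cleanly after run_agent_loop() returns.
        for pending in asyncio.all_tasks():
            if pending is not asyncio.current_task():
                pending.cancel()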

    async def run_agent_loop(self) -> List[ChatCompletionRequestMessage]:
        """Run the agent loop to process the task until completion"""
        await self.initialize()
        logger.info("Starting agent loop...")
        self.customer_logger.log_system_event("agent_loop_start", {})

        # Keep running until the task is complete
        for iteration in range(self.max_iterations):
            logger.info(f"Agent iteration {iteration+1}/{self.max_iterations}")
            self.customer_logger.log_system_event("iteration_start", {
                "iteration": iteration + 1,
                "max_iterations": self.max_iterations
            })

            # Process with either Anthropic or OpenAI API based on model name
            task_complete = False
            if is_anthropic_model(self.model):
                # Use Anthropic processing
                _, updated_messages, thinking_blocks, task_complete = await process_with_anthropic(
                    messages=self.messages,
                    model=self.model,
                    system_prompt=self.system_prompt,
                    thinking_blocks=self.thinking_blocks,
                    customer_logger=self.customer_logger
                )
                self.messages = updated_messages

                # Check for duplicate thinking blocks before adding
                # Thinking blocks are dicts whose "signature" value is used for deduplication
                existing_signatures = {block.get("signature") for block in self.thinking_blocks
                                       if block.get("signature")}
                unique_blocks = [block for block in thinking_blocks
                                 if not block.get("signature") or block.get("signature") not in existing_signatures]
                self.thinking_blocks.extend(unique_blocks)

                # Log thinking blocks to customer log
                for block in unique_blocks:
                    if block.get("thinking"):
                        self.customer_logger.log_thinking(
                            block["thinking"],
                            block.get("signature")
                        )
            else:
                # Use OpenAI processing
                updated_messages, task_complete = await process_with_openai(
                    messages=self.messages,
                    model=self.model,
                    customer_logger=self.customer_logger
                )
                self.messages = updated_messages

            # If task is complete, return the messages
            if task_complete:
                self.customer_logger.log_system_event("task_complete", {
                    "iteration": iteration + 1,
                    "summary": self.customer_logger.get_summary()
                })
                return self.messages

        # If we reached max iterations without completion
        logger.warning(f"Reached maximum iterations ({self.max_iterations}) without task completion")
        self.customer_logger.log_system_event("max_iterations_reached", {
            "max_iterations": self.max_iterations,
            "summary": self.customer_logger.get_summary()
        })

        # Return final messages for inspection
        return self.messages
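
The handler modules imported at the top of this file (anthropic_handler.py and openai_handler.py) are not included in this hunk. From the call sites in run_agent_loop above, their interfaces appear to be roughly the stubs below; the shapes are inferred from usage, and the name of the ignored first element returned by process_with_anthropic is a guess.

# Inferred interface sketch, not copied from the PR: signatures reconstructed from
# how the handlers are called in run_agent_loop().
from typing import Dict, List, Tuple

from lmos_openai_types import ChatCompletionRequestMessage
from mcp_bridge.agent_worker.customer_logs import CustomerMessageLogger

async def process_with_anthropic(
    messages: List[ChatCompletionRequestMessage],
    model: str,
    system_prompt: str,
    thinking_blocks: List[Dict[str, object]],
    customer_logger: CustomerMessageLogger,
) -> Tuple[object, List[ChatCompletionRequestMessage], List[Dict[str, object]], bool]:
    """Returns (raw_response, updated_messages, new_thinking_blocks, task_complete)."""
    ...

async def process_with_openai(
    messages: List[ChatCompletionRequestMessage],
    model: str,
    customer_logger: CustomerMessageLogger,
) -> Tuple[List[ChatCompletionRequestMessage], bool]:
    """Returns (updated_messages, task_complete)."""
    ...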