Skip to content

Add authentication support for MCP servers #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 67 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ A zero-configuration tool for automatically exposing FastAPI endpoints as Model
- **Preserving schemas** of your request models and response models
- **Preserve documentation** of all your endpoints, just as it is in Swagger
- **Extend** - Add custom MCP tools alongside the auto-generated ones
- **Authentication** - Secure your MCP server with various authentication methods

## Installation

Expand Down Expand Up @@ -49,6 +50,71 @@ add_mcp_server(

That's it! Your auto-generated MCP server is now available at `https://app.base.url/mcp`.

## Authentication

FastAPI-MCP supports various authentication methods to secure your MCP server:

### Simple Bearer Token Authentication

```python
from fastapi import FastAPI
from fastapi_mcp import add_mcp_server, AuthConfig

app = FastAPI()

# Configure authentication with a bearer token
auth_config = AuthConfig(
enabled=True,
bearer_token="your-secret-token" # This should be a secure token
)

# Add MCP server with authentication
mcp_server = add_mcp_server(
app,
mount_path="/mcp",
name="Authenticated MCP API",
auth_config=auth_config
)
```

### API Key Authentication

```python
from fastapi_mcp import AuthConfig

# Configure authentication with an API key in header
auth_config = AuthConfig(
enabled=True,
api_key="your-api-key",
api_key_name="X-API-Key",
api_key_in="header" # Can be "header" or "query"
)
```

### Custom Authentication

For more complex authentication scenarios, you can use a custom authentication function:

```python
from fastapi import Request
from fastapi_mcp import AuthConfig

# Define your custom authentication function
async def my_auth_function(request: Request) -> bool:
# Your authentication logic here
# Return True if authenticated, False otherwise
token = request.headers.get("Authorization", "").replace("Bearer ", "")
return token == "valid-token"

# Configure authentication with a custom function
auth_config = AuthConfig(
enabled=True,
custom_auth_func=my_auth_function
)
```

See the [examples/authenticated_server.py](examples/authenticated_server.py) and [examples/simple_bearer_auth.py](examples/simple_bearer_auth.py) for complete working examples.

## Advanced Usage

FastAPI-MCP provides several ways to customize and control how your MCP server is created and configured. Here are some advanced usage patterns:
Expand Down Expand Up @@ -160,4 +226,4 @@ MIT License. Copyright (c) 2024 Tadata Inc.

## About

Developed and maintained by [Tadata Inc.](https://github.com/tadata-org)
Developed and maintained by [Tadata Inc.](https://github.com/tadata-org)
231 changes: 231 additions & 0 deletions examples/authenticated_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
"""
Example of a FastAPI-MCP server with authentication.

This example demonstrates how to add authentication to an MCP server.
"""

from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from typing import List, Optional, Dict
from datetime import datetime, timedelta
import jwt
from jwt.exceptions import PyJWTError

from fastapi_mcp import add_mcp_server, AuthConfig

# Create FastAPI app
app = FastAPI(
title="Authenticated MCP Example",
description="An example of an MCP server with authentication",
)

# Secret key for JWT token
SECRET_KEY = "a_very_secret_key_that_should_be_changed_in_production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Fake users database
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "[email protected]",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderland",
"email": "[email protected]",
"hashed_password": "fakehashedsecret2",
"disabled": False,
},
}

# OAuth2 scheme
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


# Models
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None


class UserInDB(User):
hashed_password: str


class Token(BaseModel):
access_token: str
token_type: str


class TokenData(BaseModel):
username: Optional[str] = None


def verify_password(plain_password, hashed_password):
"""Verify password."""
# This is a fake hash verification - in a real app, use proper hashing
return plain_password == "secret" and hashed_password == "fakehashedsecret"


def get_user(db, username: str):
"""Get user from the database."""
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
"""Authenticate a user."""
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
"""Create a JWT token."""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt


async def get_current_user(token: str = Depends(oauth2_scheme)):
"""Get the current user from JWT token."""
credentials_exception = HTTPException(
status_code=401,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except PyJWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
"""Get the current active user."""
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user


@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
"""Login to get an access token."""
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=401,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
"""Get current user information."""
return current_user


# Sample items for a simple API
items_db: Dict[str, dict] = {
"1": {"id": "1", "name": "Foo", "description": "This is foo"},
"2": {"id": "2", "name": "Bar", "description": "This is bar"},
"3": {"id": "3", "name": "Baz", "description": "This is baz"},
}


@app.get("/items/", response_model=List[dict])
async def read_items(current_user: User = Depends(get_current_active_user)):
"""Get all items, requires authentication."""
return list(items_db.values())


@app.get("/items/{item_id}", response_model=dict)
async def read_item(item_id: str, current_user: User = Depends(get_current_active_user)):
"""Get a specific item, requires authentication."""
if item_id not in items_db:
raise HTTPException(status_code=404, detail="Item not found")
return items_db[item_id]


# Function to verify JWT tokens for MCP authentication
async def authenticate_mcp_request(request):
"""Custom authentication function for MCP server."""
try:
# Get token from Authorization header
auth_header = request.headers.get("Authorization", "")
if not auth_header or not auth_header.startswith("Bearer "):
return False

token = auth_header.replace("Bearer ", "")
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")

# Verify user exists in database
if not username or username not in fake_users_db:
return False

# Check if user is disabled
if fake_users_db[username].get("disabled", False):
return False

return True
except Exception:
return False


# Add MCP server with authentication
auth_config = AuthConfig(
enabled=True,
# Define a custom auth function that validates JWT tokens
custom_auth_func=authenticate_mcp_request
)

mcp_server = add_mcp_server(
app,
mount_path="/mcp",
name="Authenticated MCP API",
description="MCP server with JWT authentication",
base_url="http://localhost:8000",
auth_config=auth_config
)


# Add a custom MCP tool
@mcp_server.tool()
async def get_item_count() -> int:
"""Get the total number of items in the database."""
return len(items_db)


if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8000)
Loading