mirror of
https://github.com/github/awesome-copilot.git
synced 2026-02-20 10:25:13 +00:00
Adding copilot-sdk stuff
This commit is contained in:
806
instructions/copilot-sdk-python.instructions.md
Normal file
806
instructions/copilot-sdk-python.instructions.md
Normal file
@@ -0,0 +1,806 @@
|
||||
---
|
||||
applyTo: "**.py, pyproject.toml, setup.py"
|
||||
description: "This file provides guidance on building Python applications using GitHub Copilot SDK."
|
||||
name: "GitHub Copilot SDK Python Instructions"
|
||||
---
|
||||
|
||||
## Core Principles
|
||||
|
||||
- The SDK is in technical preview and may have breaking changes
|
||||
- Requires Python 3.9 or later
|
||||
- Requires GitHub Copilot CLI installed and in PATH
|
||||
- Uses async/await patterns throughout (asyncio)
|
||||
- Supports both async context managers and manual lifecycle management
|
||||
- Type hints provided for better IDE support
|
||||
|
||||
## Installation
|
||||
|
||||
Always install via pip:
|
||||
|
||||
```bash
|
||||
pip install copilot-sdk
|
||||
# or with poetry
|
||||
poetry add copilot-sdk
|
||||
# or with uv
|
||||
uv add copilot-sdk
|
||||
```
|
||||
|
||||
## Client Initialization
|
||||
|
||||
### Basic Client Setup
|
||||
|
||||
```python
|
||||
from copilot import CopilotClient
|
||||
|
||||
async with CopilotClient() as client:
|
||||
# Use client...
|
||||
pass
|
||||
```
|
||||
|
||||
### Client Configuration Options
|
||||
|
||||
When creating a CopilotClient, use a dict with these keys:
|
||||
|
||||
- `cli_path` - Path to CLI executable (default: "copilot" from PATH or COPILOT_CLI_PATH env var)
|
||||
- `cli_url` - URL of existing CLI server (e.g., "localhost:8080"). When provided, client won't spawn a process
|
||||
- `port` - Server port (default: 0 for random)
|
||||
- `use_stdio` - Use stdio transport instead of TCP (default: True)
|
||||
- `log_level` - Log level (default: "info")
|
||||
- `auto_start` - Auto-start server (default: True)
|
||||
- `auto_restart` - Auto-restart on crash (default: True)
|
||||
- `cwd` - Working directory for the CLI process (default: os.getcwd())
|
||||
- `env` - Environment variables for the CLI process (dict)
|
||||
|
||||
### Manual Server Control
|
||||
|
||||
For explicit control:
|
||||
|
||||
```python
|
||||
client = CopilotClient({"auto_start": False})
|
||||
await client.start()
|
||||
# Use client...
|
||||
await client.stop()
|
||||
```
|
||||
|
||||
Use `force_stop()` when `stop()` takes too long.
|
||||
|
||||
## Session Management
|
||||
|
||||
### Creating Sessions
|
||||
|
||||
Use a dict for SessionConfig:
|
||||
|
||||
```python
|
||||
session = await client.create_session({
|
||||
"model": "gpt-5",
|
||||
"streaming": True,
|
||||
"tools": [...],
|
||||
"system_message": { ... },
|
||||
"available_tools": ["tool1", "tool2"],
|
||||
"excluded_tools": ["tool3"],
|
||||
"provider": { ... }
|
||||
})
|
||||
```
|
||||
|
||||
### Session Config Options
|
||||
|
||||
- `session_id` - Custom session ID (str)
|
||||
- `model` - Model name ("gpt-5", "claude-sonnet-4.5", etc.)
|
||||
- `tools` - Custom tools exposed to the CLI (list[Tool])
|
||||
- `system_message` - System message customization (dict)
|
||||
- `available_tools` - Allowlist of tool names (list[str])
|
||||
- `excluded_tools` - Blocklist of tool names (list[str])
|
||||
- `provider` - Custom API provider configuration (BYOK) (ProviderConfig)
|
||||
- `streaming` - Enable streaming response chunks (bool)
|
||||
- `mcp_servers` - MCP server configurations (list)
|
||||
- `custom_agents` - Custom agent configurations (list)
|
||||
- `config_dir` - Config directory override (str)
|
||||
- `skill_directories` - Skill directories (list[str])
|
||||
- `disabled_skills` - Disabled skills (list[str])
|
||||
- `on_permission_request` - Permission request handler (callable)
|
||||
|
||||
### Resuming Sessions
|
||||
|
||||
```python
|
||||
session = await client.resume_session("session-id", {
|
||||
"tools": [my_new_tool]
|
||||
})
|
||||
```
|
||||
|
||||
### Session Operations
|
||||
|
||||
- `session.session_id` - Get session identifier (str)
|
||||
- `await session.send({"prompt": "...", "attachments": [...]})` - Send message, returns str (message ID)
|
||||
- `await session.send_and_wait({"prompt": "..."}, timeout=60.0)` - Send and wait for idle, returns SessionEvent | None
|
||||
- `await session.abort()` - Abort current processing
|
||||
- `await session.get_messages()` - Get all events/messages, returns list[SessionEvent]
|
||||
- `await session.destroy()` - Clean up session
|
||||
|
||||
## Event Handling
|
||||
|
||||
### Event Subscription Pattern
|
||||
|
||||
ALWAYS use asyncio events or futures for waiting on session events:
|
||||
|
||||
```python
|
||||
import asyncio
|
||||
|
||||
done = asyncio.Event()
|
||||
|
||||
def handler(event):
|
||||
if event.type == "assistant.message":
|
||||
print(event.data.content)
|
||||
elif event.type == "session.idle":
|
||||
done.set()
|
||||
|
||||
session.on(handler)
|
||||
await session.send({"prompt": "..."})
|
||||
await done.wait()
|
||||
```
|
||||
|
||||
### Unsubscribing from Events
|
||||
|
||||
The `on()` method returns a function that unsubscribes:
|
||||
|
||||
```python
|
||||
unsubscribe = session.on(lambda event: print(event.type))
|
||||
# Later...
|
||||
unsubscribe()
|
||||
```
|
||||
|
||||
### Event Types
|
||||
|
||||
Use attribute access for event type checking:
|
||||
|
||||
```python
|
||||
def handler(event):
|
||||
if event.type == "user.message":
|
||||
# Handle user message
|
||||
pass
|
||||
elif event.type == "assistant.message":
|
||||
print(event.data.content)
|
||||
elif event.type == "tool.executionStart":
|
||||
# Tool execution started
|
||||
pass
|
||||
elif event.type == "tool.executionComplete":
|
||||
# Tool execution completed
|
||||
pass
|
||||
elif event.type == "session.start":
|
||||
# Session started
|
||||
pass
|
||||
elif event.type == "session.idle":
|
||||
# Session is idle (processing complete)
|
||||
pass
|
||||
elif event.type == "session.error":
|
||||
print(f"Error: {event.data.message}")
|
||||
|
||||
session.on(handler)
|
||||
```
|
||||
|
||||
## Streaming Responses
|
||||
|
||||
### Enabling Streaming
|
||||
|
||||
Set `streaming: True` in SessionConfig:
|
||||
|
||||
```python
|
||||
session = await client.create_session({
|
||||
"model": "gpt-5",
|
||||
"streaming": True
|
||||
})
|
||||
```
|
||||
|
||||
### Handling Streaming Events
|
||||
|
||||
Handle both delta events (incremental) and final events:
|
||||
|
||||
```python
|
||||
import asyncio
|
||||
|
||||
done = asyncio.Event()
|
||||
|
||||
def handler(event):
|
||||
if event.type == "assistant.message.delta":
|
||||
# Incremental text chunk
|
||||
print(event.data.delta_content, end="", flush=True)
|
||||
elif event.type == "assistant.reasoning.delta":
|
||||
# Incremental reasoning chunk (model-dependent)
|
||||
print(event.data.delta_content, end="", flush=True)
|
||||
elif event.type == "assistant.message":
|
||||
# Final complete message
|
||||
print("\n--- Final ---")
|
||||
print(event.data.content)
|
||||
elif event.type == "assistant.reasoning":
|
||||
# Final reasoning content
|
||||
print("--- Reasoning ---")
|
||||
print(event.data.content)
|
||||
elif event.type == "session.idle":
|
||||
done.set()
|
||||
|
||||
session.on(handler)
|
||||
await session.send({"prompt": "Tell me a story"})
|
||||
await done.wait()
|
||||
```
|
||||
|
||||
Note: Final events (`assistant.message`, `assistant.reasoning`) are ALWAYS sent regardless of streaming setting.
|
||||
|
||||
## Custom Tools
|
||||
|
||||
### Defining Tools with define_tool
|
||||
|
||||
Use `define_tool` for tool definitions:
|
||||
|
||||
```python
|
||||
from copilot import define_tool
|
||||
|
||||
async def fetch_issue(issue_id: str):
|
||||
# Fetch issue from tracker
|
||||
return {"id": issue_id, "status": "open"}
|
||||
|
||||
session = await client.create_session({
|
||||
"model": "gpt-5",
|
||||
"tools": [
|
||||
define_tool(
|
||||
name="lookup_issue",
|
||||
description="Fetch issue details from tracker",
|
||||
parameters={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"id": {"type": "string", "description": "Issue ID"}
|
||||
},
|
||||
"required": ["id"]
|
||||
},
|
||||
handler=lambda args, inv: fetch_issue(args["id"])
|
||||
)
|
||||
]
|
||||
})
|
||||
```
|
||||
|
||||
### Using Pydantic for Parameters
|
||||
|
||||
The SDK works well with Pydantic models:
|
||||
|
||||
```python
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
class WeatherArgs(BaseModel):
|
||||
location: str = Field(description="City name")
|
||||
units: str = Field(default="fahrenheit", description="Temperature units")
|
||||
|
||||
async def get_weather(args: WeatherArgs, inv):
|
||||
return {"temperature": 72, "units": args.units}
|
||||
|
||||
session = await client.create_session({
|
||||
"tools": [
|
||||
define_tool(
|
||||
name="get_weather",
|
||||
description="Get weather for a location",
|
||||
parameters=WeatherArgs.model_json_schema(),
|
||||
handler=lambda args, inv: get_weather(WeatherArgs(**args), inv)
|
||||
)
|
||||
]
|
||||
})
|
||||
```
|
||||
|
||||
### Tool Return Types
|
||||
|
||||
- Return any JSON-serializable value (automatically wrapped)
|
||||
- Or return a ToolResult dict for full control:
|
||||
|
||||
```python
|
||||
{
|
||||
"text_result_for_llm": str, # Result shown to LLM
|
||||
"result_type": "success" | "failure",
|
||||
"error": str, # Optional: Internal error (not shown to LLM)
|
||||
"tool_telemetry": dict # Optional: Telemetry data
|
||||
}
|
||||
```
|
||||
|
||||
### Tool Handler Signature
|
||||
|
||||
Tool handlers receive two arguments:
|
||||
|
||||
- `args` (dict) - The tool arguments passed by the LLM
|
||||
- `invocation` (ToolInvocation) - Metadata about the invocation
|
||||
- `invocation.session_id` - Session ID
|
||||
- `invocation.tool_call_id` - Tool call ID
|
||||
- `invocation.tool_name` - Tool name
|
||||
- `invocation.arguments` - Same as args parameter
|
||||
|
||||
### Tool Execution Flow
|
||||
|
||||
When Copilot invokes a tool, the client automatically:
|
||||
|
||||
1. Runs your handler function
|
||||
2. Serializes the return value
|
||||
3. Responds to the CLI
|
||||
|
||||
## System Message Customization
|
||||
|
||||
### Append Mode (Default - Preserves Guardrails)
|
||||
|
||||
```python
|
||||
session = await client.create_session({
|
||||
"model": "gpt-5",
|
||||
"system_message": {
|
||||
"mode": "append",
|
||||
"content": """
|
||||
<workflow_rules>
|
||||
- Always check for security vulnerabilities
|
||||
- Suggest performance improvements when applicable
|
||||
</workflow_rules>
|
||||
"""
|
||||
}
|
||||
})
|
||||
```
|
||||
|
||||
### Replace Mode (Full Control - Removes Guardrails)
|
||||
|
||||
```python
|
||||
session = await client.create_session({
|
||||
"model": "gpt-5",
|
||||
"system_message": {
|
||||
"mode": "replace",
|
||||
"content": "You are a helpful assistant."
|
||||
}
|
||||
})
|
||||
```
|
||||
|
||||
## File Attachments
|
||||
|
||||
Attach files to messages:
|
||||
|
||||
```python
|
||||
await session.send({
|
||||
"prompt": "Analyze this file",
|
||||
"attachments": [
|
||||
{
|
||||
"type": "file",
|
||||
"path": "/path/to/file.py",
|
||||
"display_name": "My File"
|
||||
}
|
||||
]
|
||||
})
|
||||
```
|
||||
|
||||
## Message Delivery Modes
|
||||
|
||||
Use the `mode` key in message options:
|
||||
|
||||
- `"enqueue"` - Queue message for processing
|
||||
- `"immediate"` - Process message immediately
|
||||
|
||||
```python
|
||||
await session.send({
|
||||
"prompt": "...",
|
||||
"mode": "enqueue"
|
||||
})
|
||||
```
|
||||
|
||||
## Multiple Sessions
|
||||
|
||||
Sessions are independent and can run concurrently:
|
||||
|
||||
```python
|
||||
session1 = await client.create_session({"model": "gpt-5"})
|
||||
session2 = await client.create_session({"model": "claude-sonnet-4.5"})
|
||||
|
||||
await asyncio.gather(
|
||||
session1.send({"prompt": "Hello from session 1"}),
|
||||
session2.send({"prompt": "Hello from session 2"})
|
||||
)
|
||||
```
|
||||
|
||||
## Bring Your Own Key (BYOK)
|
||||
|
||||
Use custom API providers via `provider`:
|
||||
|
||||
```python
|
||||
session = await client.create_session({
|
||||
"provider": {
|
||||
"type": "openai",
|
||||
"base_url": "https://api.openai.com/v1",
|
||||
"api_key": "your-api-key"
|
||||
}
|
||||
})
|
||||
```
|
||||
|
||||
## Session Lifecycle Management
|
||||
|
||||
### Listing Sessions
|
||||
|
||||
```python
|
||||
sessions = await client.list_sessions()
|
||||
for metadata in sessions:
|
||||
print(f"{metadata.session_id}: {metadata.summary}")
|
||||
```
|
||||
|
||||
### Deleting Sessions
|
||||
|
||||
```python
|
||||
await client.delete_session(session_id)
|
||||
```
|
||||
|
||||
### Getting Last Session ID
|
||||
|
||||
```python
|
||||
last_id = await client.get_last_session_id()
|
||||
if last_id:
|
||||
session = await client.resume_session(last_id)
|
||||
```
|
||||
|
||||
### Checking Connection State
|
||||
|
||||
```python
|
||||
state = client.get_state()
|
||||
# Returns: "disconnected" | "connecting" | "connected" | "error"
|
||||
```
|
||||
|
||||
## Error Handling
|
||||
|
||||
### Standard Exception Handling
|
||||
|
||||
```python
|
||||
try:
|
||||
session = await client.create_session()
|
||||
await session.send({"prompt": "Hello"})
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
```
|
||||
|
||||
### Session Error Events
|
||||
|
||||
Monitor `session.error` event type for runtime errors:
|
||||
|
||||
```python
|
||||
def handler(event):
|
||||
if event.type == "session.error":
|
||||
print(f"Session Error: {event.data.message}")
|
||||
|
||||
session.on(handler)
|
||||
```
|
||||
|
||||
## Connectivity Testing
|
||||
|
||||
Use ping to verify server connectivity:
|
||||
|
||||
```python
|
||||
response = await client.ping("health check")
|
||||
print(f"Server responded at {response['timestamp']}")
|
||||
```
|
||||
|
||||
## Resource Cleanup
|
||||
|
||||
### Automatic Cleanup with Context Managers
|
||||
|
||||
ALWAYS use async context managers for automatic cleanup:
|
||||
|
||||
```python
|
||||
async with CopilotClient() as client:
|
||||
async with await client.create_session() as session:
|
||||
# Use session...
|
||||
await session.send({"prompt": "Hello"})
|
||||
# Session automatically destroyed
|
||||
# Client automatically stopped
|
||||
```
|
||||
|
||||
### Manual Cleanup with Try-Finally
|
||||
|
||||
```python
|
||||
client = CopilotClient()
|
||||
try:
|
||||
await client.start()
|
||||
session = await client.create_session()
|
||||
try:
|
||||
# Use session...
|
||||
pass
|
||||
finally:
|
||||
await session.destroy()
|
||||
finally:
|
||||
await client.stop()
|
||||
```
|
||||
|
||||
## Best Practices
|
||||
|
||||
1. **Always use async context managers** (`async with`) for automatic cleanup
|
||||
2. **Use asyncio.Event or asyncio.Future** to wait for session.idle event
|
||||
3. **Handle session.error** events for robust error handling
|
||||
4. **Use if/elif chains** for event type checking
|
||||
5. **Enable streaming** for better UX in interactive scenarios
|
||||
6. **Use define_tool** for tool definitions
|
||||
7. **Use Pydantic models** for type-safe parameter validation
|
||||
8. **Dispose event subscriptions** when no longer needed
|
||||
9. **Use system_message with mode: "append"** to preserve safety guardrails
|
||||
10. **Handle both delta and final events** when streaming is enabled
|
||||
11. **Use type hints** for better IDE support and code clarity
|
||||
|
||||
## Common Patterns
|
||||
|
||||
### Simple Query-Response
|
||||
|
||||
```python
|
||||
from copilot import CopilotClient
|
||||
import asyncio
|
||||
|
||||
async def main():
|
||||
async with CopilotClient() as client:
|
||||
async with await client.create_session({"model": "gpt-5"}) as session:
|
||||
done = asyncio.Event()
|
||||
|
||||
def handler(event):
|
||||
if event.type == "assistant.message":
|
||||
print(event.data.content)
|
||||
elif event.type == "session.idle":
|
||||
done.set()
|
||||
|
||||
session.on(handler)
|
||||
await session.send({"prompt": "What is 2+2?"})
|
||||
await done.wait()
|
||||
|
||||
asyncio.run(main())
|
||||
```
|
||||
|
||||
### Multi-Turn Conversation
|
||||
|
||||
```python
|
||||
async def send_and_wait(session, prompt: str):
|
||||
done = asyncio.Event()
|
||||
result = []
|
||||
|
||||
def handler(event):
|
||||
if event.type == "assistant.message":
|
||||
result.append(event.data.content)
|
||||
print(event.data.content)
|
||||
elif event.type == "session.idle":
|
||||
done.set()
|
||||
elif event.type == "session.error":
|
||||
result.append(None)
|
||||
done.set()
|
||||
|
||||
unsubscribe = session.on(handler)
|
||||
await session.send({"prompt": prompt})
|
||||
await done.wait()
|
||||
unsubscribe()
|
||||
|
||||
return result[0] if result else None
|
||||
|
||||
async with await client.create_session() as session:
|
||||
await send_and_wait(session, "What is the capital of France?")
|
||||
await send_and_wait(session, "What is its population?")
|
||||
```
|
||||
|
||||
### SendAndWait Helper
|
||||
|
||||
```python
|
||||
# Use built-in send_and_wait for simpler synchronous interaction
|
||||
async with await client.create_session() as session:
|
||||
response = await session.send_and_wait(
|
||||
{"prompt": "What is 2+2?"},
|
||||
timeout=60.0
|
||||
)
|
||||
|
||||
if response and response.type == "assistant.message":
|
||||
print(response.data.content)
|
||||
```
|
||||
|
||||
### Tool with Dataclass Return Type
|
||||
|
||||
```python
|
||||
from dataclasses import dataclass, asdict
|
||||
from copilot import define_tool
|
||||
|
||||
@dataclass
|
||||
class UserInfo:
|
||||
id: str
|
||||
name: str
|
||||
email: str
|
||||
role: str
|
||||
|
||||
async def get_user(args, inv) -> dict:
|
||||
user = UserInfo(
|
||||
id=args["user_id"],
|
||||
name="John Doe",
|
||||
email="john@example.com",
|
||||
role="Developer"
|
||||
)
|
||||
return asdict(user)
|
||||
|
||||
session = await client.create_session({
|
||||
"tools": [
|
||||
define_tool(
|
||||
name="get_user",
|
||||
description="Retrieve user information",
|
||||
parameters={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"user_id": {"type": "string", "description": "User ID"}
|
||||
},
|
||||
"required": ["user_id"]
|
||||
},
|
||||
handler=get_user
|
||||
)
|
||||
]
|
||||
})
|
||||
```
|
||||
|
||||
### Streaming with Progress
|
||||
|
||||
```python
|
||||
import asyncio
|
||||
|
||||
current_message = []
|
||||
done = asyncio.Event()
|
||||
|
||||
def handler(event):
|
||||
if event.type == "assistant.message.delta":
|
||||
current_message.append(event.data.delta_content)
|
||||
print(event.data.delta_content, end="", flush=True)
|
||||
elif event.type == "assistant.message":
|
||||
print(f"\n\n=== Complete ===")
|
||||
print(f"Total length: {len(event.data.content)} chars")
|
||||
elif event.type == "session.idle":
|
||||
done.set()
|
||||
|
||||
unsubscribe = session.on(handler)
|
||||
await session.send({"prompt": "Write a long story"})
|
||||
await done.wait()
|
||||
unsubscribe()
|
||||
```
|
||||
|
||||
### Error Recovery
|
||||
|
||||
```python
|
||||
def handler(event):
|
||||
if event.type == "session.error":
|
||||
print(f"Session error: {event.data.message}")
|
||||
# Optionally retry or handle error
|
||||
|
||||
session.on(handler)
|
||||
|
||||
try:
|
||||
await session.send({"prompt": "risky operation"})
|
||||
except Exception as e:
|
||||
# Handle send errors
|
||||
print(f"Failed to send: {e}")
|
||||
```
|
||||
|
||||
### Using TypedDict for Type Safety
|
||||
|
||||
```python
|
||||
from typing import TypedDict, List
|
||||
|
||||
class MessageOptions(TypedDict, total=False):
|
||||
prompt: str
|
||||
attachments: List[dict]
|
||||
mode: str
|
||||
|
||||
class SessionConfig(TypedDict, total=False):
|
||||
model: str
|
||||
streaming: bool
|
||||
tools: List
|
||||
|
||||
# Usage with type hints
|
||||
options: MessageOptions = {
|
||||
"prompt": "Hello",
|
||||
"mode": "enqueue"
|
||||
}
|
||||
await session.send(options)
|
||||
|
||||
config: SessionConfig = {
|
||||
"model": "gpt-5",
|
||||
"streaming": True
|
||||
}
|
||||
session = await client.create_session(config)
|
||||
```
|
||||
|
||||
### Async Generator for Streaming
|
||||
|
||||
```python
|
||||
from typing import AsyncGenerator
|
||||
|
||||
async def stream_response(session, prompt: str) -> AsyncGenerator[str, None]:
|
||||
"""Stream response chunks as an async generator."""
|
||||
queue = asyncio.Queue()
|
||||
done = asyncio.Event()
|
||||
|
||||
def handler(event):
|
||||
if event.type == "assistant.message.delta":
|
||||
queue.put_nowait(event.data.delta_content)
|
||||
elif event.type == "session.idle":
|
||||
done.set()
|
||||
|
||||
unsubscribe = session.on(handler)
|
||||
await session.send({"prompt": prompt})
|
||||
|
||||
while not done.is_set():
|
||||
try:
|
||||
chunk = await asyncio.wait_for(queue.get(), timeout=0.1)
|
||||
yield chunk
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
|
||||
# Drain remaining items
|
||||
while not queue.empty():
|
||||
yield queue.get_nowait()
|
||||
|
||||
unsubscribe()
|
||||
|
||||
# Usage
|
||||
async for chunk in stream_response(session, "Tell me a story"):
|
||||
print(chunk, end="", flush=True)
|
||||
```
|
||||
|
||||
### Decorator Pattern for Tools
|
||||
|
||||
```python
|
||||
from typing import Callable, Any
|
||||
from copilot import define_tool
|
||||
|
||||
def copilot_tool(
|
||||
name: str,
|
||||
description: str,
|
||||
parameters: dict
|
||||
) -> Callable:
|
||||
"""Decorator to convert a function into a Copilot tool."""
|
||||
def decorator(func: Callable) -> Any:
|
||||
return define_tool(
|
||||
name=name,
|
||||
description=description,
|
||||
parameters=parameters,
|
||||
handler=lambda args, inv: func(**args)
|
||||
)
|
||||
return decorator
|
||||
|
||||
@copilot_tool(
|
||||
name="calculate",
|
||||
description="Perform a calculation",
|
||||
parameters={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"expression": {"type": "string", "description": "Math expression"}
|
||||
},
|
||||
"required": ["expression"]
|
||||
}
|
||||
)
|
||||
def calculate(expression: str) -> float:
|
||||
return eval(expression)
|
||||
|
||||
session = await client.create_session({"tools": [calculate]})
|
||||
```
|
||||
|
||||
## Python-Specific Features
|
||||
|
||||
### Async Context Manager Protocol
|
||||
|
||||
The SDK implements `__aenter__` and `__aexit__`:
|
||||
|
||||
```python
|
||||
class CopilotClient:
|
||||
async def __aenter__(self):
|
||||
await self.start()
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||
await self.stop()
|
||||
return False
|
||||
|
||||
class CopilotSession:
|
||||
async def __aenter__(self):
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||
await self.destroy()
|
||||
return False
|
||||
```
|
||||
|
||||
### Dataclass Support
|
||||
|
||||
Event data is available as attributes:
|
||||
|
||||
```python
|
||||
def handler(event):
|
||||
# Access event attributes directly
|
||||
print(event.type)
|
||||
print(event.data.content) # For assistant.message
|
||||
print(event.data.delta_content) # For assistant.message.delta
|
||||
```
|
||||
Reference in New Issue
Block a user