| applyTo | description | name |
|---|---|---|
| **.py, pyproject.toml, setup.py | This file provides guidance on building Python applications using GitHub Copilot SDK. | GitHub Copilot SDK Python Instructions |
Core Principles
- The SDK is in technical preview and may have breaking changes
- Requires Python 3.9 or later
- Requires GitHub Copilot CLI installed and in PATH
- Uses async/await patterns throughout (asyncio)
- Supports both async context managers and manual lifecycle management
- Type hints provided for better IDE support
Installation
Install with pip, poetry, or uv:
pip install copilot-sdk
# or with poetry
poetry add copilot-sdk
# or with uv
uv add copilot-sdk
Client Initialization
Basic Client Setup
from copilot import CopilotClient

async with CopilotClient() as client:
    # Use client...
    pass
Client Configuration Options
When creating a CopilotClient, use a dict with these keys:
- cli_path - Path to CLI executable (default: "copilot" from PATH or COPILOT_CLI_PATH env var)
- cli_url - URL of existing CLI server (e.g., "localhost:8080"). When provided, the client won't spawn a process
- port - Server port (default: 0 for random)
- use_stdio - Use stdio transport instead of TCP (default: True)
- log_level - Log level (default: "info")
- auto_start - Auto-start server (default: True)
- auto_restart - Auto-restart on crash (default: True)
- cwd - Working directory for the CLI process (default: os.getcwd())
- env - Environment variables for the CLI process (dict)
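The options above can be assembled from the environment so the same code runs against a spawned CLI or an existing server. This is a minimal sketch: `build_client_options` is a hypothetical helper, and `COPILOT_CLI_URL` and `COPILOT_LOG_LEVEL` are environment variable names invented for illustration (only `COPILOT_CLI_PATH` appears in the list above).

```python
import os


def build_client_options(environ=None):
    """Build a CopilotClient options dict from environment variables.

    COPILOT_CLI_URL and COPILOT_LOG_LEVEL are assumed names for this sketch.
    """
    env = os.environ if environ is None else environ
    options = {
        "log_level": env.get("COPILOT_LOG_LEVEL", "info"),
        "auto_start": True,
        "auto_restart": True,
    }
    cli_url = env.get("COPILOT_CLI_URL")
    if cli_url:
        # Connect to an already-running CLI server; no process is spawned.
        options["cli_url"] = cli_url
    else:
        # Spawn a local CLI process and talk to it over stdio.
        options["cli_path"] = env.get("COPILOT_CLI_PATH", "copilot")
        options["use_stdio"] = True
        options["cwd"] = os.getcwd()
    return options
```

The resulting dict would then be passed to the constructor, for example `CopilotClient(build_client_options())`.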
Manual Server Control
For explicit control:
client = CopilotClient({"auto_start": False})
await client.start()
# Use client...
await client.stop()
Use force_stop() when stop() takes too long.
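One way to apply this is to bound the graceful stop with a timeout and fall back to the forced one. The sketch below assumes both `stop()` and `force_stop()` are awaitable; `shutdown` and `SlowClient` are hypothetical names, and `SlowClient` only stands in for a real client so the example runs on its own.

```python
import asyncio


async def shutdown(client, timeout=5.0):
    """Try a graceful stop(); fall back to force_stop() if it times out.

    Assumes force_stop() is awaitable; adjust if it is synchronous
    in your SDK version.
    """
    try:
        await asyncio.wait_for(client.stop(), timeout=timeout)
        return "stopped"
    except asyncio.TimeoutError:
        await client.force_stop()
        return "force_stopped"


class SlowClient:
    """Stand-in for CopilotClient, used only to exercise shutdown()."""

    def __init__(self, stop_delay):
        self.stop_delay = stop_delay
        self.forced = False

    async def stop(self):
        await asyncio.sleep(self.stop_delay)

    async def force_stop(self):
        self.forced = True
```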
Session Management
Creating Sessions
Use a dict for SessionConfig:
session = await client.create_session({
    "model": "gpt-5",
    "streaming": True,
    "tools": [...],
    "system_message": { ... },
    "available_tools": ["tool1", "tool2"],
    "excluded_tools": ["tool3"],
    "provider": { ... }
})
Session Config Options
- session_id - Custom session ID (str)
- model - Model name ("gpt-5", "claude-sonnet-4.5", etc.)
- tools - Custom tools exposed to the CLI (list[Tool])
- system_message - System message customization (dict)
- available_tools - Allowlist of tool names (list[str])
- excluded_tools - Blocklist of tool names (list[str])
- provider - Custom API provider configuration (BYOK) (ProviderConfig)
- streaming - Enable streaming response chunks (bool)
- mcp_servers - MCP server configurations (list)
- custom_agents - Custom agent configurations (list)
- config_dir - Config directory override (str)
- skill_directories - Skill directories (list[str])
- disabled_skills - Disabled skills (list[str])
- on_permission_request - Permission request handler (callable)
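Because the config is a plain dict, a misspelled key can go unnoticed. A small guard like the following, a sketch whose `check_session_config` helper is hypothetical and not part of the SDK, checks a config against the keys listed above before it is passed to `create_session`.

```python
# Keys copied from the option list above.
KNOWN_SESSION_KEYS = {
    "session_id", "model", "tools", "system_message", "available_tools",
    "excluded_tools", "provider", "streaming", "mcp_servers",
    "custom_agents", "config_dir", "skill_directories", "disabled_skills",
    "on_permission_request",
}


def check_session_config(config):
    """Return the config unchanged, or raise if it contains unknown keys."""
    unknown = sorted(set(config) - KNOWN_SESSION_KEYS)
    if unknown:
        raise KeyError(f"Unknown session config keys: {', '.join(unknown)}")
    return config
```

If the SDK version in use accepts additional keys, extend the set accordingly.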
Resuming Sessions
session = await client.resume_session("session-id", {
    "tools": [my_new_tool]
})
Session Operations
- session.session_id - Get session identifier (str)
- await session.send({"prompt": "...", "attachments": [...]}) - Send message, returns str (message ID)
- await session.send_and_wait({"prompt": "..."}, timeout=60.0) - Send and wait for idle, returns SessionEvent | None
- await session.abort() - Abort current processing
- await session.get_messages() - Get all events/messages, returns list[SessionEvent]
- await session.destroy() - Clean up session
Event Handling
Event Subscription Pattern
ALWAYS use asyncio events or futures for waiting on session events:
import asyncio
done = asyncio.Event()

def handler(event):
    if event.type == "assistant.message":
        print(event.data.content)
    elif event.type == "session.idle":
        done.set()

session.on(handler)
await session.send({"prompt": "..."})
await done.wait()
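A bare `done.wait()` blocks forever if `session.idle` never arrives. A variant with a timeout might look like the sketch below. `wait_for_idle` is a hypothetical helper, and `FakeSession` is a stand-in that replays canned events so the example runs without the SDK; it assumes handlers are invoked on the event loop thread.

```python
import asyncio
from types import SimpleNamespace


async def wait_for_idle(session, prompt, timeout=30.0):
    """Send a prompt and wait for session.idle, giving up after `timeout`."""
    done = asyncio.Event()
    messages = []

    def handler(event):
        if event.type == "assistant.message":
            messages.append(event.data.content)
        elif event.type == "session.idle":
            done.set()

    unsubscribe = session.on(handler)
    try:
        await session.send({"prompt": prompt})
        await asyncio.wait_for(done.wait(), timeout=timeout)
    finally:
        unsubscribe()  # always detach the handler, even on timeout
    return messages


class FakeSession:
    """Hypothetical stand-in that replays canned events; not part of the SDK."""

    def __init__(self, events):
        self._events = events
        self._handlers = []

    def on(self, handler):
        self._handlers.append(handler)
        return lambda: self._handlers.remove(handler)

    async def send(self, options):
        for event in self._events:
            for handler in list(self._handlers):
                handler(event)
        return "message-id"


async def demo():
    events = [
        SimpleNamespace(type="assistant.message", data=SimpleNamespace(content="4")),
        SimpleNamespace(type="session.idle", data=None),
    ]
    return await wait_for_idle(FakeSession(events), "What is 2+2?", timeout=1.0)


print(asyncio.run(demo()))
```

On timeout, `asyncio.TimeoutError` propagates to the caller, which can then decide whether to call `session.abort()`.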
Unsubscribing from Events
The on() method returns a function that unsubscribes:
unsubscribe = session.on(lambda event: print(event.type))
# Later...
unsubscribe()
Event Types
Use attribute access for event type checking:
def handler(event):
    if event.type == "user.message":
        # Handle user message
        pass
    elif event.type == "assistant.message":
        print(event.data.content)
    elif event.type == "tool.executionStart":
        # Tool execution started
        pass
    elif event.type == "tool.executionComplete":
        # Tool execution completed
        pass
    elif event.type == "session.start":
        # Session started
        pass
    elif event.type == "session.idle":
        # Session is idle (processing complete)
        pass
    elif event.type == "session.error":
        print(f"Error: {event.data.message}")

session.on(handler)
Streaming Responses
Enabling Streaming
Set streaming: True in SessionConfig:
session = await client.create_session({
    "model": "gpt-5",
    "streaming": True
})
Handling Streaming Events
Handle both delta events (incremental) and final events:
import asyncio
done = asyncio.Event()

def handler(event):
    if event.type == "assistant.message.delta":
        # Incremental text chunk
        print(event.data.delta_content, end="", flush=True)
    elif event.type == "assistant.reasoning.delta":
        # Incremental reasoning chunk (model-dependent)
        print(event.data.delta_content, end="", flush=True)
    elif event.type == "assistant.message":
        # Final complete message
        print("\n--- Final ---")
        print(event.data.content)
    elif event.type == "assistant.reasoning":
        # Final reasoning content
        print("--- Reasoning ---")
        print(event.data.content)
    elif event.type == "session.idle":
        done.set()

session.on(handler)
await session.send({"prompt": "Tell me a story"})
await done.wait()
Note: Final events (assistant.message, assistant.reasoning) are ALWAYS sent regardless of streaming setting.
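Since the final event arrives either way, a handler can show deltas as a preview and switch to the final message once it lands. The sketch below treats the final event as authoritative, which is a design choice of this example rather than documented SDK behavior. `ResponseCollector` and `make_event` are hypothetical names, and `make_event` only mimics the attribute-style events used in this guide.

```python
from types import SimpleNamespace


class ResponseCollector:
    """Collects streamed text and prefers the final message when it arrives."""

    def __init__(self):
        self.chunks = []
        self.final = None

    def handle(self, event):
        if event.type == "assistant.message.delta":
            self.chunks.append(event.data.delta_content)
        elif event.type == "assistant.message":
            self.final = event.data.content

    @property
    def text(self):
        # Use the final message if present; otherwise join the deltas so far.
        return self.final if self.final is not None else "".join(self.chunks)


def make_event(event_type, **data):
    """Hypothetical helper that builds an event-like object for the demo."""
    return SimpleNamespace(type=event_type, data=SimpleNamespace(**data))


collector = ResponseCollector()
collector.handle(make_event("assistant.message.delta", delta_content="Once "))
collector.handle(make_event("assistant.message.delta", delta_content="upon"))
partial = collector.text  # joined deltas while streaming
collector.handle(make_event("assistant.message", content="Once upon a time."))
complete = collector.text  # final message content
```

With a real session the collector would be registered as `session.on(collector.handle)`, and the same code works whether or not streaming is enabled.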
Custom Tools
Defining Tools with define_tool
Use define_tool for tool definitions:
from copilot import define_tool

async def fetch_issue(issue_id: str):
    # Fetch issue from tracker
    return {"id": issue_id, "status": "open"}

session = await client.create_session({
    "model": "gpt-5",
    "tools": [
        define_tool(
            name="lookup_issue",
            description="Fetch issue details from tracker",
            parameters={
                "type": "object",
                "properties": {
                    "id": {"type": "string", "description": "Issue ID"}
                },
                "required": ["id"]
            },
            handler=lambda args, inv: fetch_issue(args["id"])
        )
    ]
})
Using Pydantic for Parameters
The SDK works well with Pydantic models:
from pydantic import BaseModel, Field

class WeatherArgs(BaseModel):
    location: str = Field(description="City name")
    units: str = Field(default="fahrenheit", description="Temperature units")

async def get_weather(args: WeatherArgs, inv):
    return {"temperature": 72, "units": args.units}

session = await client.create_session({
    "tools": [
        define_tool(
            name="get_weather",
            description="Get weather for a location",
            parameters=WeatherArgs.model_json_schema(),
            handler=lambda args, inv: get_weather(WeatherArgs(**args), inv)
        )
    ]
})
Tool Return Types
- Return any JSON-serializable value (automatically wrapped)
- Or return a ToolResult dict for full control:
{
    "text_result_for_llm": str,  # Result shown to LLM
    "result_type": "success" | "failure",
    "error": str,  # Optional: Internal error (not shown to LLM)
    "tool_telemetry": dict  # Optional: Telemetry data
}
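Two small constructors keep ToolResult dicts consistent across handlers. This is a sketch using only the keys shown above; `tool_success` and `tool_failure` are hypothetical helper names, not SDK functions.

```python
def tool_success(text, telemetry=None):
    """Build a success ToolResult; telemetry is included only when given."""
    result = {"text_result_for_llm": text, "result_type": "success"}
    if telemetry is not None:
        result["tool_telemetry"] = telemetry
    return result


def tool_failure(text, error=None):
    """Build a failure ToolResult; `error` stays internal, `text` goes to the LLM."""
    result = {"text_result_for_llm": text, "result_type": "failure"}
    if error is not None:
        result["error"] = error
    return result
```

A handler might then end with `return tool_failure("Issue not found", error="404 from tracker")`.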
Tool Handler Signature
Tool handlers receive two arguments:
- args (dict) - The tool arguments passed by the LLM
- invocation (ToolInvocation) - Metadata about the invocation
  - invocation.session_id - Session ID
  - invocation.tool_call_id - Tool call ID
  - invocation.tool_name - Tool name
  - invocation.arguments - Same as args parameter
Tool Execution Flow
When Copilot invokes a tool, the client automatically:
- Runs your handler function
- Serializes the return value
- Responds to the CLI
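This guide does not say what the client does when a handler raises, so one defensive option is to catch exceptions yourself and return a failure ToolResult. The wrapper below is a sketch: `safe_handler` is a hypothetical decorator, and it assumes the client accepts async handlers, as the examples in this guide do.

```python
import asyncio
import inspect


def safe_handler(func):
    """Wrap a tool handler so exceptions become failure ToolResult dicts."""

    async def wrapper(args, invocation):
        try:
            result = func(args, invocation)
            if inspect.isawaitable(result):
                result = await result  # support both sync and async handlers
            return result
        except Exception as exc:
            return {
                "text_result_for_llm": "The tool failed; try different arguments.",
                "result_type": "failure",
                "error": repr(exc),  # internal detail, not shown to the LLM
            }

    return wrapper


@safe_handler
async def lookup(args, invocation):
    # Raises KeyError when the "id" argument is missing.
    return {"id": args["id"], "status": "open"}
```

The wrapped function keeps the two-argument signature, so it can be passed directly as `handler=lookup`.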
System Message Customization
Append Mode (Default - Preserves Guardrails)
session = await client.create_session({
    "model": "gpt-5",
    "system_message": {
        "mode": "append",
        "content": """
<workflow_rules>
- Always check for security vulnerabilities
- Suggest performance improvements when applicable
</workflow_rules>
"""
    }
})
Replace Mode (Full Control - Removes Guardrails)
session = await client.create_session({
    "model": "gpt-5",
    "system_message": {
        "mode": "replace",
        "content": "You are a helpful assistant."
    }
})
File Attachments
Attach files to messages:
await session.send({
    "prompt": "Analyze this file",
    "attachments": [
        {
            "type": "file",
            "path": "/path/to/file.py",
            "display_name": "My File"
        }
    ]
})
Message Delivery Modes
Use the mode key in message options:
- "enqueue" - Queue message for processing
- "immediate" - Process message immediately
await session.send({
    "prompt": "...",
    "mode": "enqueue"
})
Multiple Sessions
Sessions are independent and can run concurrently:
session1 = await client.create_session({"model": "gpt-5"})
session2 = await client.create_session({"model": "claude-sonnet-4.5"})

await asyncio.gather(
    session1.send({"prompt": "Hello from session 1"}),
    session2.send({"prompt": "Hello from session 2"})
)
Bring Your Own Key (BYOK)
Use custom API providers via provider:
session = await client.create_session({
    "provider": {
        "type": "openai",
        "base_url": "https://api.openai.com/v1",
        "api_key": "your-api-key"
    }
})
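Hard-coding the key is fine for a snippet but not for committed code. A sketch that reads it from the environment follows. `provider_from_env` is a hypothetical helper, and `OPENAI_API_KEY` and `OPENAI_BASE_URL` are variable names chosen for illustration.

```python
import os


def provider_from_env(environ=None):
    """Build the provider dict without hard-coding the API key.

    The environment variable names are assumptions made for this sketch.
    """
    env = os.environ if environ is None else environ
    api_key = env.get("OPENAI_API_KEY")
    if not api_key:
        raise RuntimeError("OPENAI_API_KEY is not set")
    return {
        "type": "openai",
        "base_url": env.get("OPENAI_BASE_URL", "https://api.openai.com/v1"),
        "api_key": api_key,
    }
```

The result slots into the session config as `{"provider": provider_from_env()}`.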
Session Lifecycle Management
Listing Sessions
sessions = await client.list_sessions()
for metadata in sessions:
    print(f"{metadata.session_id}: {metadata.summary}")
Deleting Sessions
await client.delete_session(session_id)
Getting Last Session ID
last_id = await client.get_last_session_id()
if last_id:
    session = await client.resume_session(last_id)
Checking Connection State
state = client.get_state()
# Returns: "disconnected" | "connecting" | "connected" | "error"
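When the server is started manually, it can help to wait for the "connected" state before creating sessions. The polling helper below is a sketch built only on the four state strings listed above. `wait_until_connected` and `FakeClient` are hypothetical names, and `FakeClient` replays a scripted list of states so the example runs without the SDK.

```python
import asyncio


async def wait_until_connected(client, timeout=10.0, interval=0.1):
    """Poll client.get_state() until it reports "connected"."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        state = client.get_state()
        if state == "connected":
            return
        if state == "error":
            raise ConnectionError("client entered the error state")
        if loop.time() >= deadline:
            raise TimeoutError(f"still {state!r} after {timeout} seconds")
        await asyncio.sleep(interval)


class FakeClient:
    """Stand-in that walks through a scripted list of states."""

    def __init__(self, states):
        self._states = list(states)

    def get_state(self):
        # Advance through the script, then keep repeating the last state.
        if len(self._states) > 1:
            return self._states.pop(0)
        return self._states[0]
```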
Error Handling
Standard Exception Handling
try:
    session = await client.create_session()
    await session.send({"prompt": "Hello"})
except Exception as e:
    print(f"Error: {e}")
Session Error Events
Monitor session.error event type for runtime errors:
def handler(event):
    if event.type == "session.error":
        print(f"Session Error: {event.data.message}")

session.on(handler)
Connectivity Testing
Use ping to verify server connectivity:
response = await client.ping("health check")
print(f"Server responded at {response['timestamp']}")
Resource Cleanup
Automatic Cleanup with Context Managers
ALWAYS use async context managers for automatic cleanup:
async with CopilotClient() as client:
    async with await client.create_session() as session:
        # Use session...
        await session.send({"prompt": "Hello"})
    # Session automatically destroyed
# Client automatically stopped
Manual Cleanup with Try-Finally
client = CopilotClient()
try:
    await client.start()
    session = await client.create_session()
    try:
        # Use session...
        pass
    finally:
        await session.destroy()
finally:
    await client.stop()
Best Practices
- Always use async context managers (async with) for automatic cleanup
- Use asyncio.Event or asyncio.Future to wait for the session.idle event
- Handle session.error events for robust error handling
- Use if/elif chains for event type checking
- Enable streaming for better UX in interactive scenarios
- Use define_tool for tool definitions
- Use Pydantic models for type-safe parameter validation
- Unsubscribe event handlers (call the function returned by on()) when no longer needed
- Use system_message with mode: "append" to preserve safety guardrails
- Handle both delta and final events when streaming is enabled
- Use type hints for better IDE support and code clarity
Common Patterns
Simple Query-Response
from copilot import CopilotClient
import asyncio

async def main():
    async with CopilotClient() as client:
        async with await client.create_session({"model": "gpt-5"}) as session:
            done = asyncio.Event()

            def handler(event):
                if event.type == "assistant.message":
                    print(event.data.content)
                elif event.type == "session.idle":
                    done.set()

            session.on(handler)
            await session.send({"prompt": "What is 2+2?"})
            await done.wait()

asyncio.run(main())
Multi-Turn Conversation
async def send_and_wait(session, prompt: str):
    done = asyncio.Event()
    result = []

    def handler(event):
        if event.type == "assistant.message":
            result.append(event.data.content)
            print(event.data.content)
        elif event.type == "session.idle":
            done.set()
        elif event.type == "session.error":
            result.append(None)
            done.set()

    unsubscribe = session.on(handler)
    await session.send({"prompt": prompt})
    await done.wait()
    unsubscribe()
    return result[0] if result else None

async with await client.create_session() as session:
    await send_and_wait(session, "What is the capital of France?")
    await send_and_wait(session, "What is its population?")
SendAndWait Helper
# Use the built-in send_and_wait for simple request-response interaction
async with await client.create_session() as session:
    response = await session.send_and_wait(
        {"prompt": "What is 2+2?"},
        timeout=60.0
    )
    if response and response.type == "assistant.message":
        print(response.data.content)
Tool with Dataclass Return Type
from dataclasses import dataclass, asdict
from copilot import define_tool

@dataclass
class UserInfo:
    id: str
    name: str
    email: str
    role: str

async def get_user(args, inv) -> dict:
    user = UserInfo(
        id=args["user_id"],
        name="John Doe",
        email="john@example.com",
        role="Developer"
    )
    return asdict(user)

session = await client.create_session({
    "tools": [
        define_tool(
            name="get_user",
            description="Retrieve user information",
            parameters={
                "type": "object",
                "properties": {
                    "user_id": {"type": "string", "description": "User ID"}
                },
                "required": ["user_id"]
            },
            handler=get_user
        )
    ]
})
Streaming with Progress
import asyncio

current_message = []
done = asyncio.Event()

def handler(event):
    if event.type == "assistant.message.delta":
        # Keep each chunk and echo it as it arrives
        current_message.append(event.data.delta_content)
        print(event.data.delta_content, end="", flush=True)
    elif event.type == "assistant.message":
        print("\n\n=== Complete ===")
        print(f"Total length: {len(event.data.content)} chars")
    elif event.type == "session.idle":
        done.set()

unsubscribe = session.on(handler)
await session.send({"prompt": "Write a long story"})
await done.wait()
unsubscribe()
Error Recovery
def handler(event):
    if event.type == "session.error":
        print(f"Session error: {event.data.message}")
        # Optionally retry or handle error

session.on(handler)
try:
    await session.send({"prompt": "risky operation"})
except Exception as e:
    # Handle send errors
    print(f"Failed to send: {e}")
Using TypedDict for Type Safety
from typing import TypedDict, List

class MessageOptions(TypedDict, total=False):
    prompt: str
    attachments: List[dict]
    mode: str

class SessionConfig(TypedDict, total=False):
    model: str
    streaming: bool
    tools: List

# Usage with type hints
options: MessageOptions = {
    "prompt": "Hello",
    "mode": "enqueue"
}
await session.send(options)

config: SessionConfig = {
    "model": "gpt-5",
    "streaming": True
}
session = await client.create_session(config)
Async Generator for Streaming
import asyncio
from typing import AsyncGenerator

async def stream_response(session, prompt: str) -> AsyncGenerator[str, None]:
    """Stream response chunks as an async generator."""
    queue = asyncio.Queue()
    done = asyncio.Event()

    def handler(event):
        if event.type == "assistant.message.delta":
            queue.put_nowait(event.data.delta_content)
        elif event.type == "session.idle":
            done.set()

    unsubscribe = session.on(handler)
    try:
        await session.send({"prompt": prompt})
        # Poll the queue until the session reports idle
        while not done.is_set():
            try:
                chunk = await asyncio.wait_for(queue.get(), timeout=0.1)
                yield chunk
            except asyncio.TimeoutError:
                continue
        # Drain remaining items
        while not queue.empty():
            yield queue.get_nowait()
    finally:
        # Detach the handler even if the consumer stops iterating early
        unsubscribe()

# Usage
async for chunk in stream_response(session, "Tell me a story"):
    print(chunk, end="", flush=True)
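The same generator can avoid the 0.1-second polling loop by pushing a sentinel onto the queue when the session goes idle. This is an alternative sketch, not the SDK's own approach. `stream_chunks` and `FakeSession` are hypothetical names, `FakeSession` replays canned events so the example is self-contained, and the code assumes handlers run on the event loop thread.

```python
import asyncio
from types import SimpleNamespace

_END = object()  # sentinel marking the end of the stream


async def stream_chunks(session, prompt):
    """Yield delta chunks until session.idle or session.error arrives."""
    queue = asyncio.Queue()

    def handler(event):
        if event.type == "assistant.message.delta":
            queue.put_nowait(event.data.delta_content)
        elif event.type in ("session.idle", "session.error"):
            queue.put_nowait(_END)

    unsubscribe = session.on(handler)
    try:
        await session.send({"prompt": prompt})
        while True:
            chunk = await queue.get()
            if chunk is _END:
                break
            yield chunk
    finally:
        unsubscribe()


class FakeSession:
    """Hypothetical stand-in that replays canned events; not part of the SDK."""

    def __init__(self, events):
        self._events = events
        self.handlers = []

    def on(self, handler):
        self.handlers.append(handler)
        return lambda: self.handlers.remove(handler)

    async def send(self, options):
        for event in self._events:
            for handler in list(self.handlers):
                handler(event)


async def collect(session, prompt):
    return [chunk async for chunk in stream_chunks(session, prompt)]
```

Because chunks and the sentinel share one queue, ordering is preserved and no separate drain step is needed.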
Decorator Pattern for Tools
from typing import Callable, Any
from copilot import define_tool

def copilot_tool(
    name: str,
    description: str,
    parameters: dict
) -> Callable:
    """Decorator to convert a function into a Copilot tool."""
    def decorator(func: Callable) -> Any:
        return define_tool(
            name=name,
            description=description,
            parameters=parameters,
            # Unpack the LLM-supplied arguments as keyword arguments
            handler=lambda args, inv: func(**args)
        )
    return decorator

@copilot_tool(
    name="calculate",
    description="Perform a calculation",
    parameters={
        "type": "object",
        "properties": {
            "expression": {"type": "string", "description": "Math expression"}
        },
        "required": ["expression"]
    }
)
def calculate(expression: str) -> float:
    # WARNING: eval() executes arbitrary code; do not use it on untrusted input
    return eval(expression)

session = await client.create_session({"tools": [calculate]})
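Because the expression comes from the model, passing it to `eval` lets the model run arbitrary Python. A safer body for `calculate` can walk the parsed syntax tree and allow only arithmetic. The sketch below uses the standard `ast` module; `safe_calculate` is a hypothetical name, and the set of permitted operators is a choice made for this example (exponentiation is left out to avoid very large results).

```python
import ast
import operator

_BINARY = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Mod: operator.mod,
}
_UNARY = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def safe_calculate(expression: str) -> float:
    """Evaluate +, -, *, /, % on numeric literals; reject everything else."""

    def evaluate(node):
        if isinstance(node, ast.Constant):
            value = node.value
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                return value
        elif isinstance(node, ast.BinOp) and type(node.op) in _BINARY:
            return _BINARY[type(node.op)](evaluate(node.left), evaluate(node.right))
        elif isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY:
            return _UNARY[type(node.op)](evaluate(node.operand))
        raise ValueError(f"Unsupported expression: {expression!r}")

    # mode="eval" parses a single expression; .body is its root node.
    return evaluate(ast.parse(expression, mode="eval").body)
```

Names, calls, and attribute access all fall through to the `ValueError`, so an input like `__import__('os')` is rejected before anything runs.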
Python-Specific Features
Async Context Manager Protocol
The SDK implements __aenter__ and __aexit__:
class CopilotClient:
    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.stop()
        return False

class CopilotSession:
    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.destroy()
        return False
Dataclass Support
Event data is available as attributes:
def handler(event):
    # Access event attributes directly
    print(event.type)
    print(event.data.content)  # For assistant.message
    print(event.data.delta_content)  # For assistant.message.delta