mirror of
https://github.com/github/awesome-copilot.git
synced 2026-02-20 02:15:12 +00:00
feat: add agent-governance skill
Add governance patterns and techniques for AI agent systems:

- Policy definition with allowlists, blocklists, and content filters
- Semantic intent classification for threat detection
- Tool-level governance decorator pattern
- Trust scoring with temporal decay for multi-agent systems
- Append-only audit trail design
- Framework integration examples (PydanticAI, CrewAI, OpenAI Agents)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
564 skills/agent-governance/SKILL.md Normal file
@@ -0,0 +1,564 @@
---
name: agent-governance
description: |
  Patterns and techniques for adding governance, safety, and trust controls to AI agent systems. Use this skill when:
  - Building AI agents that call external tools (APIs, databases, file systems)
  - Implementing policy-based access controls for agent tool usage
  - Adding semantic intent classification to detect dangerous prompts
  - Creating trust scoring systems for multi-agent workflows
  - Building audit trails for agent actions and decisions
  - Enforcing rate limits, content filters, or tool restrictions on agents
  - Working with any agent framework (PydanticAI, CrewAI, OpenAI Agents, LangChain, AutoGen)
---

# Agent Governance Patterns

Patterns for adding safety, trust, and policy enforcement to AI agent systems.

## Overview

Governance patterns ensure AI agents operate within defined boundaries — controlling which tools they can call, what content they can process, how much they can do, and maintaining accountability through audit trails.

```
User Request → Intent Classification → Policy Check → Tool Execution → Audit Log
                       ↓                    ↓                              ↓
                Threat Detection        Allow/Deny                   Trust Update
```

## When to Use

- **Agents with tool access**: Any agent that calls external tools (APIs, databases, shell commands)
- **Multi-agent systems**: Agents delegating to other agents need trust boundaries
- **Production deployments**: Compliance, audit, and safety requirements
- **Sensitive operations**: Financial transactions, data access, infrastructure management

---

## Pattern 1: Governance Policy

Define what an agent is allowed to do as a composable, serializable policy object.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import re


class PolicyAction(Enum):
    ALLOW = "allow"
    DENY = "deny"
    REVIEW = "review"  # flag for human review


@dataclass
class GovernancePolicy:
    """Declarative policy controlling agent behavior."""
    name: str
    allowed_tools: list[str] = field(default_factory=list)  # allowlist
    blocked_tools: list[str] = field(default_factory=list)  # blocklist
    blocked_patterns: list[str] = field(default_factory=list)  # content filters
    max_calls_per_request: int = 100  # rate limit
    require_human_approval: list[str] = field(default_factory=list)  # tools needing approval

    def check_tool(self, tool_name: str) -> PolicyAction:
        """Check if a tool is allowed by this policy."""
        if tool_name in self.blocked_tools:
            return PolicyAction.DENY
        if tool_name in self.require_human_approval:
            return PolicyAction.REVIEW
        if self.allowed_tools and tool_name not in self.allowed_tools:
            return PolicyAction.DENY
        return PolicyAction.ALLOW

    def check_content(self, content: str) -> Optional[str]:
        """Check content against blocked patterns. Returns the matched pattern or None."""
        for pattern in self.blocked_patterns:
            if re.search(pattern, content, re.IGNORECASE):
                return pattern
        return None
```

### Policy Composition

Combine multiple policies (e.g., org-wide + team + agent-specific):

```python
def compose_policies(*policies: GovernancePolicy) -> GovernancePolicy:
    """Merge policies with most-restrictive-wins semantics."""
    combined = GovernancePolicy(name="composed")

    for policy in policies:
        combined.blocked_tools.extend(policy.blocked_tools)
        combined.blocked_patterns.extend(policy.blocked_patterns)
        combined.require_human_approval.extend(policy.require_human_approval)
        combined.max_calls_per_request = min(
            combined.max_calls_per_request,
            policy.max_calls_per_request
        )
        if policy.allowed_tools:
            if combined.allowed_tools:
                combined.allowed_tools = [
                    t for t in combined.allowed_tools if t in policy.allowed_tools
                ]
            else:
                combined.allowed_tools = list(policy.allowed_tools)

    return combined


# Usage: layer policies from broad to specific
org_policy = GovernancePolicy(
    name="org-wide",
    blocked_tools=["shell_exec", "delete_database"],
    blocked_patterns=[r"(?i)(api[_-]?key|secret|password)\s*[:=]"],
    max_calls_per_request=50
)
team_policy = GovernancePolicy(
    name="data-team",
    allowed_tools=["query_db", "read_file", "write_report"],
    require_human_approval=["write_report"]
)
agent_policy = compose_policies(org_policy, team_policy)
```

### Policy as YAML

Store policies as configuration, not code:

```yaml
# governance-policy.yaml
name: production-agent
allowed_tools:
  - search_documents
  - query_database
  - send_email
blocked_tools:
  - shell_exec
  - delete_record
blocked_patterns:
  - "(?i)(api[_-]?key|secret|password)\\s*[:=]"
  - "(?i)(drop|truncate|delete from)\\s+\\w+"
max_calls_per_request: 25
require_human_approval:
  - send_email
```

```python
import yaml


def load_policy(path: str) -> GovernancePolicy:
    with open(path) as f:
        data = yaml.safe_load(f)
    return GovernancePolicy(**data)
```
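
Loaded policies compose like any other. For example, layering the YAML policy above (saved as `governance-policy.yaml`) onto the `org_policy` baseline from the composition example:

```python
# A quick sketch: assumes the YAML above is saved as governance-policy.yaml
# and org_policy is defined as in the composition example.
prod_policy = load_policy("governance-policy.yaml")
effective = compose_policies(org_policy, prod_policy)

assert effective.check_tool("shell_exec") == PolicyAction.DENY    # blocklist wins
assert effective.check_tool("send_email") == PolicyAction.REVIEW  # approval gate from YAML
assert effective.max_calls_per_request == 25                      # most restrictive limit
```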

---

## Pattern 2: Semantic Intent Classification

Detect dangerous intent in prompts before they reach the agent, using pattern-based signals.

```python
import re
from dataclasses import dataclass


@dataclass
class IntentSignal:
    category: str  # e.g., "data_exfiltration", "privilege_escalation"
    confidence: float  # 0.0 to 1.0
    evidence: str  # what triggered the detection


# Weighted signal patterns for threat detection
THREAT_SIGNALS = [
    # Data exfiltration
    (r"(?i)send\s+(all|every|entire)\s+\w+\s+to\s+", "data_exfiltration", 0.8),
    (r"(?i)export\s+.*\s+to\s+(external|outside|third.?party)", "data_exfiltration", 0.9),
    (r"(?i)curl\s+.*\s+-d\s+", "data_exfiltration", 0.7),

    # Privilege escalation
    (r"(?i)(sudo|as\s+root|admin\s+access)", "privilege_escalation", 0.8),
    (r"(?i)chmod\s+777", "privilege_escalation", 0.9),

    # System modification
    (r"(?i)(rm\s+-rf|del\s+/[sq]|format\s+c:)", "system_destruction", 0.95),
    (r"(?i)(drop\s+database|truncate\s+table)", "system_destruction", 0.9),

    # Prompt injection
    (r"(?i)ignore\s+(previous|above|all)\s+(instructions?|rules?)", "prompt_injection", 0.9),
    (r"(?i)you\s+are\s+now\s+(a|an)\s+", "prompt_injection", 0.7),
]


def classify_intent(content: str) -> list[IntentSignal]:
    """Classify content for threat signals."""
    signals = []
    for pattern, category, weight in THREAT_SIGNALS:
        match = re.search(pattern, content)
        if match:
            signals.append(IntentSignal(
                category=category,
                confidence=weight,
                evidence=match.group()
            ))
    return signals


def is_safe(content: str, threshold: float = 0.7) -> bool:
    """Quick check: does no signal reach the given confidence threshold?"""
    signals = classify_intent(content)
    return not any(s.confidence >= threshold for s in signals)
```

**Key insight**: Intent classification happens *before* tool execution, acting as a pre-flight safety check. This is fundamentally different from output guardrails, which only check *after* generation.
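
A minimal sketch of that pre-flight gate (`agent_fn` is a hypothetical stand-in for whatever coroutine runs your agent):

```python
async def run_with_preflight(agent_fn, user_input: str):
    """Classify intent first; refuse before the agent or its tools see unsafe input."""
    if not is_safe(user_input):
        categories = sorted({s.category for s in classify_intent(user_input)})
        raise PermissionError(f"Pre-flight intent check failed: {categories}")
    return await agent_fn(user_input)
```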

---

## Pattern 3: Tool-Level Governance Decorator

Wrap individual tool functions with governance checks:

```python
import functools
import time
from collections import defaultdict

# Per-policy call counts. These persist across requests, so reset them at
# request boundaries (see the helper after this example).
_call_counters: dict[str, int] = defaultdict(int)


def govern(policy: GovernancePolicy, audit_trail=None):
    """Decorator that enforces a governance policy on a tool function."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            tool_name = func.__name__

            # 1. Check tool allowlist/blocklist
            action = policy.check_tool(tool_name)
            if action == PolicyAction.DENY:
                raise PermissionError(f"Policy '{policy.name}' blocks tool '{tool_name}'")
            if action == PolicyAction.REVIEW:
                raise PermissionError(f"Tool '{tool_name}' requires human approval")

            # 2. Check rate limit
            _call_counters[policy.name] += 1
            if _call_counters[policy.name] > policy.max_calls_per_request:
                raise PermissionError(f"Rate limit exceeded: {policy.max_calls_per_request} calls")

            # 3. Check content in arguments
            for arg in list(args) + list(kwargs.values()):
                if isinstance(arg, str):
                    matched = policy.check_content(arg)
                    if matched:
                        raise PermissionError(f"Blocked pattern detected: {matched}")

            # 4. Execute and audit
            start = time.monotonic()
            try:
                result = await func(*args, **kwargs)
                if audit_trail is not None:
                    audit_trail.append({
                        "tool": tool_name,
                        "action": "allowed",
                        "duration_ms": (time.monotonic() - start) * 1000,
                        "timestamp": time.time()
                    })
                return result
            except Exception as e:
                if audit_trail is not None:
                    audit_trail.append({
                        "tool": tool_name,
                        "action": "error",
                        "error": str(e),
                        "timestamp": time.time()
                    })
                raise

        return wrapper
    return decorator


# Usage with any agent framework
audit_log = []
policy = GovernancePolicy(
    name="search-agent",
    allowed_tools=["search", "summarize"],
    blocked_patterns=[r"(?i)password"],
    max_calls_per_request=10
)


@govern(policy, audit_trail=audit_log)
async def search(query: str) -> str:
    """Search documents — governed by policy."""
    return f"Results for: {query}"


# Passes:  search("latest quarterly report")
# Blocked: search("show me the admin password")
```
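
One caveat: `_call_counters` is module-level and never clears on its own, so `max_calls_per_request` only behaves as a per-request limit if the counter is reset at each request boundary. A minimal reset helper (a sketch; where you call it depends on your serving stack):

```python
def reset_call_counters(policy_name: str | None = None) -> None:
    """Reset rate-limit counters; call at the start of each request."""
    if policy_name is None:
        _call_counters.clear()
    else:
        _call_counters.pop(policy_name, None)
```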

---

## Pattern 4: Trust Scoring

Track agent reliability over time with decay-based trust scores:

```python
from dataclasses import dataclass, field
import math
import time


@dataclass
class TrustScore:
    """Trust score with temporal decay."""
    score: float = 0.5  # 0.0 (untrusted) to 1.0 (fully trusted)
    successes: int = 0
    failures: int = 0
    last_updated: float = field(default_factory=time.time)

    def record_success(self, reward: float = 0.05):
        self.successes += 1
        self.score = min(1.0, self.score + reward * (1 - self.score))
        self.last_updated = time.time()

    def record_failure(self, penalty: float = 0.15):
        self.failures += 1
        self.score = max(0.0, self.score - penalty * self.score)
        self.last_updated = time.time()

    def current(self, decay_rate: float = 0.001) -> float:
        """Get score with temporal decay — trust erodes without activity."""
        elapsed = time.time() - self.last_updated
        decay = math.exp(-decay_rate * elapsed)
        return self.score * decay

    @property
    def reliability(self) -> float:
        total = self.successes + self.failures
        return self.successes / total if total > 0 else 0.0


# Usage in multi-agent systems
trust = TrustScore()

# Agent completes tasks successfully
trust.record_success()  # 0.525
trust.record_success()  # 0.549

# Agent makes an error
trust.record_failure()  # 0.467

# Gate sensitive operations on trust
if trust.current() >= 0.7:
    # Allow autonomous operation
    pass
elif trust.current() >= 0.4:
    # Allow with human oversight
    pass
else:
    # Deny or require explicit approval
    pass
```

**Multi-agent trust**: In systems where agents delegate to other agents, each agent maintains trust scores for its delegates:

```python
class AgentTrustRegistry:
    def __init__(self):
        self.scores: dict[str, TrustScore] = {}

    def get_trust(self, agent_id: str) -> TrustScore:
        if agent_id not in self.scores:
            self.scores[agent_id] = TrustScore()
        return self.scores[agent_id]

    def most_trusted(self, agents: list[str]) -> str:
        return max(agents, key=lambda a: self.get_trust(a).current())

    def meets_threshold(self, agent_id: str, threshold: float) -> bool:
        return self.get_trust(agent_id).current() >= threshold
```
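
For example, a coordinator can route work to its most-trusted delegate and feed outcomes back into the registry (`run_delegate` and `task` are hypothetical placeholders for your own dispatch logic):

```python
registry = AgentTrustRegistry()

delegate = registry.most_trusted(["researcher", "summarizer", "critic"])
try:
    result = run_delegate(delegate, task)  # hypothetical dispatch call
    registry.get_trust(delegate).record_success()
except Exception:
    registry.get_trust(delegate).record_failure()
    raise
```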

---

## Pattern 5: Audit Trail

Append-only audit log for all agent actions — critical for compliance and debugging:

```python
from dataclasses import dataclass, field
import json
import time


@dataclass
class AuditEntry:
    timestamp: float
    agent_id: str
    tool_name: str
    action: str  # "allowed", "denied", "error"
    policy_name: str
    details: dict = field(default_factory=dict)


class AuditTrail:
    """Append-only audit trail for agent governance events."""
    def __init__(self):
        self._entries: list[AuditEntry] = []

    def log(self, agent_id: str, tool_name: str, action: str,
            policy_name: str, **details):
        self._entries.append(AuditEntry(
            timestamp=time.time(),
            agent_id=agent_id,
            tool_name=tool_name,
            action=action,
            policy_name=policy_name,
            details=details
        ))

    def denied(self) -> list[AuditEntry]:
        """Get all denied actions — useful for security review."""
        return [e for e in self._entries if e.action == "denied"]

    def by_agent(self, agent_id: str) -> list[AuditEntry]:
        return [e for e in self._entries if e.agent_id == agent_id]

    def export_jsonl(self, path: str):
        """Export as JSON Lines for log aggregation systems."""
        with open(path, "w") as f:
            for entry in self._entries:
                f.write(json.dumps({
                    "timestamp": entry.timestamp,
                    "agent_id": entry.agent_id,
                    "tool": entry.tool_name,
                    "action": entry.action,
                    "policy": entry.policy_name,
                    **entry.details
                }) + "\n")
```
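
Note that the `govern` decorator from Pattern 3 appends plain dicts, while `AuditTrail` stores typed entries. A small adapter can bridge the two (a sketch, assuming a fixed agent and policy identifier per adapter):

```python
class AuditTrailAdapter:
    """Adapts AuditTrail.log() to the list-like .append() the decorator expects."""
    def __init__(self, trail: AuditTrail, agent_id: str, policy_name: str):
        self.trail = trail
        self.agent_id = agent_id
        self.policy_name = policy_name

    def append(self, event: dict) -> None:
        self.trail.log(
            agent_id=self.agent_id,
            tool_name=event.pop("tool", "unknown"),
            action=event.pop("action", "unknown"),
            policy_name=self.policy_name,
            **event,  # remaining keys (duration_ms, error, ...) land in details
        )


trail = AuditTrail()
adapter = AuditTrailAdapter(trail, agent_id="search-agent", policy_name=policy.name)
# @govern(policy, audit_trail=adapter) now records typed AuditEntry rows.
```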

---

## Pattern 6: Framework Integration

### PydanticAI

```python
from pydantic_ai import Agent

policy = GovernancePolicy(
    name="support-bot",
    allowed_tools=["search_docs", "create_ticket"],
    blocked_patterns=[r"(?i)(ssn|social\s+security|credit\s+card)"],
    max_calls_per_request=20
)

agent = Agent("openai:gpt-4o", system_prompt="You are a support assistant.")

# `kb` and `tickets` stand in for application-specific clients.

@agent.tool
@govern(policy)
async def search_docs(ctx, query: str) -> str:
    """Search knowledge base — governed."""
    return await kb.search(query)

@agent.tool
@govern(policy)
async def create_ticket(ctx, title: str, body: str) -> str:
    """Create support ticket — governed."""
    return await tickets.create(title=title, body=body)
```

### CrewAI

```python
from crewai import Agent, Task, Crew

policy = GovernancePolicy(
    name="research-crew",
    allowed_tools=["search", "analyze"],
    max_calls_per_request=30
)


# Apply governance at the crew level
def governed_crew_run(crew: Crew, policy: GovernancePolicy):
    """Wrap crew execution with governance checks."""
    audit = AuditTrail()
    for agent in crew.agents:
        for tool in agent.tools:
            original = tool.func
            tool.func = govern(policy, audit_trail=audit._entries)(original)
    result = crew.kickoff()
    return result, audit
```

### OpenAI Agents SDK

```python
from agents import Agent, function_tool

policy = GovernancePolicy(
    name="coding-agent",
    allowed_tools=["read_file", "write_file", "run_tests"],
    blocked_tools=["shell_exec"],
    max_calls_per_request=50
)


@function_tool
@govern(policy)
async def read_file(path: str) -> str:
    """Read file contents — governed."""
    with open(path) as f:
        return f.read()
```

---

## Governance Levels

Match governance strictness to risk level:

| Level | Controls | Use Case |
|-------|----------|----------|
| **Open** | Audit only, no restrictions | Internal dev/testing |
| **Standard** | Tool allowlist + content filters | General production agents |
| **Strict** | All controls + human approval for sensitive ops | Financial, healthcare, legal |
| **Locked** | Allowlist only, no dynamic tools, full audit | Compliance-critical systems |
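
As a rough illustration, the levels might map onto policy presets like the following sketch (the tool names and limits are hypothetical; tune them to your deployment):

```python
# Illustrative presets only; tool names and limits are example values.
GOVERNANCE_LEVELS = {
    "open": GovernancePolicy(name="open"),  # no restrictions; rely on audit alone
    "standard": GovernancePolicy(
        name="standard",
        allowed_tools=["search", "summarize"],
        blocked_patterns=[r"(?i)(api[_-]?key|secret|password)\s*[:=]"],
    ),
    "strict": GovernancePolicy(
        name="strict",
        allowed_tools=["search", "summarize", "send_email"],
        blocked_patterns=[r"(?i)(api[_-]?key|secret|password)\s*[:=]"],
        require_human_approval=["send_email"],
        max_calls_per_request=20,
    ),
    "locked": GovernancePolicy(
        name="locked",
        allowed_tools=["search"],
        max_calls_per_request=5,
    ),
}
```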

---

## Best Practices

| Practice | Rationale |
|----------|-----------|
| **Policy as configuration** | Store policies in YAML/JSON, not hardcoded — enables change without deploys |
| **Most-restrictive-wins** | When composing policies, deny always overrides allow |
| **Pre-flight intent check** | Classify intent *before* tool execution, not after |
| **Trust decay** | Trust scores should decay over time — require ongoing good behavior |
| **Append-only audit** | Never modify or delete audit entries — immutability enables compliance |
| **Fail closed** | If a governance check errors, deny the action rather than allowing it |
| **Separate policy from logic** | Governance enforcement should be independent of agent business logic |
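
"Fail closed" is the easiest of these to get wrong; a minimal sketch of the idea:

```python
def fail_closed_check(policy: GovernancePolicy, tool_name: str) -> PolicyAction:
    """If the policy check itself raises, treat the tool as denied."""
    try:
        return policy.check_tool(tool_name)
    except Exception:
        return PolicyAction.DENY  # never allow on governance failure
```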

---

## Quick Start Checklist

```markdown
## Agent Governance Implementation Checklist

### Setup
- [ ] Define governance policy (allowed tools, blocked patterns, rate limits)
- [ ] Choose governance level (open/standard/strict/locked)
- [ ] Set up audit trail storage

### Implementation
- [ ] Add @govern decorator to all tool functions
- [ ] Add intent classification to user input processing
- [ ] Implement trust scoring for multi-agent interactions
- [ ] Wire up audit trail export

### Validation
- [ ] Test that blocked tools are properly denied
- [ ] Test that content filters catch sensitive patterns
- [ ] Test rate limiting behavior
- [ ] Verify audit trail captures all events
- [ ] Test policy composition (most-restrictive-wins)
```

---

## Related Resources

- [Agent-OS Governance Engine](https://github.com/imran-siddique/agent-os) — Full governance framework
- [AgentMesh Integrations](https://github.com/imran-siddique/agentmesh-integrations) — Framework-specific packages
- [OWASP Top 10 for LLM Applications](https://owasp.org/www-project-top-10-for-large-language-model-applications/)