Add Dataverse SDK for Python Collection (#458)

* Add Dataverse SDK for Python: 5 new instruction files (error handling, authentication, performance, testing, use cases) + 4 prompts and updated READMEs

* Delete COLLECTION_STATUS.md

* Delete ENHANCEMENT_SUMMARY.md
Troy Simeon Taylor
2025-12-04 18:38:34 -05:00
committed by GitHub
parent b81a3dc5a4
commit a89019fb3b
23 changed files with 6983 additions and 0 deletions


@@ -0,0 +1,689 @@
# Dataverse SDK for Python - Advanced Features Guide
## Overview
Comprehensive guide to advanced Dataverse SDK features including enums, complex filtering, SQL queries, metadata operations, and production patterns. Based on official Microsoft walkthrough examples.
## 1. Working with Option Sets & Picklists
### Using IntEnum for Type Safety
```python
from enum import IntEnum
from PowerPlatform.Dataverse.client import DataverseClient

# Define enums for picklist columns
class Priority(IntEnum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3

class Temperature(IntEnum):
    COLD = 1
    WARM = 2
    HOT = 3

# Create record with enum value
record_data = {
    "new_title": "Important Task",
    "new_priority": Priority.HIGH,  # Automatically converted to int
}
ids = client.create("new_tasktable", record_data)
```
### Handling Formatted Values
```python
# When retrieving records, picklist values are returned as integers
record = client.get("new_tasktable", record_id)
priority_int = record.get("new_priority") # Returns: 3
priority_formatted = record.get("new_priority@OData.Community.Display.V1.FormattedValue") # Returns: "High"
print(f"Priority (Raw): {priority_int}")
print(f"Priority (Formatted): {priority_formatted}")
```
### Creating Tables with Enum Columns
```python
from enum import IntEnum

class TaskStatus(IntEnum):
    NOT_STARTED = 0
    IN_PROGRESS = 1
    COMPLETED = 2

class TaskPriority(IntEnum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3

# Pass enum classes as column types
columns = {
    "new_Title": "string",
    "new_Description": "string",
    "new_Status": TaskStatus,      # Creates option set column
    "new_Priority": TaskPriority,  # Creates option set column
    "new_Amount": "decimal",
    "new_DueDate": "datetime"
}
table_info = client.create_table(
    "new_TaskManagement",
    primary_column_schema_name="new_Title",
    columns=columns
)
print(f"Created table with {len(columns)} columns including enums")
```
---
## 2. Advanced Filtering & Querying
### Complex OData Filters
```python
# Simple equality
filter1 = "name eq 'Contoso'"
# Comparison operators
filter2 = "creditlimit gt 50000"
filter3 = "createdon lt 2024-01-01"
# String operations
filter4 = "contains(name, 'Ltd')"
filter5 = "startswith(name, 'Con')"
filter6 = "endswith(name, 'Ltd')"
# Multiple conditions with AND
filter7 = "(name eq 'Contoso') and (creditlimit gt 50000)"
# Multiple conditions with OR
filter8 = "(industrycode eq 1) or (industrycode eq 2)"
# Negation
filter9 = "not(statecode eq 1)"
# Complex nested conditions
filter10 = "(creditlimit gt 50000) and ((industrycode eq 1) or (industrycode eq 2))"
# Using in get() calls
results = client.get("account", filter=filter10, select=["name", "creditlimit"])
```
### Retrieve with Related Records (Expand)
```python
# Expand parent account information
accounts = client.get(
    "account",
    filter="creditlimit gt 100000",
    expand=["parentaccountid($select=name,creditlimit)"],
    select=["accountid", "name", "creditlimit", "parentaccountid"]
)
for page in accounts:
    for account in page:
        parent_name = account.get("_parentaccountid_value")
        print(f"Account: {account['name']}, Parent: {parent_name}")
```
### SQL Queries for Complex Analysis
```python
# SQL queries are read-only but powerful for analytics
sql = """
SELECT
a.name as AccountName,
a.creditlimit,
COUNT(c.contactid) as ContactCount
FROM account a
LEFT JOIN contact c ON a.accountid = c.parentcustomerid
WHERE a.creditlimit > 50000
GROUP BY a.accountid, a.name, a.creditlimit
ORDER BY ContactCount DESC
"""
results = client.query_sql(sql)
for row in results:
print(f"{row['AccountName']}: {row['ContactCount']} contacts")
```
### Paging with SQL Queries
```python
# SQL queries return paginated results by default
sql = "SELECT TOP 10000 name, creditlimit FROM account ORDER BY name"
all_results = []
for page in client.query_sql(sql):
    all_results.extend(page)
    print(f"Retrieved {len(page)} rows")
print(f"Total: {len(all_results)} rows")
```
---
## 3. Metadata Operations
### Creating Complex Tables
```python
from enum import IntEnum
from datetime import datetime

class TaskStatus(IntEnum):
    NEW = 1
    OPEN = 2
    CLOSED = 3

# Create table with diverse column types
columns = {
    "new_Subject": "string",
    "new_Description": "string",
    "new_Category": "string",
    "new_Priority": "int",
    "new_Status": TaskStatus,
    "new_EstimatedHours": "decimal",
    "new_DueDate": "datetime",
    "new_IsOverdue": "bool",
    "new_Notes": "string"
}
table_info = client.create_table(
    "new_WorkItem",
    primary_column_schema_name="new_Subject",
    columns=columns
)
print(f"✓ Created table: {table_info['table_schema_name']}")
print(f"  Primary Key: {table_info['primary_id_attribute']}")
print(f"  Columns: {', '.join(table_info.get('columns_created', []))}")
```
### Inspecting Table Metadata
```python
# Get detailed table information
table_info = client.get_table_info("account")
print(f"Schema Name: {table_info.get('table_schema_name')}")
print(f"Logical Name: {table_info.get('table_logical_name')}")
print(f"Display Name: {table_info.get('table_display_name')}")
print(f"Entity Set: {table_info.get('entity_set_name')}")
print(f"Primary ID: {table_info.get('primary_id_attribute')}")
print(f"Primary Name: {table_info.get('primary_name_attribute')}")
```
### Listing All Tables in Organization
```python
# Retrieve all tables (may be large result set)
all_tables = []
for page in client.list_tables():
    all_tables.extend(page)
    print(f"Retrieved {len(page)} tables in this page")
print(f"\nTotal tables: {len(all_tables)}")
# Filter for custom tables
custom_tables = [t for t in all_tables if t['table_schema_name'].startswith('new_')]
print(f"Custom tables: {len(custom_tables)}")
for table in custom_tables[:5]:
    print(f"  - {table['table_schema_name']}")
```
### Managing Columns Dynamically
```python
# Add columns to existing table
client.create_columns("new_TaskTable", {
"new_Department": "string",
"new_Budget": "decimal",
"new_ApprovedDate": "datetime"
})
# Delete specific columns
client.delete_columns("new_TaskTable", [
"new_OldField1",
"new_OldField2"
])
# Delete entire table
client.delete_table("new_TaskTable")
```
---
## 4. Single vs. Multiple Record Operations
### Single Record Operations
```python
# Create single
record_id = client.create("account", {"name": "Contoso"})[0]
# Get single by ID
account = client.get("account", record_id)
# Update single
client.update("account", record_id, {"creditlimit": 100000})
# Delete single
client.delete("account", record_id)
```
### Multiple Record Operations
#### Create Multiple Records
```python
# Create list of records
records = [
{"name": "Company A", "creditlimit": 50000},
{"name": "Company B", "creditlimit": 75000},
{"name": "Company C", "creditlimit": 100000},
]
created_ids = client.create("account", records)
print(f"Created {len(created_ids)} records: {created_ids}")
```
#### Update Multiple Records (Broadcast)
```python
# Apply same update to multiple records
account_ids = ["id1", "id2", "id3"]
client.update("account", account_ids, {
"industrycode": 1, # Retail
"accountmanagerid": "manager-guid"
})
print(f"Updated {len(account_ids)} records with same data")
```
#### Delete Multiple Records
```python
# Delete multiple records with optimized bulk delete
record_ids = ["id1", "id2", "id3", "id4", "id5"]
client.delete("account", record_ids, use_bulk_delete=True)
print(f"Deleted {len(record_ids)} records")
```
---
## 5. Data Manipulation Patterns
### Retrieve, Modify, Update Pattern
```python
# Retrieve single record
account = client.get("account", record_id)
# Modify locally
original_amount = account.get("creditlimit", 0)
new_amount = original_amount + 10000
# Update back
client.update("account", record_id, {"creditlimit": new_amount})
print(f"Updated creditlimit: {original_amount}{new_amount}")
```
### Batch Processing Pattern
```python
# Retrieve in batches with paging
batch_size = 100
processed = 0
for page in client.get("account", top=batch_size, filter="statecode eq 0"):
    # Process each page
    batch_updates = []
    for account in page:
        if account.get("creditlimit", 0) > 100000:
            batch_updates.append({
                "id": account['accountid'],
                "accountmanagerid": "senior-manager-guid"
            })
    # Batch update
    for update in batch_updates:
        client.update("account", update['id'], {"accountmanagerid": update['accountmanagerid']})
        processed += 1
print(f"Processed {processed} accounts")
```
### Conditional Operations Pattern
```python
from PowerPlatform.Dataverse.core.errors import DataverseError
def safe_update(table, record_id, data, check_field=None, check_value=None):
    """Update with pre-condition check."""
    try:
        if check_field and check_value:
            # Verify condition before updating
            record = client.get(table, record_id, select=[check_field])
            if record.get(check_field) != check_value:
                print(f"Condition not met: {check_field} != {check_value}")
                return False
        client.update(table, record_id, data)
        return True
    except DataverseError as e:
        print(f"Update failed: {e}")
        return False

# Usage
safe_update("account", account_id, {"creditlimit": 100000}, "statecode", 0)
```
---
## 6. Formatted Values & Display
### Retrieving Formatted Values
```python
# When you retrieve a record with option set or money fields,
# you can request formatted values for display
record = client.get(
"account",
record_id,
select=["name", "creditlimit", "industrycode"]
)
# Raw values
name = record.get("name") # "Contoso Ltd"
limit = record.get("creditlimit") # 100000.00
industry = record.get("industrycode") # 1
# Formatted values (returned in OData response)
limit_formatted = record.get("creditlimit@OData.Community.Display.V1.FormattedValue")
industry_formatted = record.get("industrycode@OData.Community.Display.V1.FormattedValue")
print(f"Name: {name}")
print(f"Credit Limit: {limit_formatted or limit}") # "100,000.00" or 100000.00
print(f"Industry: {industry_formatted or industry}") # "Technology" or 1
```
---
## 7. Performance Optimization
### Column Selection Strategy
```python
# ❌ Retrieve all columns (slow, uses more bandwidth)
account = client.get("account", record_id)
# ✅ Retrieve only needed columns (fast, efficient)
account = client.get(
"account",
record_id,
select=["accountid", "name", "creditlimit", "telephone1"]
)
```
### Filtering on Server
```python
# ❌ Retrieve all, filter locally (inefficient)
all_accounts = []
for page in client.get("account"):
    all_accounts.extend(page)
large_accounts = [a for a in all_accounts if a.get("creditlimit", 0) > 100000]

# ✅ Filter on server, retrieve only matches (efficient)
large_accounts = []
for page in client.get("account", filter="creditlimit gt 100000"):
    large_accounts.extend(page)
```
### Paging Large Result Sets
```python
# ❌ Load all results at once (memory intensive)
all_accounts = list(client.get("account"))

# ✅ Process in pages (memory efficient)
processed = 0
for page in client.get("account", top=1000):
    for account in page:
        process_account(account)
        processed += 1
print(f"Processed: {processed}")
```
### Batch Operations
```python
# ❌ Individual creates in loop (slow)
for account_data in accounts:
    client.create("account", account_data)

# ✅ Batch create (fast, optimized)
created_ids = client.create("account", accounts)
```
---
## 8. Error Handling in Advanced Scenarios
### Handling Metadata Errors
```python
from PowerPlatform.Dataverse.core.errors import MetadataError
try:
    table_info = client.create_table("new_CustomTable", {"name": "string"})
except MetadataError as e:
    print(f"Metadata operation failed: {e}")
    # Handle table creation specific errors
```
### Handling Validation Errors
```python
from PowerPlatform.Dataverse.core.errors import ValidationError
try:
    client.create("account", {"name": None})  # Invalid: name required
except ValidationError as e:
    print(f"Validation error: {e}")
    # Handle validation specific errors
```
### Handling HTTP Errors
```python
from PowerPlatform.Dataverse.core.errors import HttpError
try:
    client.get("account", "invalid-guid")
except HttpError as e:
    if "404" in str(e):
        print("Record not found")
    elif "403" in str(e):
        print("Access denied")
    else:
        print(f"HTTP error: {e}")
```
### Handling SQL Errors
```python
from PowerPlatform.Dataverse.core.errors import SQLParseError
try:
    results = client.query_sql("SELECT INVALID SYNTAX")
except SQLParseError as e:
    print(f"SQL parse error: {e}")
```
---
## 9. Working with Relationships
### Creating Related Records
```python
# Create parent account
parent_ids = client.create("account", {
"name": "Parent Company",
"creditlimit": 500000
})
parent_id = parent_ids[0]
# Create child accounts with parent reference
children = [
{"name": "Subsidiary A", "parentaccountid": parent_id},
{"name": "Subsidiary B", "parentaccountid": parent_id},
{"name": "Subsidiary C", "parentaccountid": parent_id},
]
child_ids = client.create("account", children)
print(f"Created {len(child_ids)} child accounts")
```
### Querying Related Records
```python
# Get account with child accounts
account = client.get("account", account_id)
# Query child accounts
children = client.get(
"account",
filter=f"parentaccountid eq {account_id}",
select=["accountid", "name", "creditlimit"]
)
for page in children:
    for child in page:
        print(f"  - {child['name']}: ${child['creditlimit']}")
```
---
## 10. Cleanup & Housekeeping
### Clearing SDK Cache
```python
# After bulk operations, clear metadata cache
client.flush_cache()
# Useful after:
# - Massive delete operations
# - Table/column creation or deletion
# - Metadata synchronization across environments
```
### Safe Table Deletion
```python
from PowerPlatform.Dataverse.core.errors import MetadataError
def delete_table_safe(table_name):
    """Delete table with error handling."""
    try:
        # Verify table exists
        table_info = client.get_table_info(table_name)
        if not table_info:
            print(f"Table {table_name} not found")
            return False
        # Delete
        client.delete_table(table_name)
        print(f"✓ Deleted table: {table_name}")
        # Clear cache
        client.flush_cache()
        return True
    except MetadataError as e:
        print(f"❌ Failed to delete table: {e}")
        return False

delete_table_safe("new_TempTable")
```
---
## 11. Comprehensive Example: Full Workflow
```python
from enum import IntEnum
from azure.identity import InteractiveBrowserCredential
from PowerPlatform.Dataverse.client import DataverseClient
from PowerPlatform.Dataverse.core.errors import DataverseError, MetadataError

class TaskStatus(IntEnum):
    NEW = 1
    IN_PROGRESS = 2
    COMPLETED = 3

class TaskPriority(IntEnum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3

# Setup
credential = InteractiveBrowserCredential()
client = DataverseClient("https://yourorg.crm.dynamics.com", credential)

try:
    # 1. Create table
    print("Creating table...")
    table_info = client.create_table(
        "new_ProjectTask",
        primary_column_schema_name="new_Title",
        columns={
            "new_Description": "string",
            "new_Status": TaskStatus,
            "new_Priority": TaskPriority,
            "new_DueDate": "datetime",
            "new_EstimatedHours": "decimal"
        }
    )
    print(f"✓ Created table: {table_info['table_schema_name']}")

    # 2. Create records
    print("\nCreating tasks...")
    tasks = [
        {
            "new_Title": "Design system",
            "new_Description": "Create design system architecture",
            "new_Status": TaskStatus.NEW,
            "new_Priority": TaskPriority.HIGH,
            "new_EstimatedHours": 40.0
        },
        {
            "new_Title": "Implement UI",
            "new_Description": "Build React components",
            "new_Status": TaskStatus.IN_PROGRESS,
            "new_Priority": TaskPriority.HIGH,
            "new_EstimatedHours": 80.0
        },
        {
            "new_Title": "Write tests",
            "new_Description": "Unit and integration tests",
            "new_Status": TaskStatus.NEW,
            "new_Priority": TaskPriority.MEDIUM,
            "new_EstimatedHours": 30.0
        }
    ]
    task_ids = client.create("new_ProjectTask", tasks)
    print(f"✓ Created {len(task_ids)} tasks")

    # 3. Query and filter
    print("\nQuerying high-priority tasks...")
    high_priority = client.get(
        "new_ProjectTask",
        filter="new_priority eq 3",
        select=["new_Title", "new_Priority", "new_EstimatedHours"]
    )
    for page in high_priority:
        for task in page:
            print(f"  - {task['new_title']}: {task['new_estimatedhours']} hours")

    # 4. Update records
    print("\nUpdating task status...")
    client.update("new_ProjectTask", task_ids[1], {
        "new_Status": TaskStatus.COMPLETED,
        "new_EstimatedHours": 85.5
    })
    print("✓ Updated task status")

    # 5. Cleanup
    print("\nCleaning up...")
    client.delete_table("new_ProjectTask")
    print("✓ Deleted table")

    # Clear cache
    client.flush_cache()

except (MetadataError, DataverseError) as e:
    print(f"❌ Error: {e}")
```
---
## Reference
- [Official Walkthrough Example](https://github.com/microsoft/PowerPlatform-DataverseClient-Python/blob/main/examples/advanced/walkthrough.py)
- [OData Filter Syntax](https://learn.microsoft.com/en-us/power-apps/developer/data-platform/webapi/query-data-web-api)
- [Table/Column Metadata](https://learn.microsoft.com/en-us/power-apps/developer/data-platform/webapi/create-update-entity-definitions-using-web-api)


@@ -0,0 +1,563 @@
# Dataverse SDK for Python - Agentic Workflows Guide
## ⚠️ PREVIEW FEATURE NOTICE
**Status**: This feature is in **Public Preview** as of December 2025
**Availability**: General Availability (GA) date TBD
**Documentation**: Complete implementation details forthcoming
This guide covers the conceptual framework and planned capabilities for building agentic workflows with the Dataverse SDK for Python. Specific APIs and implementations may change before general availability.
---
## 1. Overview: Agentic Workflows with Dataverse
### What are Agentic Workflows?
Agentic workflows are autonomous, intelligent processes where:
- **Agents** make decisions and take actions based on data and rules
- **Workflows** orchestrate complex, multi-step operations
- **Dataverse** serves as the central source of truth for enterprise data
The Dataverse SDK for Python is designed to enable data scientists and developers to build these intelligent systems without .NET expertise.
### Key Capabilities (Planned)
The SDK is strategically positioned to support:
1. **Autonomous Data Agents** - Query, update, and evaluate data quality independently
2. **Form Prediction & Autofill** - Pre-fill forms based on data patterns and context
3. **Model Context Protocol (MCP)** Support - Enable standardized agent-to-tool communication
4. **Agent-to-Agent (A2A)** Collaboration - Multiple agents working together on complex tasks
5. **Semantic Modeling** - Natural language understanding of data relationships
6. **Secure Impersonation** - Run operations on behalf of specific users with audit trails
7. **Compliance Built-in** - Data governance and retention policies enforced
---
## 2. Architecture Patterns for Agentic Systems
### Multi-Agent Pattern
```python
# Conceptual pattern - specific APIs pending GA
class DataQualityAgent:
    """Autonomous agent that monitors and improves data quality."""

    def __init__(self, client):
        self.client = client

    async def evaluate_data_quality(self, table_name):
        """Evaluate data quality metrics for a table."""
        records = await self.client.get(table_name)
        metrics = {
            'total_records': len(records),
            'null_values': sum(1 for r in records if None in r.values()),
            'duplicate_records': await self._find_duplicates(table_name)
        }
        return metrics

    async def auto_remediate(self, issues):
        """Automatically fix identified data quality issues."""
        # Agent autonomously decides on remediation actions
        pass

class DataEnrichmentAgent:
    """Autonomous agent that enriches data from external sources."""

    async def enrich_accounts(self):
        """Enrich account data with market information."""
        accounts = await self.client.get("account")
        for account in accounts:
            enrichment = await self._lookup_market_data(account['name'])
            await self.client.update("account", account['id'], enrichment)
```
### Agent Orchestration Pattern
```python
# Conceptual pattern - specific APIs pending GA
class DataPipeline:
    """Orchestrates multiple agents working together."""

    def __init__(self, client):
        self.quality_agent = DataQualityAgent(client)
        self.enrichment_agent = DataEnrichmentAgent(client)
        self.sync_agent = SyncAgent(client)

    async def run(self, table_name):
        """Execute multi-agent workflow."""
        # Step 1: Quality check
        print("Running quality checks...")
        issues = await self.quality_agent.evaluate_data_quality(table_name)
        # Step 2: Enrich data
        print("Enriching data...")
        await self.enrichment_agent.enrich_accounts()
        # Step 3: Sync to external systems
        print("Syncing to external systems...")
        await self.sync_agent.sync_to_external_db(table_name)
```
---
## 3. Model Context Protocol (MCP) Support (Planned)
### What is MCP?
The Model Context Protocol (MCP) is an open standard for:
- **Tool Definition** - Describe what tools/capabilities are available
- **Tool Invocation** - Allow LLMs to call tools with parameters
- **Context Management** - Manage context between agent and tools
- **Error Handling** - Standardized error responses
### MCP Integration Pattern (Conceptual)
```python
# Conceptual pattern - specific APIs pending GA
from dataverse_mcp import DataverseMCPServer
# Define available tools
tools = [
    {
        "name": "query_accounts",
        "description": "Query accounts with filters",
        "parameters": {
            "filter": "OData filter expression",
            "select": "Columns to retrieve",
            "top": "Maximum records"
        }
    },
    {
        "name": "create_account",
        "description": "Create a new account",
        "parameters": {
            "name": "Account name",
            "credit_limit": "Credit limit amount"
        }
    },
    {
        "name": "update_account",
        "description": "Update account fields",
        "parameters": {
            "account_id": "Account GUID",
            "updates": "Dictionary of field updates"
        }
    }
]

# Create MCP server
server = DataverseMCPServer(client, tools=tools)

# LLMs can now use Dataverse tools
await server.handle_tool_call("query_accounts", {
    "filter": "creditlimit gt 100000",
    "select": ["name", "creditlimit"]
})
```
---
## 4. Agent-to-Agent (A2A) Collaboration (Planned)
### A2A Communication Pattern
```python
# Conceptual pattern - specific APIs pending GA
class DataValidationAgent:
    """Validates data before downstream agents process it."""

    async def validate_and_notify(self, data):
        """Validate data and notify other agents."""
        if await self._is_valid(data):
            # Publish event that other agents can subscribe to
            await self.publish_event("data_validated", data)
        else:
            await self.publish_event("validation_failed", data)

class DataProcessingAgent:
    """Waits for valid data from validation agent."""

    def __init__(self):
        self.subscribe("data_validated", self.process_data)

    async def process_data(self, data):
        """Process already-validated data."""
        # Agent can safely assume data is valid
        result = await self._transform(data)
        await self.publish_event("processing_complete", result)
```
---
## 5. Building Autonomous Data Agents
### Data Quality Agent Example
```python
# Working example with current SDK features
from PowerPlatform.Dataverse.client import DataverseClient
from azure.identity import InteractiveBrowserCredential
import json
import pandas as pd

class DataQualityAgent:
    """Monitor and report on data quality."""

    def __init__(self, org_url, credential):
        self.client = DataverseClient(org_url, credential)

    def analyze_completeness(self, table_name, required_fields):
        """Analyze field completeness."""
        records = self.client.get(
            table_name,
            select=required_fields
        )
        missing_by_field = {field: 0 for field in required_fields}
        total = 0
        for page in records:
            for record in page:
                total += 1
                for field in required_fields:
                    if field not in record or record[field] is None:
                        missing_by_field[field] += 1
        # Calculate completeness percentage
        completeness = {
            field: ((total - count) / total * 100)
            for field, count in missing_by_field.items()
        }
        return {
            'table': table_name,
            'total_records': total,
            'completeness': completeness,
            'missing_counts': missing_by_field
        }

    def detect_duplicates(self, table_name, key_fields):
        """Detect potential duplicate records."""
        records = self.client.get(table_name, select=key_fields)
        all_records = []
        for page in records:
            all_records.extend(page)
        seen = {}
        duplicates = []
        for record in all_records:
            key = tuple(record.get(f) for f in key_fields)
            if key in seen:
                duplicates.append({
                    'original_id': seen[key],
                    'duplicate_id': record.get('id'),
                    'key': key
                })
            else:
                seen[key] = record.get('id')
        return {
            'table': table_name,
            'duplicate_count': len(duplicates),
            'duplicates': duplicates
        }

    def generate_quality_report(self, table_name):
        """Generate comprehensive quality report."""
        completeness = self.analyze_completeness(
            table_name,
            ['name', 'telephone1', 'emailaddress1']
        )
        duplicates = self.detect_duplicates(
            table_name,
            ['name', 'emailaddress1']
        )
        return {
            'timestamp': pd.Timestamp.now().isoformat(),
            'table': table_name,
            'completeness': completeness,
            'duplicates': duplicates
        }

# Usage
agent = DataQualityAgent("https://<org>.crm.dynamics.com", InteractiveBrowserCredential())
report = agent.generate_quality_report("account")
print(json.dumps(report, indent=2))
```
### Form Prediction Agent Example
```python
# Conceptual pattern using current SDK capabilities
from sklearn.ensemble import RandomForestRegressor
import pandas as pd
from PowerPlatform.Dataverse.client import DataverseClient

class FormPredictionAgent:
    """Predict and autofill form values."""

    def __init__(self, org_url, credential):
        self.client = DataverseClient(org_url, credential)
        self.model = None

    def train_on_historical_data(self, table_name, features, target):
        """Train prediction model on historical data."""
        # Collect training data
        records = []
        for page in self.client.get(table_name, select=features + [target]):
            records.extend(page)
        df = pd.DataFrame(records)
        # Train model
        X = df[features].fillna(0)
        y = df[target]
        self.model = RandomForestRegressor()
        self.model.fit(X, y)
        return self.model.score(X, y)

    def predict_field_values(self, table_name, record_id, features_data):
        """Predict missing field values."""
        if self.model is None:
            raise ValueError("Model not trained. Call train_on_historical_data first.")
        # Predict
        prediction = self.model.predict([features_data])[0]
        # Return prediction with confidence
        return {
            'record_id': record_id,
            'predicted_value': prediction,
            'confidence': self.model.score([features_data], [prediction])
        }
```
---
## 6. Integration with AI/ML Services
### LLM Integration Pattern
```python
# Using LLM to interpret Dataverse data
from openai import OpenAI
from PowerPlatform.Dataverse.client import DataverseClient
import json

class DataInsightAgent:
    """Use LLM to generate insights from Dataverse data."""

    def __init__(self, org_url, credential, openai_key):
        self.client = DataverseClient(org_url, credential)
        self.llm = OpenAI(api_key=openai_key)

    def analyze_with_llm(self, table_name, sample_size=100):
        """Analyze data using LLM."""
        # Get sample data
        records = []
        count = 0
        for page in self.client.get(table_name):
            records.extend(page)
            count += len(page)
            if count >= sample_size:
                break
        # Create summary for LLM
        summary = f"""
Table: {table_name}
Total records sampled: {len(records)}
Sample data:
{json.dumps(records[:5], indent=2, default=str)}
Provide insights about this data.
"""
        # Ask LLM
        response = self.llm.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": summary}]
        )
        return response.choices[0].message.content
```
---
## 7. Secure Impersonation & Audit Trails
### Planned Capabilities
The SDK will support running operations on behalf of specific users:
```python
# Conceptual pattern - specific APIs pending GA
from dataverse_security import ImpersonationContext

# Run as different user
with ImpersonationContext(client, user_id="user-guid"):
    # All operations run as this user
    client.create("account", {"name": "New Account"})
    # Audit trail: Created by [user-guid] at [timestamp]

# Retrieve audit trail
audit_log = client.get_audit_trail(
    table="account",
    record_id="record-guid",
    action="create"
)
```
---
## 8. Compliance and Data Governance
### Planned Governance Features
```python
# Conceptual pattern - specific APIs pending GA
from dataverse_governance import DataGovernance
# Define retention policy
governance = DataGovernance(client)
governance.set_retention_policy(
table="account",
retention_days=365
)
# Define data classification
governance.classify_columns(
table="account",
classifications={
"name": "Public",
"telephone1": "Internal",
"creditlimit": "Confidential"
}
)
# Enforce policies
governance.enforce_all_policies()
```
---
## 9. Current SDK Capabilities Supporting Agentic Workflows
While full agentic features are in preview, current SDK capabilities already support agent building:
### ✅ Available Now
- **CRUD Operations** - Create, retrieve, update, delete data
- **Bulk Operations** - Process large datasets efficiently
- **Query Capabilities** - OData and SQL for flexible data retrieval
- **Metadata Operations** - Work with table and column definitions
- **Error Handling** - Structured exception hierarchy
- **Pagination** - Handle large result sets
- **File Upload** - Manage document attachments
### 🔜 Coming in GA
- Full MCP integration
- A2A collaboration primitives
- Enhanced authentication/impersonation
- Governance policy enforcement
- Native async/await support
- Advanced caching strategies
---
## 10. Getting Started: Build Your First Agent Today
```python
from PowerPlatform.Dataverse.client import DataverseClient
from azure.identity import InteractiveBrowserCredential
import json
import pandas as pd

class SimpleDataAgent:
    """Your first Dataverse agent."""

    def __init__(self, org_url):
        credential = InteractiveBrowserCredential()
        self.client = DataverseClient(org_url, credential)

    def check_health(self, table_name):
        """Agent function: Check table health."""
        try:
            tables = self.client.list_tables()
            matching = [t for t in tables if t['table_logical_name'] == table_name]
            if not matching:
                return {"status": "error", "message": f"Table {table_name} not found"}
            # Get record count
            records = []
            for page in self.client.get(table_name):
                records.extend(page)
                if len(records) > 1000:
                    break
            return {
                "status": "healthy",
                "table": table_name,
                "record_count": len(records),
                "timestamp": pd.Timestamp.now().isoformat()
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}

# Usage
agent = SimpleDataAgent("https://<org>.crm.dynamics.com")
health = agent.check_health("account")
print(json.dumps(health, indent=2))
```
---
## 11. Resources & Documentation
### Official Documentation
- [Dataverse SDK for Python Overview](https://learn.microsoft.com/en-us/power-apps/developer/data-platform/sdk-python/overview)
- [Working with Data](https://learn.microsoft.com/en-us/power-apps/developer/data-platform/sdk-python/work-data)
- [Release Plan: Agentic Workflows](https://learn.microsoft.com/en-us/power-platform/release-plan/2025wave2/data-platform/build-agentic-flows-dataverse-sdk-python)
### External Resources
- [Model Context Protocol](https://modelcontextprotocol.io/)
- [Azure AI Services](https://learn.microsoft.com/en-us/azure/ai-services/)
- [Python async/await](https://docs.python.org/3/library/asyncio.html)
### Repository
- [SDK Source Code](https://github.com/microsoft/PowerPlatform-DataverseClient-Python)
- [Issues & Feature Requests](https://github.com/microsoft/PowerPlatform-DataverseClient-Python/issues)
---
## 12. FAQ: Agentic Workflows
**Q: Can I use agents today with the current SDK?**
A: Yes! Use the current capabilities to build agent-like systems. Full MCP/A2A support coming in GA.
**Q: What's the difference between current SDK and agentic features?**
A: Current: Synchronous CRUD; Agentic: Async, autonomous decision-making, agent collaboration.
**Q: Will there be breaking changes from preview to GA?**
A: Possibly. This is a preview feature; expect API refinements before general availability.
**Q: How do I prepare for agentic workflows today?**
A: Build agents using current CRUD operations, design with async patterns in mind, use MCP specs for future compatibility.
**Q: Is there a cost difference for agentic features?**
A: Unknown at this time. Check release notes closer to GA.
---
## 13. Next Steps
1. **Build a prototype** using current SDK capabilities
2. **Join preview** when MCP integration becomes available
3. **Provide feedback** via GitHub issues
4. **Watch for GA announcement** with full API documentation
5. **Migrate to full agentic** features when ready
The Dataverse SDK for Python is positioning itself as the go-to platform for building intelligent, autonomous data systems on the Microsoft Power Platform.


@@ -0,0 +1,179 @@
---
applyTo: '**'
---
# Dataverse SDK for Python — API Reference Guide
## DataverseClient Class
Main client for interacting with Dataverse. Initialize with base URL and Azure credentials.
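A minimal construction sketch (the org URL is a placeholder; any `azure-identity` credential type covered in the authentication guide works here):
```python
from azure.identity import DefaultAzureCredential
from PowerPlatform.Dataverse.client import DataverseClient

# Base URL is the environment's org URL; credential is any azure-identity credential
client = DataverseClient(
    base_url="https://yourorg.crm.dynamics.com",
    credential=DefaultAzureCredential()
)
```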
### Key Methods
#### create(table_schema_name, records)
Create single or bulk records. Returns list of GUIDs.
```python
# Single record
ids = client.create("account", {"name": "Acme"})
print(ids[0]) # First GUID
# Bulk create
ids = client.create("account", [{"name": "Contoso"}, {"name": "Fabrikam"}])
```
#### get(table_schema_name, record_id=None, select, filter, orderby, top, expand, page_size)
Fetch single record or query multiple with OData options.
```python
# Single record
record = client.get("account", record_id="guid-here")

# Query with filter and paging
for batch in client.get(
    "account",
    filter="statecode eq 0",
    select=["name", "telephone1"],
    orderby=["createdon desc"],
    top=100,
    page_size=50
):
    for record in batch:
        print(record["name"])
```
#### update(table_schema_name, ids, changes)
Update single or bulk records.
```python
# Single update
client.update("account", "guid-here", {"telephone1": "555-0100"})
# Broadcast: apply same changes to many IDs
client.update("account", [id1, id2, id3], {"statecode": 1})
# Paired: one-to-one mapping
client.update("account", [id1, id2], [{"name": "A"}, {"name": "B"}])
```
#### delete(table_schema_name, ids, use_bulk_delete=True)
Delete single or bulk records.
```python
# Single delete
client.delete("account", "guid-here")
# Bulk delete (async)
job_id = client.delete("account", [id1, id2, id3])
```
#### create_table(table_schema_name, columns, solution_unique_name=None, primary_column_schema_name=None)
Create custom table.
```python
from enum import IntEnum
class ItemStatus(IntEnum):
    ACTIVE = 1
    INACTIVE = 2
    __labels__ = {
        1033: {"ACTIVE": "Active", "INACTIVE": "Inactive"}
    }

info = client.create_table("new_MyTable", {
    "new_Title": "string",
    "new_Quantity": "int",
    "new_Price": "decimal",
    "new_Active": "bool",
    "new_Status": ItemStatus
})
print(info["entity_logical_name"])
```
#### create_columns(table_schema_name, columns)
Add columns to existing table.
```python
created = client.create_columns("new_MyTable", {
"new_Notes": "string",
"new_Count": "int"
})
```
#### delete_columns(table_schema_name, columns)
Remove columns from table.
```python
removed = client.delete_columns("new_MyTable", ["new_Notes", "new_Count"])
```
#### delete_table(table_schema_name)
Delete custom table (irreversible).
```python
client.delete_table("new_MyTable")
```
#### get_table_info(table_schema_name)
Retrieve table metadata.
```python
info = client.get_table_info("new_MyTable")
if info:
    print(info["table_logical_name"])
    print(info["entity_set_name"])
```
#### list_tables()
List all custom tables.
```python
tables = client.list_tables()
for table in tables:
    print(table)
```
#### flush_cache(kind)
Clear SDK caches (e.g., picklist labels).
```python
removed = client.flush_cache("picklist")
```
## DataverseConfig Class
Configure client behavior (timeouts, retries, language).
```python
from PowerPlatform.Dataverse.core.config import DataverseConfig
cfg = DataverseConfig()
cfg.http_retries = 3
cfg.http_backoff = 1.0
cfg.http_timeout = 30
cfg.language_code = 1033 # English
client = DataverseClient(base_url=url, credential=cred, config=cfg)
```
## Error Handling
Catch `DataverseError` for SDK-specific exceptions. Check `is_transient` to decide retry.
```python
from PowerPlatform.Dataverse.core.errors import DataverseError
try:
    client.create("account", {"name": "Test"})
except DataverseError as e:
    print(f"Code: {e.code}")
    print(f"Message: {e.message}")
    print(f"Transient: {e.is_transient}")
    print(f"Details: {e.to_dict()}")
```
## OData Filter Tips
- Use exact logical names (lowercase) in filter expressions
- Column names in `select` are auto-lowercased
- Navigation property names in `expand` are case-sensitive
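A small sketch illustrating these rules against the `account` table (the column and navigation property names follow the examples elsewhere in this collection):
```python
# Filters use lowercase logical names; expand uses the exact navigation property name
for page in client.get(
    "account",
    filter="creditlimit gt 50000 and statecode eq 0",
    select=["Name", "CreditLimit"],            # auto-lowercased to name, creditlimit
    expand=["parentaccountid($select=name)"]   # navigation property is case-sensitive
):
    for account in page:
        print(account["name"], account.get("creditlimit"))
```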
## References
- API docs: https://learn.microsoft.com/en-us/python/api/powerplatform-dataverse-client/powerplatform.dataverse.client.dataverseclient
- Config docs: https://learn.microsoft.com/en-us/python/api/powerplatform-dataverse-client/powerplatform.dataverse.core.config.dataverseconfig
- Errors: https://learn.microsoft.com/en-us/python/api/powerplatform-dataverse-client/powerplatform.dataverse.core.errors


@@ -0,0 +1,527 @@
---
applyTo: '**'
---
# Dataverse SDK for Python — Authentication & Security Patterns
Based on official Microsoft Azure SDK authentication documentation and Dataverse SDK best practices.
## 1. Authentication Overview
The Dataverse SDK for Python uses Azure Identity credentials for token-based authentication. This approach follows the principle of least privilege and works across local development, cloud deployment, and on-premises environments.
### Why Token-Based Authentication?
**Advantages over connection strings**:
- Establishes specific permissions needed by your app (principle of least privilege)
- Credentials are scoped only to intended apps
- With managed identity, no secrets to store or compromise
- Works seamlessly across environments without code changes
---
## 2. Credential Types & Selection
### Interactive Browser Credential (Local Development)
**Use for**: Developer workstations during local development.
```python
from azure.identity import InteractiveBrowserCredential
from PowerPlatform.Dataverse.client import DataverseClient
# Opens browser for authentication
credential = InteractiveBrowserCredential()
client = DataverseClient(
base_url="https://myorg.crm.dynamics.com",
credential=credential
)
# First use prompts for sign-in; subsequent calls use cached token
records = client.get("account")
```
**When to use**:
- ✅ Interactive development and testing
- ✅ Desktop applications with UI
- ❌ Background services or scheduled jobs
---
### Default Azure Credential (Recommended for All Environments)
**Use for**: Apps that run in multiple environments (dev → test → production).
```python
from azure.identity import DefaultAzureCredential
from PowerPlatform.Dataverse.client import DataverseClient
# Attempts credentials in this order:
# 1. Environment variables (app service principal)
# 2. Azure CLI credentials (local development)
# 3. Azure PowerShell credentials (local development)
# 4. Managed identity (when running in Azure)
credential = DefaultAzureCredential()
client = DataverseClient(
base_url="https://myorg.crm.dynamics.com",
credential=credential
)
records = client.get("account")
```
**Advantages**:
- Single code path works everywhere
- No environment-specific logic needed
- Automatically detects available credentials
- Preferred for production apps
**Credential chain**:
1. Environment variables (`AZURE_CLIENT_ID`, `AZURE_TENANT_ID`, `AZURE_CLIENT_SECRET`)
2. Visual Studio Code login
3. Azure CLI (`az login`)
4. Azure PowerShell (`Connect-AzAccount`)
5. Managed identity (on Azure VMs, App Service, AKS, etc.)
---
### Client Secret Credential (Service Principal)
**Use for**: Unattended authentication (scheduled jobs, scripts, on-premises services).
```python
from azure.identity import ClientSecretCredential
from PowerPlatform.Dataverse.client import DataverseClient
import os
credential = ClientSecretCredential(
tenant_id=os.environ["AZURE_TENANT_ID"],
client_id=os.environ["AZURE_CLIENT_ID"],
client_secret=os.environ["AZURE_CLIENT_SECRET"]
)
client = DataverseClient(
base_url="https://myorg.crm.dynamics.com",
credential=credential
)
records = client.get("account")
```
**Setup steps**:
1. Create app registration in Azure AD
2. Create client secret (keep secure!)
3. Grant Dataverse permissions to the app
4. Store credentials in environment variables or secure vault
**Security concerns**:
- ⚠️ Never hardcode credentials in source code
- ⚠️ Store secrets in Azure Key Vault or environment variables
- ⚠️ Rotate credentials regularly
- ⚠️ Use minimal required permissions
---
### Managed Identity Credential (Azure Resources)
**Use for**: Apps hosted in Azure (App Service, Azure Functions, AKS, VMs).
```python
from azure.identity import ManagedIdentityCredential
from PowerPlatform.Dataverse.client import DataverseClient
# No secrets needed - Azure manages identity
credential = ManagedIdentityCredential()
client = DataverseClient(
base_url="https://myorg.crm.dynamics.com",
credential=credential
)
records = client.get("account")
```
**Benefits**:
- ✅ No secrets to manage
- ✅ Automatic token refresh
- ✅ Highly secure
- ✅ Built-in to Azure services
**Setup**:
1. Enable managed identity on Azure resource (App Service, VM, etc.)
2. Grant Dataverse permissions to the managed identity
3. Code automatically uses the identity
---
## 3. Environment-Specific Configuration
### Local Development
```python
# .env file (git-ignored)
DATAVERSE_URL=https://myorg-dev.crm.dynamics.com
# Python code
import os
from azure.identity import DefaultAzureCredential
from PowerPlatform.Dataverse.client import DataverseClient
# Uses your Azure CLI credentials
credential = DefaultAzureCredential()
client = DataverseClient(
base_url=os.environ["DATAVERSE_URL"],
credential=credential
)
```
**Setup**: `az login` with your developer account
---
### Azure App Service / Azure Functions
```python
from azure.identity import ManagedIdentityCredential
from PowerPlatform.Dataverse.client import DataverseClient
# Automatically uses managed identity
credential = ManagedIdentityCredential()
client = DataverseClient(
base_url="https://myorg.crm.dynamics.com",
credential=credential
)
```
**Setup**: Enable managed identity in App Service, grant permissions in Dataverse
---
### On-Premises / Third-Party Hosting
```python
import os
from azure.identity import ClientSecretCredential
from PowerPlatform.Dataverse.client import DataverseClient
credential = ClientSecretCredential(
tenant_id=os.environ["AZURE_TENANT_ID"],
client_id=os.environ["AZURE_CLIENT_ID"],
client_secret=os.environ["AZURE_CLIENT_SECRET"]
)
client = DataverseClient(
base_url="https://myorg.crm.dynamics.com",
credential=credential
)
```
**Setup**: Create service principal, store credentials securely, grant Dataverse permissions
---
## 4. Client Configuration & Connection Settings
### Basic Configuration
```python
from PowerPlatform.Dataverse.core.config import DataverseConfig
from azure.identity import DefaultAzureCredential
from PowerPlatform.Dataverse.client import DataverseClient
cfg = DataverseConfig()
cfg.logging_enable = True # Enable detailed logging
client = DataverseClient(
base_url="https://myorg.crm.dynamics.com",
credential=DefaultAzureCredential(),
config=cfg
)
```
### HTTP Tuning
```python
from PowerPlatform.Dataverse.core.config import DataverseConfig
cfg = DataverseConfig()
# Timeout settings
cfg.http_timeout = 30 # Request timeout in seconds
# Retry configuration
cfg.http_retries = 3 # Number of retry attempts
cfg.http_backoff = 1 # Initial backoff in seconds
# Connection reuse
cfg.connection_timeout = 5 # Connection timeout
client = DataverseClient(
base_url="https://myorg.crm.dynamics.com",
credential=credential,
config=cfg
)
```
---
## 5. Security Best Practices
### 1. Never Hardcode Credentials
```python
# ❌ BAD - Don't do this!
credential = ClientSecretCredential(
tenant_id="your-tenant-id",
client_id="your-client-id",
client_secret="your-secret-key" # EXPOSED!
)
# ✅ GOOD - Use environment variables
import os
credential = ClientSecretCredential(
tenant_id=os.environ["AZURE_TENANT_ID"],
client_id=os.environ["AZURE_CLIENT_ID"],
client_secret=os.environ["AZURE_CLIENT_SECRET"]
)
```
### 2. Store Secrets Securely
**Development**:
```bash
# .env file (git-ignored)
AZURE_TENANT_ID=your-tenant-id
AZURE_CLIENT_ID=your-client-id
AZURE_CLIENT_SECRET=your-secret-key
```
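During local development, a helper such as python-dotenv (an assumption about tooling, not an SDK dependency) can load the `.env` file into the process environment before the credential is built:
```python
import os
from dotenv import load_dotenv  # assumes the python-dotenv package is installed
from azure.identity import ClientSecretCredential

load_dotenv()  # reads the git-ignored .env file into os.environ

credential = ClientSecretCredential(
    tenant_id=os.environ["AZURE_TENANT_ID"],
    client_id=os.environ["AZURE_CLIENT_ID"],
    client_secret=os.environ["AZURE_CLIENT_SECRET"]
)
```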
**Production**:
```python
from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential
# Retrieve secrets from Azure Key Vault
credential = DefaultAzureCredential()
secret_client = SecretClient(
    vault_url="https://mykeyvault.vault.azure.net",
    credential=credential
)
secret = secret_client.get_secret("dataverse-client-secret")
```
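The retrieved secret can then feed the service principal credential; `secret.value` holds the secret string (vault and variable names here are illustrative):
```python
import os
from azure.identity import ClientSecretCredential
from PowerPlatform.Dataverse.client import DataverseClient

# Build the Dataverse credential from the Key Vault secret retrieved above
dataverse_credential = ClientSecretCredential(
    tenant_id=os.environ["AZURE_TENANT_ID"],
    client_id=os.environ["AZURE_CLIENT_ID"],
    client_secret=secret.value  # KeyVaultSecret.value is the secret string
)
dataverse_client = DataverseClient(
    base_url="https://myorg.crm.dynamics.com",
    credential=dataverse_credential
)
```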
### 3. Implement Principle of Least Privilege
```python
# Grant minimal permissions:
# - Only read if app only reads
# - Only specific tables if possible
# - Time-limit credentials (auto-rotation)
# - Use managed identity instead of shared secrets
```
### 4. Monitor Authentication Events
```python
import logging
logger = logging.getLogger("dataverse_auth")

try:
    client = DataverseClient(
        base_url="https://myorg.crm.dynamics.com",
        credential=credential
    )
    logger.info("Successfully authenticated to Dataverse")
except Exception as e:
    logger.error(f"Authentication failed: {e}")
    raise
```
### 5. Handle Token Expiration
```python
from azure.core.exceptions import ClientAuthenticationError
import logging
import time

logger = logging.getLogger("dataverse_auth")

def create_with_auth_retry(client, table_name, payload, max_retries=2):
    """Create record, retrying if token expired."""
    for attempt in range(max_retries):
        try:
            return client.create(table_name, payload)
        except ClientAuthenticationError:
            if attempt < max_retries - 1:
                logger.warning("Token expired, retrying...")
                time.sleep(1)
            else:
                raise
---
## 6. Multi-Tenant Applications
### Tenant-Aware Client
```python
from azure.identity import DefaultAzureCredential
from PowerPlatform.Dataverse.client import DataverseClient
def get_client_for_tenant(tenant_id: str) -> DataverseClient:
    """Get DataverseClient for specific tenant."""
    credential = DefaultAzureCredential()
    # Dataverse URL contains tenant-specific org
    base_url = f"https://{get_org_for_tenant(tenant_id)}.crm.dynamics.com"
    return DataverseClient(
        base_url=base_url,
        credential=credential
    )

def get_org_for_tenant(tenant_id: str) -> str:
    """Map tenant to Dataverse organization."""
    # Implementation depends on your multi-tenant strategy
    # Could be database lookup, configuration, etc.
    pass
```
---
## 7. Troubleshooting Authentication
### Error: "Access Denied" (403)
```python
from PowerPlatform.Dataverse.core.errors import DataverseError

try:
    client.get("account")
except DataverseError as e:
    if e.status_code == 403:
        print("User/app lacks Dataverse permissions")
        print("Ensure Dataverse security role is assigned")
```
### Error: "Invalid Credentials" (401)
```python
# Check credential source
from azure.identity import DefaultAzureCredential

try:
    cred = DefaultAzureCredential(exclude_cli_credential=False,
                                  exclude_powershell_credential=False)
    # Force re-authentication
    import subprocess
    subprocess.run(["az", "login"])
except Exception as e:
    print(f"Authentication failed: {e}")
```
### Error: "Invalid Tenant"
```python
# Verify tenant ID
import json
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
token = credential.get_token("https://dataverse.dynamics.com/.default")
# Decode token to verify tenant
import base64
# JWTs are base64url-encoded; pad to a multiple of 4 before decoding
payload_part = token.token.split('.')[1]
payload = base64.urlsafe_b64decode(payload_part + '=' * (-len(payload_part) % 4))
claims = json.loads(payload)
print(f"Token tenant: {claims.get('tid')}")
```
---
## 8. Credential Lifecycle
### Token Refresh
Azure Identity handles token refresh automatically:
```python
# Tokens are cached and refreshed automatically
credential = DefaultAzureCredential()
# First call acquires token
client.get("account")
# Subsequent calls reuse cached token
client.get("contact")
# If token expires, SDK automatically refreshes
```
### Session Management
```python
class DataverseSession:
    """Manages DataverseClient lifecycle."""

    def __init__(self, base_url: str):
        from azure.identity import DefaultAzureCredential
        self.client = DataverseClient(
            base_url=base_url,
            credential=DefaultAzureCredential()
        )

    def __enter__(self):
        return self.client

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Cleanup if needed
        pass

# Usage
with DataverseSession("https://myorg.crm.dynamics.com") as client:
    records = client.get("account")
```
---
## 9. Dataverse-Specific Security
### Row-Level Security (RLS)
User's Dataverse security role determines accessible records:
```python
from azure.identity import InteractiveBrowserCredential
from PowerPlatform.Dataverse.client import DataverseClient
# Each user gets client with their credentials
def get_user_client(user_username: str) -> DataverseClient:
    # User must already be authenticated
    credential = InteractiveBrowserCredential()
    client = DataverseClient(
        base_url="https://myorg.crm.dynamics.com",
        credential=credential
    )
    # User only sees records they have access to
    return client
```
### Security Roles
Assign minimal required roles:
- **System Administrator**: Full access (avoid for apps)
- **Sales Manager**: Sales tables + reporting
- **Service Representative**: Service cases + knowledge
- **Custom**: Create role with specific table permissions
---
## 10. See Also
- [Azure Identity Client Library](https://learn.microsoft.com/en-us/python/api/azure-identity)
- [Authenticate to Azure Services](https://learn.microsoft.com/en-us/azure/developer/python/sdk/authentication/overview)
- [Azure Key Vault for Secrets](https://learn.microsoft.com/en-us/azure/key-vault/general/overview)
- [Dataverse Security Model](https://learn.microsoft.com/en-us/power-platform/admin/security/security-overview)


@@ -0,0 +1,735 @@
# Dataverse SDK for Python - Best Practices Guide
## Overview
Production-ready patterns and best practices extracted from Microsoft's official PowerPlatform-DataverseClient-Python repository, examples, and recommended workflows.
## 1. Installation & Environment Setup
### Production Installation
```bash
# Install the published SDK from PyPI
pip install PowerPlatform-Dataverse-Client
# Install Azure Identity for authentication
pip install azure-identity
# Optional: pandas integration for data manipulation
pip install pandas
```
### Development Installation
```bash
# Clone the repository
git clone https://github.com/microsoft/PowerPlatform-DataverseClient-Python.git
cd PowerPlatform-DataverseClient-Python
# Install in editable mode for live development
pip install -e .
# Install development dependencies
pip install pytest pytest-cov black isort mypy ruff
```
### Python Version Support
- **Minimum**: Python 3.10
- **Recommended**: Python 3.11+ for best performance
- **Supported**: Python 3.10, 3.11, 3.12, 3.13, 3.14
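A quick runtime guard for the documented minimum version (a small sketch; the version floor comes from the list above):
```python
import sys

# Fail fast on interpreters older than the supported minimum (3.10)
if sys.version_info < (3, 10):
    raise RuntimeError(f"Python 3.10+ required, found {sys.version.split()[0]}")
```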
### Verify Installation
```python
from PowerPlatform.Dataverse import __version__
from PowerPlatform.Dataverse.client import DataverseClient
from azure.identity import InteractiveBrowserCredential
print(f"SDK Version: {__version__}")
print("Installation successful!")
```
---
## 2. Authentication Patterns
### Interactive Development (Browser-Based)
```python
from azure.identity import InteractiveBrowserCredential
from PowerPlatform.Dataverse.client import DataverseClient
credential = InteractiveBrowserCredential()
client = DataverseClient("https://yourorg.crm.dynamics.com", credential)
```
**When to use:** Local development, interactive testing, single-user scenarios.
### Production (Client Secret)
```python
from azure.identity import ClientSecretCredential
from PowerPlatform.Dataverse.client import DataverseClient
credential = ClientSecretCredential(
tenant_id="your-tenant-id",
client_id="your-client-id",
client_secret="your-client-secret"
)
client = DataverseClient("https://yourorg.crm.dynamics.com", credential)
```
**When to use:** Server-side applications, Azure automation, scheduled jobs.
### Certificate-Based Authentication
```python
from azure.identity import ClientCertificateCredential
from PowerPlatform.Dataverse.client import DataverseClient
credential = ClientCertificateCredential(
tenant_id="your-tenant-id",
client_id="your-client-id",
certificate_path="path/to/certificate.pem"
)
client = DataverseClient("https://yourorg.crm.dynamics.com", credential)
```
**When to use:** Highly secure environments, certificate-pinning requirements.
### Azure CLI Authentication
```python
from azure.identity import AzureCliCredential
from PowerPlatform.Dataverse.client import DataverseClient
credential = AzureCliCredential()
client = DataverseClient("https://yourorg.crm.dynamics.com", credential)
```
**When to use:** Local testing with Azure CLI installed, Azure DevOps pipelines.
---
## 3. Singleton Client Pattern
**Best Practice**: Create one `DataverseClient` instance and reuse it throughout your application.
```python
# ❌ ANTI-PATTERN: Creating new clients repeatedly
def fetch_account(account_id):
    credential = InteractiveBrowserCredential()
    client = DataverseClient("https://yourorg.crm.dynamics.com", credential)
    return client.get("account", account_id)

# ✅ PATTERN: Singleton client
class DataverseService:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            credential = InteractiveBrowserCredential()
            cls._instance = DataverseClient(
                "https://yourorg.crm.dynamics.com",
                credential
            )
        return cls._instance

# Usage
service = DataverseService()
account = service.get("account", account_id)
```
---
## 4. Configuration Optimization
### Connection Settings
```python
from PowerPlatform.Dataverse.core.config import DataverseConfig
from PowerPlatform.Dataverse.client import DataverseClient
from azure.identity import ClientSecretCredential
config = DataverseConfig(
language_code=1033, # English (US)
# Note: http_retries, http_backoff, http_timeout are reserved for internal use
)
credential = ClientSecretCredential(tenant_id, client_id, client_secret)
client = DataverseClient("https://yourorg.crm.dynamics.com", credential, config)
```
**Key configuration options:**
- `language_code`: Language for API responses (default: 1033 for English)
---
## 5. CRUD Operations Best Practices
### Create Operations
#### Single Record
```python
record_data = {
"name": "Contoso Ltd",
"telephone1": "555-0100",
"creditlimit": 100000.00,
}
created_ids = client.create("account", record_data)
record_id = created_ids[0]
print(f"Created: {record_id}")
```
#### Bulk Create (Automatically Optimized)
```python
# SDK automatically uses CreateMultiple for arrays > 1 record
records = [
{"name": f"Company {i}", "creditlimit": 50000 + (i * 1000)}
for i in range(100)
]
created_ids = client.create("account", records)
print(f"Created {len(created_ids)} records")
```
**Performance**: Bulk create is optimized internally; no manual batching required.
### Read Operations
#### Single Record by ID
```python
account = client.get("account", "account-guid-here")
print(account.get("name"))
```
#### Query with Filtering & Selection
```python
# Returns paginated results (generator)
for page in client.get(
    "account",
    filter="creditlimit gt 50000",
    select=["name", "creditlimit", "telephone1"],
    orderby="name",
    top=100
):
    for account in page:
        print(f"{account['name']}: ${account['creditlimit']}")
```
**Key parameters:**
- `filter`: OData filter (must use **lowercase** logical names)
- `select`: Fields to retrieve (improves performance)
- `orderby`: Sort results
- `top`: Max records per page (default: 5000)
- `page_size`: Override page size for pagination
#### SQL Queries (Read-Only)
```python
# SQL queries are read-only; use for complex analytics
results = client.query_sql("""
SELECT TOP 10 name, creditlimit
FROM account
WHERE creditlimit > 50000
ORDER BY name
""")
for row in results:
print(f"{row['name']}: ${row['creditlimit']}")
```
**Limitations:**
- Read-only (SELECT only, no DML)
- Useful for complex joins and analytics
- May be disabled by org policy
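Because an org policy can disable SQL access, a defensive sketch can fall back to an OData query when `query_sql` fails (catching the SDK's base `DataverseError` here is an assumption about which exception surfaces in that case):
```python
from PowerPlatform.Dataverse.core.errors import DataverseError

def top_accounts(client, limit=10):
    """Prefer SQL for the ranked query; fall back to OData if SQL is unavailable."""
    try:
        return list(client.query_sql(
            f"SELECT TOP {limit} name, creditlimit FROM account ORDER BY name"
        ))
    except DataverseError:
        rows = []
        for page in client.get("account", select=["name", "creditlimit"],
                               orderby="name", top=limit):
            rows.extend(page)
        return rows[:limit]
```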
### Update Operations
#### Single Record
```python
client.update("account", "account-guid", {
"creditlimit": 150000.00,
"name": "Updated Company Name"
})
```
#### Bulk Update (Broadcast Same Change)
```python
# Update all selected records with same data
account_ids = ["id1", "id2", "id3"]
client.update("account", account_ids, {
"industrycode": 1, # Retail
"accountmanagerid": "manager-guid"
})
```
#### Paired Updates (1:1 Record Updates)
```python
# For different updates per record, send multiple calls
updates = {
    "id1": {"creditlimit": 100000},
    "id2": {"creditlimit": 200000},
    "id3": {"creditlimit": 300000},
}
for record_id, data in updates.items():
    client.update("account", record_id, data)
```
### Delete Operations
#### Single Record
```python
client.delete("account", "account-guid")
```
#### Bulk Delete (Optimized)
```python
# SDK automatically uses BulkDelete for large lists
record_ids = ["id1", "id2", "id3", ...]
client.delete("account", record_ids, use_bulk_delete=True)
```
---
## 6. Error Handling & Recovery
### Exception Hierarchy
```python
from PowerPlatform.Dataverse.core.errors import (
    DataverseError,    # Base class
    ValidationError,   # Validation failures
    MetadataError,     # Table/column operations
    HttpError,         # HTTP-level errors
    SQLParseError      # SQL query syntax errors
)

try:
    client.create("account", {"name": None})  # Invalid
except ValidationError as e:
    print(f"Validation failed: {e}")
    # Handle validation-specific logic
except DataverseError as e:
    print(f"General SDK error: {e}")
    # Handle other SDK errors
```
### Retry Logic Pattern
```python
import time
from PowerPlatform.Dataverse.core.errors import HttpError
def create_with_retry(table_name, record_data, max_retries=3):
"""Create record with exponential backoff retry logic."""
for attempt in range(max_retries):
try:
return client.create(table_name, record_data)
except HttpError as e:
if attempt == max_retries - 1:
raise
# Exponential backoff: 1s, 2s, 4s
backoff_seconds = 2 ** attempt
print(f"Attempt {attempt + 1} failed. Retrying in {backoff_seconds}s...")
time.sleep(backoff_seconds)
# Usage
created_ids = create_with_retry("account", {"name": "Contoso"})
```
### 429 (Request Rate Limit) Handling
```python
import time
from PowerPlatform.Dataverse.core.errors import HttpError
try:
accounts = client.get("account", top=5000)
except HttpError as e:
if "429" in str(e):
# Rate limited; wait and retry
print("Rate limited. Waiting 60 seconds...")
time.sleep(60)
accounts = client.get("account", top=5000)
else:
raise
```
---
## 7. Table & Column Management
### Create Custom Table
```python
from enum import IntEnum
class Priority(IntEnum):
LOW = 1
MEDIUM = 2
HIGH = 3
# Define columns with types
columns = {
"new_Title": "string",
"new_Quantity": "int",
"new_Amount": "decimal",
"new_Completed": "bool",
"new_Priority": Priority, # Creates option set/picklist
"new_CreatedDate": "datetime"
}
table_info = client.create_table(
"new_CustomTable",
primary_column_schema_name="new_Name",
columns=columns
)
print(f"Created table: {table_info['table_schema_name']}")
```
### Get Table Metadata
```python
table_info = client.get_table_info("account")
print(f"Schema Name: {table_info['table_schema_name']}")
print(f"Logical Name: {table_info['table_logical_name']}")
print(f"Entity Set: {table_info['entity_set_name']}")
print(f"Primary ID: {table_info['primary_id_attribute']}")
```
### List All Tables
```python
tables = client.list_tables()
for table in tables:
print(f"{table['table_schema_name']} ({table['table_logical_name']})")
```
### Column Management
```python
# Add columns to existing table
client.create_columns("new_CustomTable", {
"new_Status": "string",
"new_Priority": "int"
})
# Delete columns
client.delete_columns("new_CustomTable", ["new_Status", "new_Priority"])
# Delete table
client.delete_table("new_CustomTable")
```
---
## 8. Paging & Large Result Sets
### Pagination Pattern
```python
# Retrieve all accounts in pages
all_accounts = []
for page in client.get(
"account",
top=500, # Records per page
page_size=500
):
all_accounts.extend(page)
print(f"Retrieved page with {len(page)} records")
print(f"Total: {len(all_accounts)} records")
```
### Manual Page-by-Page Processing
```python
# Process each page explicitly as the generator yields it
page_size = 1000
for page_number, page in enumerate(client.get("account", page_size=page_size), start=1):
    if not page:
        break
    print(f"Page {page_number}: {len(page)} records")
```
---
## 9. File Operations
### Upload Small Files (< 128 MB)
```python
from pathlib import Path
file_path = Path("document.pdf")
record_id = "account-guid"
# Single PATCH upload
response = client.upload_file(
table_name="account",
record_id=record_id,
file_column_name="new_documentfile",
file_path=file_path
)
print(f"Upload successful: {response}")
```
### Upload Large Files with Chunking
```python
from pathlib import Path
file_path = Path("large_video.mp4")
record_id = "account-guid"
# SDK automatically chunks large files
response = client.upload_file(
table_name="account",
record_id=record_id,
file_column_name="new_videofile",
file_path=file_path,
chunk_size=4 * 1024 * 1024 # 4 MB chunks
)
print(f"Chunked upload complete")
```
---
## 10. OData Filter Optimization
### Case Sensitivity Rules
```python
# ❌ WRONG: Uppercase logical names
results = client.get("account", filter="Name eq 'Contoso'")
# ✅ CORRECT: Lowercase logical names
results = client.get("account", filter="name eq 'Contoso'")
# ✅ String values keep their case; match the stored value exactly
results = client.get("account", filter="name eq 'Contoso Ltd'")
```
### Filter Expression Examples
```python
# Equality
client.get("account", filter="name eq 'Contoso'")
# Greater than / Less than
client.get("account", filter="creditlimit gt 50000")
client.get("account", filter="createdon lt 2024-01-01")
# String contains
client.get("account", filter="contains(name, 'Ltd')")
# AND/OR operations
client.get("account", filter="(name eq 'Contoso') and (creditlimit gt 50000)")
client.get("account", filter="(industrycode eq 1) or (industrycode eq 2)")
# NOT operation
client.get("account", filter="not(statecode eq 1)")
```
### Select & Expand
```python
# Select specific columns (improves performance)
client.get("account", select=["name", "creditlimit", "telephone1"])
# Expand related records
client.get(
"account",
expand=["parentaccountid($select=name)"],
select=["name", "parentaccountid"]
)
```
---
## 11. Cache Management
### Flushing Cache
```python
# Clear SDK internal cache after bulk operations
client.flush_cache()
# Useful after:
# - Metadata changes (table/column creation)
# - Bulk deletes
# - Metadata synchronization
```
---
## 12. Performance Best Practices
### Do's ✅
1. **Use `select` parameter**: Only fetch needed columns
```python
client.get("account", select=["name", "creditlimit"])
```
2. **Batch operations**: Create/update multiple records at once
```python
ids = client.create("account", [record1, record2, record3])
```
3. **Use paging**: Don't load all records at once
```python
for page in client.get("account", top=1000):
process_page(page)
```
4. **Reuse client instance**: Create once, use many times
```python
client = DataverseClient(url, credential) # Once
# Reuse throughout app
```
5. **Apply filters on server**: Let Dataverse filter before returning
```python
client.get("account", filter="creditlimit gt 50000")
```
### Don'ts ❌
1. **Don't fetch all columns**: Specify what you need
```python
# Slow
client.get("account")
```
2. **Don't create records in loops**: Batch them
```python
# Slow
for record in records:
client.create("account", record)
```
3. **Don't load all results at once**: Use pagination
```python
# Slow
all_accounts = list(client.get("account"))
```
4. **Don't create new clients repeatedly**: Reuse singleton
```python
# Inefficient
for i in range(100):
client = DataverseClient(url, credential)
```
---
## 13. Common Patterns Summary
### Pattern: Upsert (Create or Update)
```python
def upsert_account(name, data):
"""Create account or update if exists."""
try:
        # Try to find an existing record (flatten paged results)
        pages = client.get("account", filter=f"name eq '{name}'")
        matches = [record for page in pages for record in page]
        if matches:
            account_id = matches[0]['accountid']
            client.update("account", account_id, data)
            return account_id, "updated"
        else:
            ids = client.create("account", {"name": name, **data})
            return ids[0], "created"
except Exception as e:
print(f"Upsert failed: {e}")
raise
```
### Pattern: Bulk Operation with Error Recovery
```python
def create_with_recovery(records):
"""Create records with per-record error tracking."""
results = {"success": [], "failed": []}
try:
ids = client.create("account", records)
results["success"] = ids
except Exception as e:
# If bulk fails, try individual records
for i, record in enumerate(records):
try:
ids = client.create("account", record)
results["success"].append(ids[0])
except Exception as e:
results["failed"].append({"index": i, "record": record, "error": str(e)})
return results
```
---
## 14. Dependencies & Versions
### Core Dependencies
- **azure-identity** >= 1.17.0 (Authentication)
- **azure-core** >= 1.30.2 (HTTP client)
- **requests** >= 2.32.0 (HTTP requests)
- **Python** >= 3.10
### Optional Dependencies
- **pandas** (Data manipulation)
- **reportlab** (PDF generation for file examples)
### Development Tools
- **pytest** >= 7.0.0 (Testing)
- **black** >= 23.0.0 (Code formatting)
- **mypy** >= 1.0.0 (Type checking)
- **ruff** >= 0.1.0 (Linting)
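To confirm the installed versions meet these minimums, a quick check with the standard library works (distribution names as listed above):
```python
from importlib.metadata import version, PackageNotFoundError
# Print installed versions of the core dependencies
for package in ["azure-identity", "azure-core", "requests"]:
    try:
        print(f"{package}: {version(package)}")
    except PackageNotFoundError:
        print(f"{package}: not installed")
```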
---
## 15. Troubleshooting Common Issues
### ImportError: No module named 'PowerPlatform'
```bash
# Verify installation
pip show PowerPlatform-Dataverse-Client
# Reinstall
pip install --upgrade PowerPlatform-Dataverse-Client
# Check virtual environment is activated
which python # Should show venv path
```
### Authentication Failed
```python
# Verify credentials have Dataverse access
# Try interactive auth first for testing
from azure.identity import InteractiveBrowserCredential
credential = InteractiveBrowserCredential(
tenant_id="your-tenant-id" # Specify if multiple tenants
)
# Check org URL format
# ✓ https://yourorg.crm.dynamics.com (no trailing slash)
# ❌ https://yourorg.crm.dynamics.com/ (trailing slash)
# ❌ https://yourorg.crm4.dynamics.com (wrong region; use your org's actual regional domain)
```
### HTTP 429 Rate Limiting
```python
# Reduce request frequency
# Implement exponential backoff (see Error Handling section)
# Reduce page size
client.get("account", top=500) # Instead of 5000
```
### MetadataError: Table Not Found
```python
# Verify table exists (schema name is case-insensitive for existence, but case-sensitive for API)
tables = client.list_tables()
print([t['table_schema_name'] for t in tables])
# Use exact schema name
table_info = client.get_table_info("new_customprefixed_table")
```
### SQL Query Not Enabled
```python
# query_sql() requires org config
# If disabled, fallback to OData
try:
results = client.query_sql("SELECT * FROM account")
except Exception:
# Fallback to OData
results = client.get("account")
```
---
## Reference Links
- [Official Repository](https://github.com/microsoft/PowerPlatform-DataverseClient-Python)
- [PyPI Package](https://pypi.org/project/PowerPlatform-Dataverse-Client/)
- [Azure Identity Documentation](https://learn.microsoft.com/en-us/python/api/overview/azure/identity-readme)
- [Dataverse Web API Documentation](https://learn.microsoft.com/en-us/power-apps/developer/data-platform/webapi/overview)

View File

@@ -0,0 +1,536 @@
---
applyTo: '**'
---
# Dataverse SDK for Python — Error Handling & Troubleshooting Guide
Based on official Microsoft documentation for Azure SDK error handling patterns and Dataverse SDK specifics.
## 1. DataverseError Class Overview
The Dataverse SDK for Python provides a structured exception hierarchy for robust error handling.
### DataverseError Constructor
```python
from PowerPlatform.Dataverse.core.errors import DataverseError
DataverseError(
message: str, # Human-readable error message
code: str, # Error category (e.g., "validation_error", "http_error")
subcode: str | None = None, # Optional specific error identifier
status_code: int | None = None, # HTTP status code (if applicable)
details: Dict[str, Any] | None = None, # Additional diagnostic information
source: str | None = None, # Error source: "client" or "server"
is_transient: bool = False # Whether error may succeed on retry
)
```
### Key Properties
```python
try:
client.get("account", record_id="invalid-id")
except DataverseError as e:
print(f"Message: {e.message}") # Human-readable message
print(f"Code: {e.code}") # Error category
print(f"Subcode: {e.subcode}") # Specific error type
print(f"Status Code: {e.status_code}") # HTTP status (401, 403, 429, etc.)
print(f"Source: {e.source}") # "client" or "server"
print(f"Is Transient: {e.is_transient}") # Can retry?
print(f"Details: {e.details}") # Additional context
# Convert to dictionary for logging
error_dict = e.to_dict()
```
---
## 2. Common Error Scenarios
### Authentication Errors (401)
**Cause**: Invalid credentials, expired tokens, or misconfigured settings.
```python
from PowerPlatform.Dataverse.client import DataverseClient
from PowerPlatform.Dataverse.core.errors import DataverseError
from azure.identity import InteractiveBrowserCredential
try:
# Bad credentials or expired token
credential = InteractiveBrowserCredential()
client = DataverseClient(
base_url="https://invalid-org.crm.dynamics.com",
credential=credential
)
records = client.get("account")
except DataverseError as e:
if e.status_code == 401:
print("Authentication failed. Check credentials and token expiration.")
print(f"Details: {e.message}")
# Don't retry - fix credentials first
else:
raise
```
### Authorization Errors (403)
**Cause**: User lacks permissions for the requested operation.
```python
try:
# User doesn't have permission to read contacts
records = client.get("contact")
except DataverseError as e:
if e.status_code == 403:
print("Access denied. User lacks required permissions.")
print(f"Request ID for support: {e.details.get('request_id')}")
# Escalate to administrator
else:
raise
```
### Resource Not Found (404)
**Cause**: Record, table, or resource doesn't exist.
```python
try:
# Record doesn't exist
record = client.get("account", record_id="00000000-0000-0000-0000-000000000000")
except DataverseError as e:
if e.status_code == 404:
print("Resource not found. Using default data.")
record = {"name": "Unknown", "id": None}
else:
raise
```
### Rate Limiting (429)
**Cause**: Too many requests exceeding service protection limits.
**Note**: The SDK has minimal built-in retry support; handle transient errors such as 429s manually.
```python
import time
def create_with_retry(client, table_name, payload, max_retries=3):
"""Create record with retry logic for rate limiting."""
for attempt in range(max_retries):
try:
result = client.create(table_name, payload)
return result
except DataverseError as e:
if e.status_code == 429 and e.is_transient:
wait_time = 2 ** attempt # Exponential backoff
print(f"Rate limited. Retrying in {wait_time}s...")
time.sleep(wait_time)
else:
raise
raise Exception(f"Failed after {max_retries} retries")
```
### Server Errors (500, 502, 503, 504)
**Cause**: Temporary service issues or infrastructure problems.
```python
try:
result = client.create("account", {"name": "Acme"})
except DataverseError as e:
if 500 <= e.status_code < 600:
print(f"Server error ({e.status_code}). Service may be temporarily unavailable.")
# Implement retry logic with exponential backoff
else:
raise
```
### Validation Errors (400)
**Cause**: Invalid request format, missing required fields, or business rule violations.
```python
try:
# Missing required field or invalid data
client.create("account", {"telephone1": "not-a-phone-number"})
except DataverseError as e:
if e.status_code == 400:
print(f"Validation error: {e.message}")
if e.details:
print(f"Details: {e.details}")
# Log validation issues for debugging
else:
raise
```
---
## 3. Error Handling Best Practices
### Use Specific Exception Handling
Always catch specific exceptions before general ones:
```python
from PowerPlatform.Dataverse.core.errors import DataverseError
from azure.core.exceptions import AzureError
try:
records = client.get("account", filter="statecode eq 0", top=100)
except DataverseError as e:
# Handle Dataverse-specific errors
if e.status_code == 401:
print("Re-authenticate required")
elif e.status_code == 404:
print("Resource not found")
elif e.is_transient:
print("Transient error - may retry")
else:
print(f"Operation failed: {e.message}")
except AzureError as e:
# Handle Azure SDK errors (network, auth, etc.)
print(f"Azure error: {e}")
except Exception as e:
# Catch-all for unexpected errors
print(f"Unexpected error: {e}")
```
### Implement Smart Retry Logic
**Don't retry on**:
- 401 Unauthorized (authentication failures)
- 403 Forbidden (authorization failures)
- 400 Bad Request (client errors)
- 404 Not Found (unless resource should eventually appear)
**Consider retrying on**:
- 408 Request Timeout
- 429 Too Many Requests (with exponential backoff)
- 500 Internal Server Error
- 502 Bad Gateway
- 503 Service Unavailable
- 504 Gateway Timeout
```python
def should_retry(error: DataverseError) -> bool:
"""Determine if operation should be retried."""
if not error.is_transient:
return False
retryable_codes = {408, 429, 500, 502, 503, 504}
return error.status_code in retryable_codes
def call_with_exponential_backoff(func, *args, max_attempts=3, **kwargs):
"""Call function with exponential backoff retry."""
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except DataverseError as e:
if should_retry(e) and attempt < max_attempts - 1:
wait_time = 2 ** attempt # 1s, 2s, 4s...
print(f"Attempt {attempt + 1} failed. Retrying in {wait_time}s...")
time.sleep(wait_time)
else:
raise
```
### Extract Meaningful Error Information
```python
import json
from datetime import datetime
def log_error_for_support(error: DataverseError):
"""Log error with diagnostic information."""
error_info = {
"timestamp": datetime.utcnow().isoformat(),
"error_type": type(error).__name__,
"message": error.message,
"code": error.code,
"subcode": error.subcode,
"status_code": error.status_code,
"source": error.source,
"is_transient": error.is_transient,
"details": error.details
}
print(json.dumps(error_info, indent=2))
# Save to log file or send to monitoring service
return error_info
```
### Handle Bulk Operations Gracefully
```python
def bulk_create_with_error_tracking(client, table_name, payloads):
"""Create multiple records, tracking which succeed/fail."""
results = {
"succeeded": [],
"failed": []
}
for idx, payload in enumerate(payloads):
try:
record_ids = client.create(table_name, payload)
results["succeeded"].append({
"payload": payload,
"ids": record_ids
})
except DataverseError as e:
results["failed"].append({
"index": idx,
"payload": payload,
"error": {
"message": e.message,
"code": e.code,
"status": e.status_code
}
})
return results
```
---
## 4. Enable Diagnostic Logging
### Configure Logging
```python
import logging
import sys
# Set up root logger
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('dataverse_sdk.log'),
logging.StreamHandler(sys.stdout)
]
)
# Configure specific loggers
logging.getLogger('azure').setLevel(logging.DEBUG)
logging.getLogger('PowerPlatform').setLevel(logging.DEBUG)
# HTTP logging (careful with sensitive data)
logging.getLogger('azure.core.pipeline.policies.http_logging_policy').setLevel(logging.DEBUG)
```
### Enable SDK-Level Logging
```python
from PowerPlatform.Dataverse.client import DataverseClient
from PowerPlatform.Dataverse.core.config import DataverseConfig
from azure.identity import InteractiveBrowserCredential
cfg = DataverseConfig()
cfg.logging_enable = True # Enable detailed logging
client = DataverseClient(
base_url="https://myorg.crm.dynamics.com",
credential=InteractiveBrowserCredential(),
config=cfg
)
# Now SDK will log detailed HTTP requests/responses
records = client.get("account", top=10)
```
### Parse Error Responses
```python
import json
try:
client.create("account", invalid_payload)
except DataverseError as e:
# Extract structured error details
if e.details and isinstance(e.details, dict):
error_code = e.details.get('error', {}).get('code')
error_message = e.details.get('error', {}).get('message')
print(f"Error Code: {error_code}")
print(f"Error Message: {error_message}")
# Some errors include nested details
if 'error' in e.details and 'details' in e.details['error']:
for detail in e.details['error']['details']:
print(f" - {detail.get('code')}: {detail.get('message')}")
```
---
## 5. Dataverse-Specific Error Handling
### Handle OData Query Errors
```python
try:
# Invalid OData filter
records = client.get(
"account",
filter="invalid_column eq 0"
)
except DataverseError as e:
if "invalid column" in e.message.lower():
print("Check OData column names and syntax")
else:
print(f"Query error: {e.message}")
```
### Handle File Upload Errors
```python
try:
client.upload_file(
table_name="account",
record_id=record_id,
column_name="document_column",
file_path="large_file.pdf"
)
except DataverseError as e:
if e.status_code == 413:
print("File too large. Use chunked upload mode.")
elif e.status_code == 400:
print("Invalid column or file format.")
else:
raise
```
### Handle Table Metadata Operations
```python
try:
    # Create a custom table using the SDK's metadata helper
    client.create_table(
        "new_CustomTable",
        primary_column_schema_name="new_Name",
        columns={"new_Description": "string"}
    )
except DataverseError as e:
if "already exists" in e.message:
print("Table already exists")
elif "permission" in e.message.lower():
print("Insufficient permissions to create tables")
else:
raise
```
---
## 6. Monitoring and Alerting
### Wrap Client Calls with Monitoring
```python
from functools import wraps
import time
def monitor_operation(operation_name):
"""Decorator to monitor SDK operations."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
duration = time.time() - start_time
print(f"{operation_name} completed in {duration:.2f}s")
return result
except DataverseError as e:
duration = time.time() - start_time
print(f"{operation_name} failed after {duration:.2f}s")
print(f" Error: {e.code} ({e.status_code}): {e.message}")
raise
return wrapper
return decorator
@monitor_operation("Fetch Accounts")
def get_accounts(client):
return client.get("account", top=100)
# Usage
try:
accounts = get_accounts(client)
except DataverseError:
print("Operation failed - check logs for details")
```
---
## 7. Common Troubleshooting Checklist
| Issue | Diagnosis | Solution |
|-------|-----------|----------|
| 401 Unauthorized | Expired token or bad credentials | Re-authenticate with valid credentials |
| 403 Forbidden | User lacks permissions | Request access from administrator |
| 404 Not Found | Record/table doesn't exist | Verify schema name and record ID |
| 429 Rate Limited | Too many requests | Implement exponential backoff retry |
| 500+ Server Error | Service issue | Retry with exponential backoff; check status page |
| 400 Bad Request | Invalid request format | Check OData syntax, field names, required fields |
| Network timeout | Connection issues | Check network, increase timeout in DataverseConfig |
| InvalidOperationException | Plugin/workflow error | Check plugin logs in Dataverse |
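The checklist can be folded into a small dispatcher, for example (a sketch; adapt the suggested actions to your application):
```python
from PowerPlatform.Dataverse.core.errors import DataverseError
def recommended_action(error: DataverseError) -> str:
    """Map a DataverseError to the action suggested in the checklist above."""
    if error.status_code == 401:
        return "Re-authenticate with valid credentials"
    if error.status_code == 403:
        return "Request access from an administrator"
    if error.status_code == 404:
        return "Verify the schema name and record ID"
    if error.status_code == 429 or (error.status_code or 0) >= 500:
        return "Retry with exponential backoff"
    if error.status_code == 400:
        return "Check OData syntax, field names, and required fields"
    return "Inspect diagnostic logs"
```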
---
## 8. Logging Best Practices
```python
import logging
import json
from datetime import datetime
class DataverseErrorHandler:
"""Centralized error handling and logging."""
def __init__(self, log_file="dataverse_errors.log"):
self.logger = logging.getLogger("DataverseSDK")
handler = logging.FileHandler(log_file)
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.ERROR)
def log_error(self, error: DataverseError, context: str = ""):
"""Log error with context for debugging."""
error_record = {
"timestamp": datetime.utcnow().isoformat(),
"context": context,
"error": error.to_dict()
}
self.logger.error(json.dumps(error_record, indent=2))
def is_retryable(self, error: DataverseError) -> bool:
"""Check if error should be retried."""
return error.is_transient and error.status_code in {408, 429, 500, 502, 503, 504}
# Usage
error_handler = DataverseErrorHandler()
try:
client.create("account", payload)
except DataverseError as e:
error_handler.log_error(e, "create_account_batch_1")
if error_handler.is_retryable(e):
print("Will retry this operation")
else:
print("Operation failed permanently")
```
---
## 9. See Also
- [DataverseError API Reference](https://learn.microsoft.com/en-us/python/api/powerplatform-dataverse-client/powerplatform.dataverse.core.errors.dataverseerror)
- [Azure SDK Error Handling](https://learn.microsoft.com/en-us/azure/developer/python/sdk/fundamentals/errors)
- [Dataverse SDK Getting Started](https://learn.microsoft.com/en-us/power-apps/developer/data-platform/sdk-python/get-started)
- [Service Protection API Limits](https://learn.microsoft.com/en-us/power-apps/developer/data-platform/optimize-performance-create-update)

View File

@@ -0,0 +1,649 @@
# Dataverse SDK for Python - File Operations & Practical Examples
## Overview
Complete guide to file upload operations, chunking strategies, and practical real-world examples using the PowerPlatform-DataverseClient-Python SDK.
---
## 1. File Upload Fundamentals
### Small File Upload (< 128 MB)
```python
from pathlib import Path
from PowerPlatform.Dataverse.client import DataverseClient
file_path = Path("document.pdf")
record_id = "account-guid"
# Single PATCH upload for small files
response = client.upload_file(
table_name="account",
record_id=record_id,
file_column_name="new_documentfile",
file_path=file_path
)
print(f"Upload successful: {response}")
```
**When to use:** Documents, images, PDFs under 128 MB
### Large File Upload with Chunking
```python
from pathlib import Path
file_path = Path("large_video.mp4")
record_id = "account-guid"
# SDK automatically handles chunking for large files
response = client.upload_file(
table_name="account",
record_id=record_id,
file_column_name="new_videofile",
file_path=file_path,
chunk_size=4 * 1024 * 1024 # 4 MB chunks
)
print("Chunked upload complete")
```
**When to use:** Large videos, databases, archives > 128 MB
### Upload with Progress Tracking
```python
import hashlib
from pathlib import Path
def calculate_file_hash(file_path):
"""Calculate SHA-256 hash of file."""
hash_obj = hashlib.sha256()
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(1024*1024), b''):
hash_obj.update(chunk)
return hash_obj.hexdigest()
def upload_with_tracking(client, table_name, record_id, column_name, file_path):
"""Upload file with validation tracking."""
file_path = Path(file_path)
file_size = file_path.stat().st_size
print(f"Starting upload: {file_path.name} ({file_size / 1024 / 1024:.2f} MB)")
# Calculate hash before upload
original_hash = calculate_file_hash(file_path)
print(f"File hash: {original_hash}")
# Perform upload
response = client.upload_file(
table_name=table_name,
record_id=record_id,
file_column_name=column_name,
file_path=file_path
)
print(f"✓ Upload complete")
return response
# Usage
upload_with_tracking(client, "account", account_id, "new_documentfile", "report.pdf")
```
---
## 2. Upload Strategies & Configuration
### Automatic Chunking Decision
```python
def upload_file_smart(client, table_name, record_id, column_name, file_path):
"""Upload with automatic strategy selection."""
file_path = Path(file_path)
file_size = file_path.stat().st_size
max_single_patch = 128 * 1024 * 1024 # 128 MB
if file_size <= max_single_patch:
print(f"Using single PATCH (file < 128 MB)")
chunk_size = None # SDK will use single request
else:
print(f"Using chunked upload (file > 128 MB)")
chunk_size = 4 * 1024 * 1024 # 4 MB chunks
response = client.upload_file(
table_name=table_name,
record_id=record_id,
file_column_name=column_name,
file_path=file_path,
chunk_size=chunk_size
)
return response
# Usage
upload_file_smart(client, "account", account_id, "new_largemedifile", "video.mp4")
```
### Batch File Uploads
```python
from pathlib import Path
from PowerPlatform.Dataverse.core.errors import HttpError
def batch_upload_files(client, table_name, record_id, files_dict):
"""
Upload multiple files to different columns of same record.
Args:
table_name: Table name
record_id: Record ID
files_dict: {"column_name": "file_path", ...}
Returns:
{"success": [...], "failed": [...]}
"""
results = {"success": [], "failed": []}
for column_name, file_path in files_dict.items():
try:
print(f"Uploading {Path(file_path).name} to {column_name}...")
response = client.upload_file(
table_name=table_name,
record_id=record_id,
file_column_name=column_name,
file_path=file_path
)
results["success"].append({
"column": column_name,
"file": Path(file_path).name,
"response": response
})
print(f" ✓ Uploaded successfully")
except HttpError as e:
results["failed"].append({
"column": column_name,
"file": Path(file_path).name,
"error": str(e)
})
print(f" ❌ Upload failed: {e}")
return results
# Usage
files = {
"new_contractfile": "contract.pdf",
"new_specfile": "specification.docx",
"new_designfile": "design.png"
}
results = batch_upload_files(client, "account", account_id, files)
print(f"Success: {len(results['success'])}, Failed: {len(results['failed'])}")
```
### Resume Failed Uploads
```python
from pathlib import Path
import time
from PowerPlatform.Dataverse.core.errors import HttpError
def upload_with_retry(client, table_name, record_id, column_name, file_path, max_retries=3):
"""Upload with exponential backoff retry logic."""
file_path = Path(file_path)
for attempt in range(max_retries):
try:
print(f"Upload attempt {attempt + 1}/{max_retries}: {file_path.name}")
response = client.upload_file(
table_name=table_name,
record_id=record_id,
file_column_name=column_name,
file_path=file_path,
chunk_size=4 * 1024 * 1024
)
print(f"✓ Upload successful")
return response
except HttpError as e:
if attempt == max_retries - 1:
print(f"❌ Upload failed after {max_retries} attempts")
raise
# Exponential backoff: 1s, 2s, 4s
backoff_seconds = 2 ** attempt
print(f"⚠ Upload failed. Retrying in {backoff_seconds}s...")
time.sleep(backoff_seconds)
# Usage
upload_with_retry(client, "account", account_id, "new_documentfile", "contract.pdf")
```
---
## 3. Real-World Examples
### Example 1: Customer Document Management System
```python
from pathlib import Path
from datetime import datetime
from enum import IntEnum
from PowerPlatform.Dataverse.client import DataverseClient
from azure.identity import ClientSecretCredential
class DocumentType(IntEnum):
CONTRACT = 1
INVOICE = 2
SPECIFICATION = 3
OTHER = 4
# Setup
credential = ClientSecretCredential(
tenant_id="tenant-id",
client_id="client-id",
client_secret="client-secret"
)
client = DataverseClient("https://yourorg.crm.dynamics.com", credential)
def upload_customer_document(customer_id, doc_path, doc_type):
"""Upload document for customer."""
doc_path = Path(doc_path)
# Create document record
doc_record = {
"new_documentname": doc_path.stem,
"new_documenttype": doc_type,
"new_customerid": customer_id,
"new_uploadeddate": datetime.now().isoformat(),
"new_filesize": doc_path.stat().st_size
}
doc_ids = client.create("new_customerdocument", doc_record)
doc_id = doc_ids[0]
# Upload file
print(f"Uploading {doc_path.name}...")
client.upload_file(
table_name="new_customerdocument",
record_id=doc_id,
file_column_name="new_documentfile",
file_path=doc_path
)
print(f"✓ Document uploaded and linked to customer")
return doc_id
# Usage
customer_id = "customer-guid-here"
doc_id = upload_customer_document(
customer_id,
"contract.pdf",
DocumentType.CONTRACT
)
# Query uploaded documents
docs = client.get(
"new_customerdocument",
filter=f"new_customerid eq '{customer_id}'",
select=["new_documentname", "new_documenttype", "new_uploadeddate"]
)
for page in docs:
for doc in page:
print(f"- {doc['new_documentname']} ({doc['new_uploadeddate']})")
```
### Example 2: Media Gallery with Thumbnails
```python
from pathlib import Path
from datetime import datetime
from enum import IntEnum
from PowerPlatform.Dataverse.client import DataverseClient
class MediaType(IntEnum):
PHOTO = 1
VIDEO = 2
DOCUMENT = 3
def create_media_gallery(client, gallery_name, media_files):
"""
Create media gallery with multiple files.
Args:
gallery_name: Gallery name
media_files: [{"file": path, "type": MediaType, "description": text}, ...]
"""
# Create gallery record
gallery_ids = client.create("new_mediagallery", {
"new_galleryname": gallery_name,
"new_createddate": datetime.now().isoformat()
})
gallery_id = gallery_ids[0]
# Create and upload media items
for media_info in media_files:
file_path = Path(media_info["file"])
# Create media item record
item_ids = client.create("new_mediaitem", {
"new_itemname": file_path.stem,
"new_mediatype": media_info["type"],
"new_description": media_info.get("description", ""),
"new_galleryid": gallery_id,
"new_filesize": file_path.stat().st_size
})
item_id = item_ids[0]
# Upload media file
print(f"Uploading {file_path.name}...")
client.upload_file(
table_name="new_mediaitem",
record_id=item_id,
file_column_name="new_mediafile",
file_path=file_path
)
print(f"{file_path.name}")
return gallery_id
# Usage
media_files = [
{"file": "photo1.jpg", "type": MediaType.PHOTO, "description": "Product shot 1"},
{"file": "photo2.jpg", "type": MediaType.PHOTO, "description": "Product shot 2"},
{"file": "demo.mp4", "type": MediaType.VIDEO, "description": "Product demo video"},
{"file": "manual.pdf", "type": MediaType.DOCUMENT, "description": "User manual"}
]
gallery_id = create_media_gallery(client, "Q4 Product Launch", media_files)
print(f"Created gallery: {gallery_id}")
```
### Example 3: Backup & Archival System
```python
from pathlib import Path
from datetime import datetime, timedelta
from PowerPlatform.Dataverse.client import DataverseClient
from PowerPlatform.Dataverse.core.errors import DataverseError
import json
def backup_table_data(client, table_name, output_dir):
"""
Backup table data to JSON files and create archive record.
"""
output_dir = Path(output_dir)
output_dir.mkdir(exist_ok=True)
backup_time = datetime.now()
backup_file = output_dir / f"{table_name}_{backup_time.strftime('%Y%m%d_%H%M%S')}.json"
print(f"Backing up {table_name}...")
# Retrieve all records
all_records = []
for page in client.get(table_name, top=5000):
all_records.extend(page)
# Write to JSON
with open(backup_file, 'w') as f:
json.dump(all_records, f, indent=2, default=str)
print(f" ✓ Exported {len(all_records)} records")
# Create backup record in Dataverse
backup_ids = client.create("new_backuprecord", {
"new_tablename": table_name,
"new_recordcount": len(all_records),
"new_backupdate": backup_time.isoformat(),
"new_status": 1 # Completed
})
backup_id = backup_ids[0]
# Upload backup file
print(f"Uploading backup file...")
client.upload_file(
table_name="new_backuprecord",
record_id=backup_id,
file_column_name="new_backupfile",
file_path=backup_file
)
return backup_id
# Usage
backup_id = backup_table_data(client, "account", "backups")
print(f"Backup created: {backup_id}")
```
### Example 4: Automated Report Generation & Storage
```python
from pathlib import Path
from datetime import datetime
from enum import IntEnum
from PowerPlatform.Dataverse.client import DataverseClient
import json
class ReportStatus(IntEnum):
PENDING = 1
PROCESSING = 2
COMPLETED = 3
FAILED = 4
def generate_and_store_report(client, report_type, data):
"""
Generate report from data and store in Dataverse.
"""
report_time = datetime.now()
# Generate report file (simulated)
report_file = Path(f"report_{report_type}_{report_time.strftime('%Y%m%d_%H%M%S')}.json")
with open(report_file, 'w') as f:
json.dump(data, f, indent=2)
# Create report record
report_ids = client.create("new_report", {
"new_reportname": f"{report_type} Report",
"new_reporttype": report_type,
"new_generateddate": report_time.isoformat(),
"new_status": ReportStatus.PROCESSING,
"new_recordcount": len(data.get("records", []))
})
report_id = report_ids[0]
try:
# Upload report file
print(f"Uploading report: {report_file.name}")
client.upload_file(
table_name="new_report",
record_id=report_id,
file_column_name="new_reportfile",
file_path=report_file
)
# Update status to completed
client.update("new_report", report_id, {
"new_status": ReportStatus.COMPLETED
})
print(f"✓ Report stored successfully")
return report_id
except Exception as e:
print(f"❌ Report generation failed: {e}")
client.update("new_report", report_id, {
"new_status": ReportStatus.FAILED,
"new_errormessage": str(e)
})
raise
finally:
# Clean up temp file
report_file.unlink(missing_ok=True)
# Usage
sales_data = {
"month": "January",
"records": [
{"product": "A", "sales": 10000},
{"product": "B", "sales": 15000},
{"product": "C", "sales": 8000}
]
}
report_id = generate_and_store_report(client, "SALES_SUMMARY", sales_data)
```
---
## 4. File Management Best Practices
### File Size Validation
```python
from pathlib import Path
def validate_file_for_upload(file_path, max_size_mb=500):
"""Validate file before upload."""
file_path = Path(file_path)
if not file_path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
file_size = file_path.stat().st_size
max_size_bytes = max_size_mb * 1024 * 1024
if file_size > max_size_bytes:
raise ValueError(f"File too large: {file_size / 1024 / 1024:.2f} MB > {max_size_mb} MB")
return file_size
# Usage
try:
size = validate_file_for_upload("document.pdf", max_size_mb=128)
print(f"File valid: {size / 1024 / 1024:.2f} MB")
except (FileNotFoundError, ValueError) as e:
print(f"Validation failed: {e}")
```
### Supported File Types Validation
```python
from pathlib import Path
ALLOWED_EXTENSIONS = {'.pdf', '.docx', '.xlsx', '.jpg', '.png', '.mp4', '.zip'}
def validate_file_type(file_path):
"""Validate file extension."""
file_path = Path(file_path)
if file_path.suffix.lower() not in ALLOWED_EXTENSIONS:
raise ValueError(f"Unsupported file type: {file_path.suffix}")
return True
# Usage
try:
validate_file_type("document.pdf")
print("File type valid")
except ValueError as e:
print(f"Invalid: {e}")
```
### Upload Logging & Audit Trail
```python
from pathlib import Path
from datetime import datetime
import json
def log_file_upload(table_name, record_id, file_path, status, error=None):
"""Log file upload for audit trail."""
file_path = Path(file_path)
log_entry = {
"timestamp": datetime.now().isoformat(),
"table": table_name,
"record_id": record_id,
"file_name": file_path.name,
"file_size": file_path.stat().st_size if file_path.exists() else 0,
"status": status,
"error": error
}
# Append to log file
log_file = Path("upload_audit.log")
with open(log_file, 'a') as f:
f.write(json.dumps(log_entry) + "\n")
return log_entry
# Usage in upload wrapper
def upload_with_logging(client, table_name, record_id, column_name, file_path):
"""Upload with audit logging."""
try:
client.upload_file(
table_name=table_name,
record_id=record_id,
file_column_name=column_name,
file_path=file_path
)
log_file_upload(table_name, record_id, file_path, "SUCCESS")
except Exception as e:
log_file_upload(table_name, record_id, file_path, "FAILED", str(e))
raise
```
---
## 5. Troubleshooting File Operations
### Common Issues & Solutions
#### Issue: File Upload Timeout
```python
# For very large files, increase chunk size strategically
response = client.upload_file(
table_name="account",
record_id=record_id,
file_column_name="new_file",
file_path="large_file.zip",
chunk_size=8 * 1024 * 1024 # 8 MB chunks
)
```
#### Issue: Insufficient Disk Space
```python
import shutil
from pathlib import Path
def check_upload_space(file_path):
"""Check if system has space for file + temp buffer."""
file_path = Path(file_path)
file_size = file_path.stat().st_size
# Get disk space
total, used, free = shutil.disk_usage(file_path.parent)
# Need file_size + 10% buffer
required_space = file_size * 1.1
if free < required_space:
raise OSError(f"Insufficient disk space: {free / 1024 / 1024:.0f} MB free, {required_space / 1024 / 1024:.0f} MB needed")
return True
```
#### Issue: File Corruption During Upload
```python
import hashlib
def verify_uploaded_file(local_path, remote_data):
"""Verify uploaded file integrity."""
# Calculate local hash
with open(local_path, 'rb') as f:
local_hash = hashlib.sha256(f.read()).hexdigest()
# Compare with metadata
remote_hash = remote_data.get("new_filehash")
if local_hash != remote_hash:
raise ValueError("File corruption detected: hash mismatch")
return True
```
---
## Reference
- [Official File Upload Example](https://github.com/microsoft/PowerPlatform-DataverseClient-Python/blob/main/examples/advanced/file_upload.py)
- [File Upload Best Practices](https://learn.microsoft.com/en-us/power-apps/developer/data-platform/file-column-data)

View File

@@ -0,0 +1,230 @@
---
applyTo: '**'
---
# Dataverse SDK for Python — Complete Module Reference
## Package Hierarchy
```
PowerPlatform.Dataverse
├── client
│ └── DataverseClient
├── core
│ ├── config (DataverseConfig)
│ └── errors (DataverseError, ValidationError, MetadataError, HttpError, SQLParseError)
├── data (OData operations, metadata, SQL, file upload)
├── extensions (placeholder for future extensions)
├── models (placeholder for data models and types)
└── utils (placeholder for utilities and adapters)
```
## core.config Module
Manage client connection and behavior settings.
### DataverseConfig Class
Container for language, timeouts, retries. Immutable.
```python
from PowerPlatform.Dataverse.core.config import DataverseConfig
cfg = DataverseConfig(
language_code=1033, # Default English (US)
http_retries=None, # Reserved for future
http_backoff=None, # Reserved for future
http_timeout=None # Reserved for future
)
# Or use default static builder
cfg_default = DataverseConfig.from_env()
```
**Key attributes:**
- `language_code: int = 1033` — LCID for localized labels and messages.
- `http_retries: int | None` — (Reserved) Maximum retry attempts for transient errors.
- `http_backoff: float | None` — (Reserved) Backoff multiplier between retries.
- `http_timeout: float | None` — (Reserved) Request timeout in seconds.
## core.errors Module
Structured exception hierarchy for SDK operations.
### DataverseError (Base)
Base exception for SDK errors.
```python
from PowerPlatform.Dataverse.core.errors import DataverseError
try:
# SDK call
pass
except DataverseError as e:
print(f"Code: {e.code}") # Error category
print(f"Subcode: {e.subcode}") # Specific error
print(f"Message: {e.message}") # Human-readable
print(f"Status: {e.status_code}") # HTTP status (if applicable)
print(f"Transient: {e.is_transient}") # Retry-worthy?
details = e.to_dict() # Convert to dict
```
### ValidationError
Validation failures during data operations.
```python
from PowerPlatform.Dataverse.core.errors import ValidationError
```
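A minimal usage sketch (assuming an invalid payload raises `ValidationError`, as in the CRUD examples elsewhere in this collection):
```python
try:
    client.create("account", {"name": None})  # Required value missing
except ValidationError as e:
    print(f"Validation failed: {e.message}")
```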
### MetadataError
Table/column creation, deletion, or inspection failures.
```python
from PowerPlatform.Dataverse.core.errors import MetadataError
try:
client.create_table("MyTable", {...})
except MetadataError as e:
print(f"Metadata issue: {e.message}")
```
### HttpError
Web API HTTP request failures (4xx, 5xx, etc.).
```python
from PowerPlatform.Dataverse.core.errors import HttpError
try:
client.get("account", record_id)
except HttpError as e:
print(f"HTTP {e.status_code}: {e.message}")
print(f"Service error code: {e.service_error_code}")
print(f"Correlation ID: {e.correlation_id}")
print(f"Request ID: {e.request_id}")
print(f"Retry-After: {e.retry_after} seconds")
print(f"Transient (retry?): {e.is_transient}") # 429, 503, 504
```
### SQLParseError
SQL query syntax errors when using `query_sql()`.
```python
from PowerPlatform.Dataverse.core.errors import SQLParseError
try:
client.query_sql("INVALID SQL HERE")
except SQLParseError as e:
print(f"SQL parse error: {e.message}")
```
## data Package
Low-level OData protocol, metadata, SQL, and file operations (internal delegation).
The `data` package is primarily internal; the high-level `DataverseClient` in the `client` module wraps and exposes:
- CRUD operations via OData
- Metadata management (create/update/delete tables and columns)
- SQL query execution
- File upload handling
Users interact with these via `DataverseClient` methods (e.g., `create()`, `get()`, `update()`, `delete()`, `create_table()`, `query_sql()`, `upload_file()`).
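For illustration, one call per area through the high-level client (a sketch; the file column name is hypothetical):
```python
# CRUD via OData
ids = client.create("account", {"name": "Contoso"})
# Metadata management
info = client.get_table_info("account")
# Read-only SQL
rows = client.query_sql("SELECT TOP 5 name FROM account")
# File upload (positional: table, record ID, file column, local path)
client.upload_file("account", ids[0], "new_documentfile", "report.pdf")
```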
## extensions Package (Placeholder)
Reserved for future extension points (e.g., custom adapters, middleware).
Currently empty; use core and client modules for current functionality.
## models Package (Placeholder)
Reserved for future data model definitions and type definitions.
Currently empty. Data structures return as `dict` (OData) and are JSON-serializable.
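For instance, a retrieved record can be serialized directly (a sketch; the record ID is a placeholder):
```python
import json
record = client.get("account", "record-guid")  # plain dict
print(json.dumps(record, indent=2, default=str))
```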
## utils Package (Placeholder)
Reserved for utility adapters and helpers.
Currently empty. Helper functions may be added in future releases.
## client Module
Main user-facing API.
### DataverseClient Class
High-level client for all Dataverse operations.
```python
from azure.identity import InteractiveBrowserCredential
from PowerPlatform.Dataverse.client import DataverseClient
from PowerPlatform.Dataverse.core.config import DataverseConfig
# Create credential
credential = InteractiveBrowserCredential()
# Optionally configure
cfg = DataverseConfig(language_code=1033)
# Create client
client = DataverseClient(
base_url="https://org.crm.dynamics.com",
credential=credential,
config=cfg # optional
)
```
#### CRUD Methods
- `create(table_schema_name, records)` → `list[str]` — Create records, return GUIDs.
- `get(table_schema_name, record_id=None, select, filter, orderby, top, expand, page_size)` → Record(s).
- `update(table_schema_name, ids, changes)` → `None` — Update records.
- `delete(table_schema_name, ids, use_bulk_delete=True)` → `str | None` — Delete records.
#### Metadata Methods
- `create_table(table_schema_name, columns, solution_unique_name, primary_column_schema_name)` → Metadata dict.
- `create_columns(table_schema_name, columns)` → `list[str]`.
- `delete_columns(table_schema_name, columns)` → `list[str]`.
- `delete_table(table_schema_name)` → `None`.
- `get_table_info(table_schema_name)` → Metadata dict or `None`.
- `list_tables()` → `list[str]`.
#### SQL & Utilities
- `query_sql(sql)` → `list[dict]` — Execute read-only SQL.
- `upload_file(table_schema_name, record_id, file_name_attribute, path, mode, mime_type, if_none_match)` → `None` — Upload to file column.
- `flush_cache(kind)` → `int` — Clear SDK caches (e.g., `"picklist"`).
## Imports Summary
```python
# Main client
from PowerPlatform.Dataverse.client import DataverseClient
# Configuration
from PowerPlatform.Dataverse.core.config import DataverseConfig
# Errors
from PowerPlatform.Dataverse.core.errors import (
DataverseError,
ValidationError,
MetadataError,
HttpError,
SQLParseError,
)
```
## References
- Module docs: https://learn.microsoft.com/en-us/python/api/powerplatform-dataverse-client/
- Core: https://learn.microsoft.com/en-us/python/api/powerplatform-dataverse-client/powerplatform.dataverse.core
- Data: https://learn.microsoft.com/en-us/python/api/powerplatform-dataverse-client/powerplatform.dataverse.data
- Extensions: https://learn.microsoft.com/en-us/python/api/powerplatform-dataverse-client/powerplatform.dataverse.extensions
- Models: https://learn.microsoft.com/en-us/python/api/powerplatform-dataverse-client/powerplatform.dataverse.models
- Utils: https://learn.microsoft.com/en-us/python/api/powerplatform-dataverse-client/powerplatform.dataverse.utils
- Client: https://learn.microsoft.com/en-us/python/api/powerplatform-dataverse-client/powerplatform.dataverse.client

View File

@@ -0,0 +1,547 @@
# Dataverse SDK for Python - Pandas Integration Guide
## Overview
Guide to integrating the Dataverse SDK for Python with pandas DataFrames for data science and analysis workflows. The SDK's JSON response format maps seamlessly to pandas DataFrames, enabling data scientists to work with Dataverse data using familiar data manipulation tools.
---
## 1. Introduction to PandasODataClient
### What is PandasODataClient?
`PandasODataClient` is a thin wrapper around the standard `DataverseClient` that returns data in pandas DataFrame format instead of raw JSON dictionaries. This makes it ideal for:
- Data scientists working with tabular data
- Analytics and reporting workflows
- Data exploration and cleaning
- Integration with machine learning pipelines
### Installation Requirements
```bash
# Install core dependencies
pip install PowerPlatform-Dataverse-Client
pip install azure-identity
# Install pandas for data manipulation
pip install pandas
```
### When to Use PandasODataClient
**Use when you need:**
- Data exploration and analysis
- Working with tabular data
- Integration with statistical/ML libraries
- Efficient data manipulation
**Use DataverseClient instead when you need:**
- Real-time CRUD operations only
- File upload operations
- Metadata operations
- Single record operations
---
## 2. Basic DataFrame Workflow
### Converting Query Results to DataFrame
```python
from azure.identity import InteractiveBrowserCredential
from PowerPlatform.Dataverse.client import DataverseClient
import pandas as pd
# Setup authentication
base_url = "https://<myorg>.crm.dynamics.com"
credential = InteractiveBrowserCredential()
client = DataverseClient(base_url=base_url, credential=credential)
# Query data
pages = client.get(
"account",
select=["accountid", "name", "creditlimit", "telephone1"],
filter="statecode eq 0",
orderby=["name"]
)
# Collect all pages into one DataFrame
all_records = []
for page in pages:
all_records.extend(page)
# Convert to DataFrame
df = pd.DataFrame(all_records)
# Display first few rows
print(df.head())
print(f"Total records: {len(df)}")
```
### Query Parameters Map to DataFrame
```python
# All selected columns appear as columns in the resulting DataFrame
pages = client.get(
    "account",
    select=["accountid", "name", "creditlimit", "telephone1", "createdon"],
    filter="creditlimit gt 50000",
    orderby=["creditlimit desc"]
)
df = pd.DataFrame([record for page in pages for record in page])
# Result is a DataFrame with columns:
# accountid | name | creditlimit | telephone1 | createdon
```
---
## 3. Data Exploration with Pandas
### Basic Exploration
```python
import pandas as pd
from azure.identity import InteractiveBrowserCredential
from PowerPlatform.Dataverse.client import DataverseClient
client = DataverseClient("https://<myorg>.crm.dynamics.com", InteractiveBrowserCredential())
# Load account data
records = []
for page in client.get("account", select=["accountid", "name", "creditlimit", "industrycode"]):
records.extend(page)
df = pd.DataFrame(records)
# Explore the data
print(df.shape) # (1000, 4)
print(df.dtypes) # Data types
print(df.describe()) # Statistical summary
print(df.info()) # Column info and null counts
print(df.head(10)) # First 10 rows
```
### Filtering and Selecting
```python
# Filter rows by condition
high_value = df[df['creditlimit'] > 100000]
# Select specific columns
names_limits = df[['name', 'creditlimit']]
# Multiple conditions
filtered = df[(df['creditlimit'] > 50000) & (df['industrycode'] == 1)]
# Value counts
print(df['industrycode'].value_counts())
```
### Sorting and Grouping
```python
# Sort by column
sorted_df = df.sort_values('creditlimit', ascending=False)
# Group by and aggregate
by_industry = df.groupby('industrycode').agg({
'creditlimit': ['mean', 'sum', 'count'],
'name': 'count'
})
# Group statistics
print(df.groupby('industrycode')['creditlimit'].describe())
```
### Data Cleaning
```python
# Handle missing values
df_clean = df.dropna() # Remove rows with NaN
df_filled = df.fillna(0) # Fill NaN with 0
df_ffill = df.ffill()  # Forward fill
# Check for duplicates
duplicates = df[df.duplicated(['name'])]
df_unique = df.drop_duplicates()
# Data type conversion
df['creditlimit'] = pd.to_numeric(df['creditlimit'])
df['createdon'] = pd.to_datetime(df['createdon'])
```
---
## 4. Data Analysis Patterns
### Aggregation and Summarization
```python
# Create summary report
summary = df.groupby('industrycode').agg({
'accountid': 'count',
'creditlimit': ['mean', 'min', 'max', 'sum'],
'name': lambda x: ', '.join(x.head(3)) # Sample names
}).round(2)
print(summary)
```
### Time-Series Analysis
```python
# Convert to datetime
df['createdon'] = pd.to_datetime(df['createdon'])
# Resample to monthly
monthly = df.set_index('createdon').resample('M').size()
# Extract date components
df['year'] = df['createdon'].dt.year
df['month'] = df['createdon'].dt.month
df['day_of_week'] = df['createdon'].dt.day_name()
```
### Join and Merge Operations
```python
# Load two related tables (flatten pages into one DataFrame each)
accounts = pd.DataFrame(
    [r for page in client.get("account", select=["accountid", "name"]) for r in page]
)
contacts = pd.DataFrame(
    [r for page in client.get("contact", select=["contactid", "parentcustomerid", "fullname"]) for r in page]
)
# Merge on relationship
merged = accounts.merge(
contacts,
left_on='accountid',
right_on='parentcustomerid',
how='left'
)
print(merged.head())
```
### Statistical Analysis
```python
# Correlation matrix
correlation = df[['creditlimit', 'industrycode']].corr()
# Distribution analysis
print(df['creditlimit'].describe())
print(df['creditlimit'].skew())
print(df['creditlimit'].kurtosis())
# Percentiles
print(df['creditlimit'].quantile([0.25, 0.5, 0.75]))
```
---
## 5. Pivot Tables and Reports
### Creating Pivot Tables
```python
# Pivot table by industry and status
pivot = pd.pivot_table(
df,
values='creditlimit',
index='industrycode',
columns='statecode',
aggfunc=['sum', 'mean', 'count']
)
print(pivot)
```
### Generating Reports
```python
# Sales report by industry
industry_report = df.groupby('industrycode').agg({
'accountid': 'count',
'creditlimit': 'sum',
'name': 'first'
}).rename(columns={
'accountid': 'Account Count',
'creditlimit': 'Total Credit Limit',
'name': 'Sample Account'
})
# Export to CSV
industry_report.to_csv('industry_report.csv')
# Export to Excel
industry_report.to_excel('industry_report.xlsx')
```
---
## 6. Data Visualization
### Matplotlib Integration
```python
import matplotlib.pyplot as plt
# Create visualizations
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
# Histogram
df['creditlimit'].hist(bins=30, ax=axes[0, 0])
axes[0, 0].set_title('Credit Limit Distribution')
# Bar chart
df['industrycode'].value_counts().plot(kind='bar', ax=axes[0, 1])
axes[0, 1].set_title('Accounts by Industry')
# Box plot
df.boxplot(column='creditlimit', by='industrycode', ax=axes[1, 0])
axes[1, 0].set_title('Credit Limit by Industry')
# Scatter plot
df.plot.scatter(x='creditlimit', y='industrycode', ax=axes[1, 1])
axes[1, 1].set_title('Credit Limit vs Industry')
plt.tight_layout()
plt.show()
```
### Seaborn Integration
```python
import seaborn as sns
# Correlation heatmap
plt.figure(figsize=(8, 6))
sns.heatmap(df[['creditlimit', 'industrycode']].corr(), annot=True)
plt.title('Correlation Matrix')
plt.show()
# Distribution plot
sns.histplot(df['creditlimit'], kde=True)
plt.title('Credit Limit Distribution')
plt.show()
```
---
## 7. Machine Learning Integration
### Preparing Data for ML
```python
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
# Load and prepare data
records = []
for page in client.get("account", select=["accountid", "creditlimit", "industrycode", "statecode"]):
records.extend(page)
df = pd.DataFrame(records)
# Feature engineering
df['log_creditlimit'] = np.log1p(df['creditlimit'])
df['industry_cat'] = pd.Categorical(df['industrycode']).codes
# Split features and target (use the encoded industry feature)
X = df[['industry_cat', 'log_creditlimit']]
y = df['statecode']
# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
print(f"Training set: {len(X_train)}, Test set: {len(X_test)}")
```
### Building a Classification Model
```python
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
# Train model
model = RandomForestClassifier(n_estimators=100)
model.fit(X_train, y_train)
# Evaluate
y_pred = model.predict(X_test)
print(classification_report(y_test, y_pred))
# Feature importance
importances = pd.Series(
model.feature_importances_,
index=X.columns
).sort_values(ascending=False)
print(importances)
```
---
## 8. Advanced DataFrame Operations
### Custom Functions
```python
# Apply function to columns
df['name_length'] = df['name'].apply(len)
# Apply function to rows
df['category'] = df.apply(
lambda row: 'High' if row['creditlimit'] > 100000 else 'Low',
axis=1
)
# Conditional operations
df['adjusted_limit'] = df['creditlimit'].where(
df['statecode'] == 0,
df['creditlimit'] * 0.5
)
```
### String Operations
```python
# String methods
df['name_upper'] = df['name'].str.upper()
df['name_starts'] = df['name'].str.startswith('A')
df['name_contains'] = df['name'].str.contains('Inc')
df['name_split'] = df['name'].str.split(',').str[0]
# Replace and substitute
df['industry'] = df['industrycode'].map({
1: 'Retail',
2: 'Manufacturing',
3: 'Technology'
})
```
### Reshaping Data
```python
# Transpose
transposed = df.set_index('name').T
# Stack/Unstack
stacked = df.set_index(['name', 'industrycode'])['creditlimit'].unstack()
# Melt long format
melted = pd.melt(df, id_vars=['name'], var_name='metric', value_name='value')
```
---
## 9. Performance Optimization
### Efficient Data Loading
```python
# Load large datasets in chunks
all_records = []
chunk_size = 1000
for page in client.get(
"account",
select=["accountid", "name", "creditlimit"],
top=10000, # Limit total records
page_size=chunk_size
):
all_records.extend(page)
if len(all_records) % 5000 == 0:
print(f"Loaded {len(all_records)} records")
df = pd.DataFrame(all_records)
print(f"Total: {len(df)} records")
```
### Memory Optimization
```python
# Reduce memory usage
# Use categorical for repeated values
df['industrycode'] = df['industrycode'].astype('category')
# Use appropriate numeric types
df['creditlimit'] = pd.to_numeric(df['creditlimit'], downcast='float')
# Delete columns no longer needed
df = df.drop(columns=['unused_col1', 'unused_col2'])
# Check memory usage
print(df.memory_usage(deep=True).sum() / 1024**2, "MB")
```
### Query Optimization
```python
# Apply filters on server, not client
# ✅ GOOD: Filter on server
accounts = client.get(
"account",
filter="creditlimit > 50000", # Server-side filter
select=["accountid", "name", "creditlimit"]
)
# ❌ BAD: Load all pages, then filter locally
all_accounts = [a for page in client.get("account") for a in page]  # Loads everything
filtered = [a for a in all_accounts if a['creditlimit'] > 50000]  # Client-side filter
```
---
## 10. Complete Example: Sales Analytics
```python
import pandas as pd
import numpy as np
from azure.identity import InteractiveBrowserCredential
from PowerPlatform.Dataverse.client import DataverseClient
# Setup
client = DataverseClient(
"https://<myorg>.crm.dynamics.com",
InteractiveBrowserCredential()
)
# Load data
print("Loading account data...")
records = []
for page in client.get(
"account",
select=["accountid", "name", "creditlimit", "industrycode", "statecode", "createdon"],
orderby=["createdon"]
):
records.extend(page)
df = pd.DataFrame(records)
df['createdon'] = pd.to_datetime(df['createdon'])
# Data cleaning
df = df.dropna()
# Feature engineering
df['year'] = df['createdon'].dt.year
df['month'] = df['createdon'].dt.month
df['year_month'] = df['createdon'].dt.to_period('M')
# Analysis
print("\n=== ACCOUNT OVERVIEW ===")
print(f"Total accounts: {len(df)}")
print(f"Total credit limit: ${df['creditlimit'].sum():,.2f}")
print(f"Average credit limit: ${df['creditlimit'].mean():,.2f}")
print("\n=== BY INDUSTRY ===")
industry_summary = df.groupby('industrycode').agg({
'accountid': 'count',
'creditlimit': ['sum', 'mean']
}).round(2)
print(industry_summary)
print("\n=== BY STATUS ===")
status_summary = df.groupby('statecode').agg({
'accountid': 'count',
'creditlimit': 'sum'
})
print(status_summary)
# Export report
print("\n=== EXPORTING REPORT ===")
industry_summary.to_csv('industry_analysis.csv')
print("Report saved to industry_analysis.csv")
```
---
## 11. Known Limitations
- `PandasODataClient` currently requires manual DataFrame creation from query results
- Very large DataFrames (millions of rows) may experience memory constraints
- Pandas operations are client-side; server-side aggregation is more efficient for large datasets
- File operations require standard `DataverseClient`, not pandas wrapper
---
## 12. Related Resources
- [Pandas Documentation](https://pandas.pydata.org/docs/)
- [Official Example: quickstart_pandas.py](https://github.com/microsoft/PowerPlatform-DataverseClient-Python/blob/main/examples/quickstart_pandas.py)
- [SDK for Python README](https://github.com/microsoft/PowerPlatform-DataverseClient-Python/blob/main/README.md)
- [Microsoft Learn: Working with data](https://learn.microsoft.com/en-us/power-apps/developer/data-platform/sdk-python/work-data)
View File
@@ -0,0 +1,499 @@
---
applyTo: '**'
---
# Dataverse SDK for Python — Performance & Optimization Guide
Based on official Microsoft Dataverse and Azure SDK performance guidance.
## 1. Performance Overview
The Dataverse SDK for Python (preview) is designed to feel idiomatic for Python developers, but it currently has some limitations:
- **Minimal retry policy**: Only network errors are retried by default
- **No DeleteMultiple**: Use individual deletes or update status instead
- **Limited OData batching**: General-purpose OData batching not supported
- **SQL limitations**: No JOINs, limited WHERE/TOP/ORDER BY
Workarounds and optimization strategies address these limitations.
---
## 2. Query Optimization
### Use Select to Limit Columns
```python
# ❌ SLOW - Retrieves all columns
accounts = client.get("account", top=100)
# ✅ FAST - Only retrieve needed columns
accounts = client.get(
"account",
select=["accountid", "name", "telephone1", "creditlimit"],
top=100
)
```
**Impact**: Can substantially reduce payload size and client memory usage, especially on wide tables.
---
### Use Filters Efficiently
```python
# ❌ SLOW - Fetch all, filter in Python
all_accounts = client.get("account")
active_accounts = [a for a in all_accounts if a.get("statecode") == 0]
# ✅ FAST - Filter server-side
accounts = client.get(
"account",
filter="statecode eq 0",
top=100
)
```
**OData filter examples**:
```python
# Equals
filter="statecode eq 0"
# String contains
filter="contains(name, 'Acme')"
# Multiple conditions
filter="statecode eq 0 and createdon gt 2025-01-01Z"
# Not equals
filter="statecode ne 2"
```
---
### Order by for Predictable Paging
```python
# Ensure consistent order for pagination
accounts = client.get(
"account",
orderby=["createdon desc", "name asc"],
page_size=100
)
for page in accounts:
process_page(page)
```
---
## 3. Pagination Best Practices
### Lazy Pagination (Recommended)
```python
# ✅ BEST - Generator yields one page at a time
pages = client.get(
"account",
top=5000, # Total limit
page_size=200 # Per-page size (hint)
)
for page in pages: # Each iteration fetches one page
for record in page:
process_record(record) # Process immediately
```
**Benefits**:
- Memory efficient (pages loaded on-demand)
- Fast time-to-first-result
- Can stop early if needed
### Avoid Loading Everything into Memory
```python
# ❌ SLOW - Loads all 100,000 records at once
all_records = list(client.get("account", top=100000))
process(all_records)
# ✅ FAST - Process as you go
for page in client.get("account", top=100000, page_size=5000):
process(page)
```
---
## 4. Batch Operations
### Bulk Create (Recommended)
```python
# ✅ BEST - Single call with multiple records
payloads = [
{"name": f"Account {i}", "telephone1": f"555-{i:04d}"}
for i in range(1000)
]
ids = client.create("account", payloads) # One API call for many records
```
### Bulk Update - Broadcast Mode
```python
# ✅ FAST - Same update applied to many records
account_ids = ["id1", "id2", "id3", "..."]
client.update("account", account_ids, {"statecode": 1}) # One call
```
### Bulk Update - Per-Record Mode
```python
# ✅ ACCEPTABLE - Different updates for each record
account_ids = ["id1", "id2", "id3"]
updates = [
{"telephone1": "555-0100"},
{"telephone1": "555-0200"},
{"telephone1": "555-0300"},
]
client.update("account", account_ids, updates)
```
### Batch Size Tuning
Based on table complexity (per Microsoft guidance):
| Table Type | Batch Size | Max Threads |
|------------|-----------|-------------|
| OOB (Account, Contact, Lead) | 200-300 | 30 |
| Simple (few lookups) | ≤10 | 50 |
| Moderately complex | ≤100 | 30 |
| Large/complex (>100 cols, >20 lookups) | 10-20 | 10-20 |
```python
def bulk_create_optimized(client, table_name, payloads, batch_size=200):
"""Create records in optimal batch size."""
for i in range(0, len(payloads), batch_size):
batch = payloads[i:i + batch_size]
ids = client.create(table_name, batch)
print(f"Created {len(ids)} records")
yield ids
```
---
## 5. Connection Management
### Reuse Client Instance
```python
# ❌ BAD - Creates new connection each time
def process_batch():
for batch in batches:
client = DataverseClient(...) # Expensive!
client.create("account", batch)
# ✅ GOOD - Reuse connection
client = DataverseClient(...) # Create once
def process_batch():
for batch in batches:
client.create("account", batch) # Reuse
```
### Global Client Instance
```python
# singleton_client.py
from azure.identity import DefaultAzureCredential
from PowerPlatform.Dataverse.client import DataverseClient
_client = None
def get_client():
global _client
if _client is None:
_client = DataverseClient(
base_url="https://myorg.crm.dynamics.com",
credential=DefaultAzureCredential()
)
return _client
# main.py
from singleton_client import get_client
client = get_client()
records = client.get("account")
```
### Connection Timeout Configuration
```python
from PowerPlatform.Dataverse.core.config import DataverseConfig
cfg = DataverseConfig()
cfg.http_timeout = 30 # Request timeout
cfg.connection_timeout = 5 # Connection timeout
client = DataverseClient(
base_url="https://myorg.crm.dynamics.com",
credential=credential,
config=cfg
)
```
---
## 6. Async Operations (Future Capability)
Currently synchronous, but prepare for async:
```python
# Recommended pattern for future async support
import asyncio
async def get_accounts_async(client):
"""Pattern for future async SDK."""
# When SDK supports async:
# accounts = await client.get("account")
# For now, use sync with executor
    loop = asyncio.get_running_loop()
accounts = await loop.run_in_executor(
None,
lambda: list(client.get("account"))
)
return accounts
# Usage
accounts = asyncio.run(get_accounts_async(client))
```
---
## 7. File Upload Optimization
### Small Files (<128 MB)
```python
# ✅ FAST - Single request
client.upload_file(
table_name="account",
record_id=record_id,
column_name="document_column",
file_path="small_file.pdf"
)
```
### Large Files (>128 MB)
```python
# ✅ OPTIMIZED - Chunked upload
client.upload_file(
table_name="account",
record_id=record_id,
column_name="document_column",
file_path="large_file.pdf",
mode='chunk',
if_none_match=True
)
# SDK automatically:
# 1. Splits file into 4MB chunks
# 2. Uploads chunks in parallel
# 3. Assembles on server
```
---
## 8. OData Query Optimization
### SQL Alternative (Simple Queries)
```python
# ✅ SOMETIMES FASTER - Direct SQL for SELECT only
# Limited support: single SELECT, optional WHERE/TOP/ORDER BY
records = client.get(
"account",
sql="SELECT accountid, name FROM account WHERE statecode = 0 ORDER BY name"
)
```
### Complex Queries
```python
# ❌ NOT SUPPORTED - JOINs, complex WHERE
sql="SELECT a.accountid, c.fullname FROM account a JOIN contact c ON a.accountid = c.parentcustomerid"
# ✅ WORKAROUND - Get accounts, then contacts for each
for page in client.get("account", select=["accountid", "name"]):
    for account in page:
        contacts = client.get(
            "contact",
            filter=f"_parentcustomerid_value eq {account['accountid']}"
        )
        process(account, contacts)
```
---
## 9. Memory Management
### Process Large Datasets Incrementally
```python
import gc
def process_large_table(client, table_name):
"""Process millions of records without memory issues."""
for page in client.get(table_name, page_size=5000):
for record in page:
result = process_record(record)
save_result(result)
# Force garbage collection between pages
gc.collect()
```
### DataFrame Integration with Chunking
```python
import pandas as pd
def load_to_dataframe_chunked(client, table_name, chunk_size=10000):
"""Load data to DataFrame in chunks."""
dfs = []
for page in client.get(table_name, page_size=1000):
df_chunk = pd.DataFrame(page)
dfs.append(df_chunk)
# Combine when chunk threshold reached
if len(dfs) >= chunk_size // 1000:
df = pd.concat(dfs, ignore_index=True)
process_chunk(df)
dfs = []
# Process remaining
if dfs:
df = pd.concat(dfs, ignore_index=True)
process_chunk(df)
```
---
## 10. Rate Limiting Handling
SDK has minimal retry support - implement manually:
```python
import time
from PowerPlatform.Dataverse.core.errors import DataverseError
def call_with_backoff(func, max_retries=3):
"""Call function with exponential backoff for rate limits."""
for attempt in range(max_retries):
try:
return func()
except DataverseError as e:
if e.status_code == 429: # Too Many Requests
if attempt < max_retries - 1:
wait_time = 2 ** attempt # 1s, 2s, 4s
print(f"Rate limited. Waiting {wait_time}s...")
time.sleep(wait_time)
else:
raise
else:
raise
# Usage
ids = call_with_backoff(
lambda: client.create("account", payload)
)
```
---
## 11. Transaction Consistency (Known Limitation)
SDK doesn't have transactional guarantees:
```python
# ⚠️ If bulk operation partially fails, some records may be created
def create_with_consistency_check(client, table_name, payloads):
    """Create records and verify each returned ID can be read back."""
    try:
        ids = client.create(table_name, payloads)
    except Exception as e:
        print(f"Creation failed: {e}")
        # Inspect what was created before deciding whether to retry or compensate
        raise
    missing = []
    for record_id in ids:
        try:
            client.get(table_name, record_id)
        except Exception:
            missing.append(record_id)
    if missing:
        print(f"⚠️ {len(missing)}/{len(ids)} created records could not be verified")
        # Handle partial failure (retry, compensate, or alert)
    return ids
```
---
## 12. Monitoring Performance
### Log Operation Duration
```python
import time
import logging
logger = logging.getLogger("dataverse")
def monitored_operation(operation_name):
"""Decorator to monitor operation performance."""
def decorator(func):
def wrapper(*args, **kwargs):
start = time.time()
try:
result = func(*args, **kwargs)
duration = time.time() - start
logger.info(f"{operation_name}: {duration:.2f}s")
return result
except Exception as e:
duration = time.time() - start
logger.error(f"{operation_name} failed after {duration:.2f}s: {e}")
raise
return wrapper
return decorator
@monitored_operation("Bulk Create Accounts")
def create_accounts(client, payloads):
return client.create("account", payloads)
```
---
## 13. Performance Checklist
| Item | Status | Notes |
|------|--------|-------|
| Reuse client instance | ☐ | Create once, reuse |
| Use select to limit columns | ☐ | Only retrieve needed data |
| Filter server-side with OData | ☐ | Don't fetch all and filter |
| Use pagination with page_size | ☐ | Process incrementally |
| Batch operations | ☐ | Use create/update for multiple |
| Tune batch size by table type | ☐ | OOB=200-300, Simple=≤10 |
| Handle rate limiting (429) | ☐ | Implement exponential backoff |
| Use chunked upload for large files | ☐ | SDK handles for >128MB |
| Monitor operation duration | ☐ | Log timing for analysis |
| Test with production-like data | ☐ | Performance varies with data volume |
---
## 14. See Also
- [Dataverse Web API Performance](https://learn.microsoft.com/en-us/power-apps/developer/data-platform/optimize-performance-create-update)
- [OData Query Options](https://learn.microsoft.com/en-us/power-apps/developer/data-platform/webapi/query-data-web-api)
- [SDK Working with Data](https://learn.microsoft.com/en-us/power-apps/developer/data-platform/sdk-python/work-data)
View File
@@ -0,0 +1,730 @@
---
applyTo: '**'
---
# Dataverse SDK for Python — Real-World Use Cases & Templates
Based on official Dataverse data migration and integration patterns.
## 1. Data Migration from Legacy Systems
### Migration Architecture
```
Legacy System → Staging Database → Dataverse
(Extract) (Transform) (Load)
```
### Complete Migration Example
```python
import pandas as pd
import time
from PowerPlatform.Dataverse.client import DataverseClient
from PowerPlatform.Dataverse.core.errors import DataverseError
from azure.identity import DefaultAzureCredential
class DataMigrationPipeline:
"""Migrate data from legacy system to Dataverse."""
def __init__(self, org_url: str):
self.client = DataverseClient(
base_url=org_url,
credential=DefaultAzureCredential()
)
self.success_records = []
self.failed_records = []
def extract_from_legacy(self, legacy_db_connection, query: str):
"""Extract data from source system."""
return pd.read_sql(query, legacy_db_connection)
def transform_accounts(self, df: pd.DataFrame) -> list:
"""Transform source data to Dataverse schema."""
payloads = []
for _, row in df.iterrows():
# Map source fields to Dataverse
payload = {
"name": row["company_name"][:100], # Limit to 100 chars
"telephone1": row["phone"],
"websiteurl": row["website"],
"revenue": float(row["annual_revenue"]) if row["annual_revenue"] else None,
"numberofemployees": int(row["employees"]) if row["employees"] else None,
# Track source ID for reconciliation
"new_sourcecompanyid": str(row["legacy_id"]),
"new_importsequencenumber": row["legacy_id"]
}
payloads.append(payload)
return payloads
def load_to_dataverse(self, payloads: list, batch_size: int = 200):
"""Load data to Dataverse with error tracking."""
total = len(payloads)
for i in range(0, total, batch_size):
batch = payloads[i:i + batch_size]
try:
ids = self.client.create("account", batch)
self.success_records.extend(ids)
print(f"✓ Created {len(ids)} records ({len(self.success_records)}/{total})")
# Prevent rate limiting
time.sleep(0.5)
except DataverseError as e:
self.failed_records.extend(batch)
print(f"✗ Batch failed: {e.message}")
def reconcile_migration(self, df: pd.DataFrame):
"""Verify migration and track results."""
# Query created records
created_accounts = self.client.get(
"account",
filter="new_importsequencenumber ne null",
select=["accountid", "new_sourcecompanyid", "new_importsequencenumber"],
top=10000
)
        created_df = pd.DataFrame([r for page in created_accounts for r in page])
# Update source table with Dataverse IDs
merged = df.merge(
created_df,
left_on="legacy_id",
right_on="new_importsequencenumber"
)
print(f"Successfully migrated {len(merged)} accounts")
print(f"Failed: {len(self.failed_records)} records")
return {
"total_source": len(df),
"migrated": len(merged),
"failed": len(self.failed_records),
"success_rate": len(merged) / len(df) * 100
}
# Usage
pipeline = DataMigrationPipeline("https://myorg.crm.dynamics.com")
# Extract
source_data = pipeline.extract_from_legacy(
legacy_connection,
"SELECT id, company_name, phone, website, annual_revenue, employees FROM companies"
)
# Transform
payloads = pipeline.transform_accounts(source_data)
# Load
pipeline.load_to_dataverse(payloads, batch_size=300)
# Reconcile
results = pipeline.reconcile_migration(source_data)
print(results)
```
---
## 2. Data Quality & Deduplication Agent
### Detect and Merge Duplicates
```python
from PowerPlatform.Dataverse.client import DataverseClient
from azure.identity import DefaultAzureCredential
import difflib
class DataQualityAgent:
"""Monitor and improve data quality."""
def __init__(self, org_url: str):
self.client = DataverseClient(
base_url=org_url,
credential=DefaultAzureCredential()
)
def find_potential_duplicates(self, table_name: str, match_fields: list):
"""Find potential duplicate records."""
records = []
for page in self.client.get(table_name, select=match_fields, top=10000):
records.extend(page)
duplicates = []
seen = {}
for record in records:
# Create key from match fields
key = tuple(
record.get(field, "").lower().strip()
for field in match_fields
)
if key in seen and key != ("",) * len(match_fields):
duplicates.append({
"original": seen[key],
"duplicate": record,
"fields_matched": match_fields
})
else:
seen[key] = record
return duplicates, len(records)
def merge_records(self, table_name: str, primary_id: str, duplicate_id: str,
mapping: dict):
"""Merge duplicate record into primary."""
# Copy data from duplicate to primary
        updates = {}
        primary = self.client.get(table_name, primary_id)
        duplicate = self.client.get(table_name, duplicate_id)
for source_field, target_field in mapping.items():
if duplicate.get(source_field) and not primary.get(target_field):
updates[target_field] = duplicate[source_field]
# Update primary
if updates:
self.client.update(table_name, primary_id, updates)
# Delete duplicate
self.client.delete(table_name, duplicate_id)
return f"Merged {duplicate_id} into {primary_id}"
def generate_quality_report(self, table_name: str) -> dict:
"""Generate data quality metrics."""
        records = [r for page in self.client.get(table_name, top=10000) for r in page]
report = {
"table": table_name,
"total_records": len(records),
"null_values": {},
"duplicates": 0,
"completeness_score": 0
}
# Check null values
all_fields = set()
for record in records:
all_fields.update(record.keys())
for field in all_fields:
null_count = sum(1 for r in records if not r.get(field))
completeness = (len(records) - null_count) / len(records) * 100
if completeness < 100:
report["null_values"][field] = {
"null_count": null_count,
"completeness": completeness
}
# Check duplicates
duplicates, _ = self.find_potential_duplicates(
table_name,
["name", "emailaddress1"]
)
report["duplicates"] = len(duplicates)
# Overall completeness
avg_completeness = sum(
100 - ((d["null_count"] / len(records)) * 100)
for d in report["null_values"].values()
) / len(report["null_values"]) if report["null_values"] else 100
report["completeness_score"] = avg_completeness
return report
# Usage
agent = DataQualityAgent("https://myorg.crm.dynamics.com")
# Find duplicates
duplicates, total = agent.find_potential_duplicates(
"account",
match_fields=["name", "emailaddress1"]
)
print(f"Found {len(duplicates)} potential duplicates out of {total} accounts")
# Merge if confident
for dup in duplicates[:5]: # Process top 5
result = agent.merge_records(
"account",
primary_id=dup["original"]["accountid"],
duplicate_id=dup["duplicate"]["accountid"],
mapping={"telephone1": "telephone1", "websiteurl": "websiteurl"}
)
print(result)
# Quality report
report = agent.generate_quality_report("account")
print(f"Data Quality: {report['completeness_score']:.1f}%")
```
---
## 3. Contact & Account Enrichment
### Enrich CRM Data from External Sources
```python
import requests
from PowerPlatform.Dataverse.client import DataverseClient
from azure.identity import DefaultAzureCredential
class DataEnrichmentAgent:
"""Enrich CRM records with external data."""
def __init__(self, org_url: str, external_api_key: str):
self.client = DataverseClient(
base_url=org_url,
credential=DefaultAzureCredential()
)
self.api_key = external_api_key
def enrich_accounts_with_industry_data(self):
"""Enrich accounts with industry classification."""
accounts = self.client.get(
"account",
select=["accountid", "name", "websiteurl"],
filter="new_industrydata eq null",
top=500
)
enriched_count = 0
for page in accounts:
for account in page:
try:
# Call external API
industry = self._lookup_industry(account["name"])
if industry:
self.client.update(
"account",
account["accountid"],
{"new_industrydata": industry}
)
enriched_count += 1
except Exception as e:
print(f"Failed to enrich {account['name']}: {e}")
return enriched_count
def enrich_contacts_with_social_profiles(self):
"""Find and link social media profiles."""
contacts = self.client.get(
"contact",
select=["contactid", "fullname", "emailaddress1"],
filter="new_linkedinurl eq null",
top=500
)
for page in contacts:
for contact in page:
try:
# Find social profiles
profiles = self._find_social_profiles(
contact["fullname"],
contact["emailaddress1"]
)
if profiles:
self.client.update(
"contact",
contact["contactid"],
{
"new_linkedinurl": profiles.get("linkedin"),
"new_twitterhandle": profiles.get("twitter")
}
)
except Exception as e:
print(f"Failed to enrich {contact['fullname']}: {e}")
def _lookup_industry(self, company_name: str) -> str:
"""Call external industry API."""
response = requests.get(
"https://api.example.com/industry",
params={"company": company_name},
headers={"Authorization": f"Bearer {self.api_key}"}
)
if response.status_code == 200:
return response.json().get("industry")
return None
def _find_social_profiles(self, name: str, email: str) -> dict:
"""Find social media profiles for person."""
response = requests.get(
"https://api.example.com/social",
params={"name": name, "email": email},
headers={"Authorization": f"Bearer {self.api_key}"}
)
if response.status_code == 200:
return response.json()
return {}
# Usage
enricher = DataEnrichmentAgent(
"https://myorg.crm.dynamics.com",
api_key="your-api-key"
)
enriched = enricher.enrich_accounts_with_industry_data()
print(f"Enriched {enriched} accounts")
```
---
## 4. Automated Report Data Export
### Export CRM Data to Excel
```python
import pandas as pd
from PowerPlatform.Dataverse.client import DataverseClient
from azure.identity import DefaultAzureCredential
from datetime import datetime
class ReportExporter:
"""Export Dataverse data to reports."""
def __init__(self, org_url: str):
self.client = DataverseClient(
base_url=org_url,
credential=DefaultAzureCredential()
)
def export_sales_summary(self, output_file: str):
"""Export sales data for reporting."""
accounts = []
for page in self.client.get(
"account",
select=["accountid", "name", "revenue", "numberofemployees",
"createdon", "modifiedon"],
filter="statecode eq 0", # Active only
orderby=["revenue desc"],
top=10000
):
accounts.extend(page)
# Opportunities
opportunities = []
for page in self.client.get(
"opportunity",
select=["opportunityid", "name", "estimatedvalue",
"statuscode", "parentaccountid", "createdon"],
top=10000
):
opportunities.extend(page)
# Create DataFrames
df_accounts = pd.DataFrame(accounts)
df_opportunities = pd.DataFrame(opportunities)
# Generate report
with pd.ExcelWriter(output_file) as writer:
df_accounts.to_excel(writer, sheet_name="Accounts", index=False)
df_opportunities.to_excel(writer, sheet_name="Opportunities", index=False)
# Summary sheet
summary = pd.DataFrame({
"Metric": [
"Total Accounts",
"Total Opportunities",
"Total Revenue",
"Export Date"
],
"Value": [
len(df_accounts),
len(df_opportunities),
df_accounts["revenue"].sum() if "revenue" in df_accounts else 0,
datetime.now().isoformat()
]
})
summary.to_excel(writer, sheet_name="Summary", index=False)
return output_file
def export_activity_log(self, days_back: int = 30) -> str:
"""Export recent activity for audit."""
from_date = pd.Timestamp.now(tz='UTC') - pd.Timedelta(days=days_back)
activities = []
for page in self.client.get(
"activitypointer",
select=["activityid", "subject", "activitytypecode",
"createdon", "ownerid"],
filter=f"createdon gt {from_date.isoformat()}",
orderby=["createdon desc"],
top=10000
):
activities.extend(page)
df = pd.DataFrame(activities)
output = f"activity_log_{datetime.now():%Y%m%d}.csv"
df.to_csv(output, index=False)
return output
# Usage
exporter = ReportExporter("https://myorg.crm.dynamics.com")
report_file = exporter.export_sales_summary("sales_report.xlsx")
print(f"Report saved to {report_file}")
```
---
## 5. Workflow Integration - Bulk Operations
### Process Records Based on Conditions
```python
from PowerPlatform.Dataverse.client import DataverseClient
from azure.identity import DefaultAzureCredential
from enum import IntEnum
from datetime import datetime, timedelta, timezone
class AccountState(IntEnum):
    ACTIVE = 0
    INACTIVE = 1
class BulkWorkflow:
"""Automate bulk operations."""
def __init__(self, org_url: str):
self.client = DataverseClient(
base_url=org_url,
credential=DefaultAzureCredential()
)
def mark_accounts_as_inactive_if_no_activity(self, days_no_activity: int = 90):
"""Deactivate accounts with no recent activity."""
from_date = f"2025-{datetime.now().month:02d}-01T00:00:00Z"
inactive_accounts = self.client.get(
"account",
select=["accountid", "name"],
filter=f"modifiedon lt {from_date} and statecode eq 0",
top=5000
)
accounts_to_deactivate = []
for page in inactive_accounts:
accounts_to_deactivate.extend([a["accountid"] for a in page])
# Bulk update
if accounts_to_deactivate:
self.client.update(
"account",
accounts_to_deactivate,
{"statecode": AccountStatus.CLOSED}
)
print(f"Deactivated {len(accounts_to_deactivate)} inactive accounts")
def update_opportunity_status_based_on_amount(self):
"""Update opportunity stage based on estimated value."""
opportunities = self.client.get(
"opportunity",
select=["opportunityid", "estimatedvalue"],
filter="statuscode ne 7", # Not closed
top=5000
)
updates = []
ids = []
for page in opportunities:
for opp in page:
value = opp.get("estimatedvalue", 0)
# Determine stage
if value < 10000:
stage = 1 # Qualification
elif value < 50000:
stage = 2 # Proposal
else:
stage = 3 # Proposal Review
updates.append({"stageid": stage})
ids.append(opp["opportunityid"])
# Bulk update
if ids:
self.client.update("opportunity", ids, updates)
print(f"Updated {len(ids)} opportunities")
# Usage
workflow = BulkWorkflow("https://myorg.crm.dynamics.com")
workflow.mark_accounts_as_inactive_if_no_activity(days_no_activity=90)
workflow.update_opportunity_status_based_on_amount()
```
---
## 6. Scheduled Job Template
### Azure Function for Scheduled Operations
```python
# scheduled_migration_job.py
import azure.functions as func
from datetime import datetime
from DataMigrationPipeline import DataMigrationPipeline
import logging
def main(timer: func.TimerRequest) -> None:
"""Run migration job on schedule (e.g., daily)."""
if timer.past_due:
logging.info('The timer is past due!')
try:
logging.info(f'Migration job started at {datetime.utcnow()}')
# Run migration
pipeline = DataMigrationPipeline("https://myorg.crm.dynamics.com")
# Extract, transform, load
source_data = pipeline.extract_from_legacy(...)
payloads = pipeline.transform_accounts(source_data)
pipeline.load_to_dataverse(payloads)
# Get results
results = pipeline.reconcile_migration(source_data)
logging.info(f'Migration completed: {results}')
except Exception as e:
logging.error(f'Migration failed: {e}')
raise
# function_app.py - Azure Functions setup
app = func.FunctionApp()
@app.schedule_trigger(schedule="0 0 * * *") # Daily at midnight
def migration_job(timer: func.TimerRequest) -> None:
main(timer)
```
---
## 7. Complete Starter Template
```python
#!/usr/bin/env python3
"""
Dataverse SDK for Python - Complete Starter Template
"""
from azure.identity import DefaultAzureCredential
from PowerPlatform.Dataverse.client import DataverseClient
from PowerPlatform.Dataverse.core.config import DataverseConfig
from PowerPlatform.Dataverse.core.errors import DataverseError
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DataverseApp:
"""Base class for Dataverse applications."""
def __init__(self, org_url: str):
self.org_url = org_url
self.client = self._create_client()
def _create_client(self) -> DataverseClient:
"""Create authenticated client."""
cfg = DataverseConfig()
cfg.logging_enable = False
return DataverseClient(
base_url=self.org_url,
credential=DefaultAzureCredential(),
config=cfg
)
def create_account(self, name: str, phone: str = None) -> str:
"""Create account record."""
try:
payload = {"name": name}
if phone:
payload["telephone1"] = phone
id = self.client.create("account", payload)[0]
logger.info(f"Created account: {id}")
return id
except DataverseError as e:
logger.error(f"Failed to create account: {e.message}")
raise
def get_accounts(self, filter_expr: str = None, top: int = 100) -> list:
"""Get account records."""
try:
accounts = self.client.get(
"account",
filter=filter_expr,
select=["accountid", "name", "telephone1", "createdon"],
orderby=["createdon desc"],
top=top
)
all_accounts = []
for page in accounts:
all_accounts.extend(page)
logger.info(f"Retrieved {len(all_accounts)} accounts")
return all_accounts
except DataverseError as e:
logger.error(f"Failed to get accounts: {e.message}")
raise
def update_account(self, account_id: str, **kwargs) -> None:
"""Update account record."""
try:
self.client.update("account", account_id, kwargs)
logger.info(f"Updated account: {account_id}")
except DataverseError as e:
logger.error(f"Failed to update account: {e.message}")
raise
if __name__ == "__main__":
# Usage
app = DataverseApp("https://myorg.crm.dynamics.com")
# Create
account_id = app.create_account("Acme Inc", "555-0100")
# Get
accounts = app.get_accounts(filter_expr="statecode eq 0", top=50)
print(f"Found {len(accounts)} active accounts")
# Update
app.update_account(account_id, telephone1="555-0199")
```
---
## 8. See Also
- [Dataverse Data Migration](https://learn.microsoft.com/en-us/power-platform/architecture/key-concepts/data-migration/workflow-complex-data-migration)
- [Working with Data (SDK)](https://learn.microsoft.com/en-us/power-apps/developer/data-platform/sdk-python/work-data)
- [SDK Examples on GitHub](https://github.com/microsoft/PowerPlatform-DataverseClient-Python/tree/main/examples)
View File
@@ -0,0 +1,99 @@
---
applyTo: '**'
---
# Dataverse SDK for Python — Official Quickstart
This instruction summarizes Microsoft Learn guidance for the Dataverse SDK for Python (preview) and provides copyable snippets.
## Prerequisites
- Dataverse environment with read/write
- Python 3.10+
- Network access to PyPI
## Install
```bash
pip install PowerPlatform-Dataverse-Client
```
## Connect
```python
from azure.identity import InteractiveBrowserCredential
from PowerPlatform.Dataverse.client import DataverseClient
from PowerPlatform.Dataverse.core.config import DataverseConfig
cfg = DataverseConfig() # defaults to language_code=1033
client = DataverseClient(
base_url="https://<myorg>.crm.dynamics.com",
credential=InteractiveBrowserCredential(),
config=cfg,
)
```
- Optional HTTP settings: `cfg.http_retries`, `cfg.http_backoff`, `cfg.http_timeout`.
## CRUD Examples
```python
# Create returns list[str] of GUIDs
account_id = client.create("account", {"name": "Acme, Inc.", "telephone1": "555-0100"})[0]
# Retrieve single
account = client.get("account", account_id)
# Update (returns None)
client.update("account", account_id, {"telephone1": "555-0199"})
# Delete
client.delete("account", account_id)
```
## Bulk Operations
```python
# Broadcast patch to many IDs
ids = client.create("account", [{"name": "Contoso"}, {"name": "Fabrikam"}])
client.update("account", ids, {"telephone1": "555-0200"})
# 1:1 list of patches
client.update("account", ids, [{"telephone1": "555-1200"}, {"telephone1": "555-1300"}])
# Bulk create
payloads = [{"name": "Contoso"}, {"name": "Fabrikam"}, {"name": "Northwind"}]
ids = client.create("account", payloads)
```
## File Upload
```python
client.upload_file('account', record_id, 'sample_filecolumn', 'test.pdf')
client.upload_file('account', record_id, 'sample_filecolumn', 'test.pdf', mode='chunk', if_none_match=True)
```
## Paging Retrieve Multiple
```python
pages = client.get(
"account",
select=["accountid", "name", "createdon"],
orderby=["name asc"],
top=10,
page_size=3,
)
for page in pages:
print(len(page), page[:2])
```
## Table Metadata Quickstart
```python
info = client.create_table("SampleItem", {
"code": "string",
"count": "int",
"amount": "decimal",
"when": "datetime",
"active": "bool",
})
logical = info["entity_logical_name"]
rec_id = client.create(logical, {f"{logical}name": "Sample A"})[0]
client.delete(logical, rec_id)
client.delete_table("SampleItem")
```
## References
- Getting started: https://learn.microsoft.com/en-us/power-apps/developer/data-platform/sdk-python/get-started
- Working with data: https://learn.microsoft.com/en-us/power-apps/developer/data-platform/sdk-python/work-data
- SDK source/examples: https://github.com/microsoft/PowerPlatform-DataverseClient-Python
View File
@@ -0,0 +1,486 @@
---
applyTo: '**'
---
# Dataverse SDK for Python — Testing & Debugging Strategies
Based on official Azure Functions and pytest testing patterns.
## 1. Testing Overview
### Testing Pyramid for Dataverse SDK
```
          /\
         /  \      Integration Tests   <- test with real Dataverse
        /----\
       /      \    Unit Tests (Mocked)
      /--------\
     /__________\  Framework Tests
```
---
## 2. Unit Testing with Mocking
### Setup Test Environment
```bash
# Install test dependencies
pip install pytest pytest-cov  # unittest.mock ships with the Python standard library
```
### Mock DataverseClient
```python
# tests/test_operations.py
import pytest
from unittest.mock import Mock, patch, MagicMock
from PowerPlatform.Dataverse.client import DataverseClient
@pytest.fixture
def mock_client():
"""Provide mocked DataverseClient."""
client = Mock(spec=DataverseClient)
return client
def test_create_account(mock_client):
"""Test account creation with mocked client."""
# Setup mock response
mock_client.create.return_value = ["id-123"]
# Call function
from my_app import create_account
result = create_account(mock_client, {"name": "Acme"})
# Verify
assert result == "id-123"
mock_client.create.assert_called_once_with("account", {"name": "Acme"})
def test_create_account_error(mock_client):
"""Test error handling in account creation."""
from PowerPlatform.Dataverse.core.errors import DataverseError
# Setup mock to raise error
mock_client.create.side_effect = DataverseError(
message="Account exists",
code="validation_error",
status_code=400
)
# Verify error is raised
from my_app import create_account
with pytest.raises(DataverseError):
create_account(mock_client, {"name": "Acme"})
```
### Test Data Structures
```python
# tests/fixtures.py
import pytest
@pytest.fixture
def sample_account():
"""Sample account record for testing."""
return {
"accountid": "id-123",
"name": "Acme Inc",
"telephone1": "555-0100",
"statecode": 0,
"createdon": "2025-01-01T00:00:00Z"
}
@pytest.fixture
def sample_accounts(sample_account):
"""Multiple sample accounts."""
return [
sample_account,
{**sample_account, "accountid": "id-124", "name": "Fabrikam"},
{**sample_account, "accountid": "id-125", "name": "Contoso"},
]
# Usage in tests
def test_process_accounts(mock_client, sample_accounts):
mock_client.get.return_value = iter([sample_accounts])
# Test processing
```
---
## 3. Mocking Common Patterns
### Mock Get with Pagination
```python
def test_pagination(mock_client, sample_accounts):
"""Test handling paginated results."""
# Mock returns generator with pages
mock_client.get.return_value = iter([
sample_accounts[:2], # Page 1
sample_accounts[2:] # Page 2
])
from my_app import process_all_accounts
result = process_all_accounts(mock_client)
assert len(result) == 3 # All pages processed
```
### Mock Bulk Operations
```python
def test_bulk_create(mock_client):
"""Test bulk account creation."""
payloads = [
{"name": "Account 1"},
{"name": "Account 2"},
]
# Mock returns list of IDs
mock_client.create.return_value = ["id-1", "id-2"]
from my_app import create_accounts
ids = create_accounts(mock_client, payloads)
assert len(ids) == 2
mock_client.create.assert_called_once_with("account", payloads)
```
### Mock Errors
```python
def test_rate_limiting_retry(mock_client):
"""Test retry logic on rate limiting."""
from PowerPlatform.Dataverse.core.errors import DataverseError
# Mock fails then succeeds
error = DataverseError(
message="Too many requests",
code="http_error",
status_code=429,
is_transient=True
)
mock_client.create.side_effect = [error, ["id-123"]]
from my_app import create_with_retry
result = create_with_retry(mock_client, "account", {})
assert result == "id-123"
assert mock_client.create.call_count == 2 # Retried
```
---
## 4. Integration Testing
### Local Development Testing
```python
# tests/test_integration.py
import pytest
from azure.identity import InteractiveBrowserCredential
from PowerPlatform.Dataverse.client import DataverseClient
@pytest.fixture
def dataverse_client():
"""Real client for integration testing."""
client = DataverseClient(
base_url="https://myorg-dev.crm.dynamics.com",
credential=InteractiveBrowserCredential()
)
return client
@pytest.mark.integration
def test_create_and_retrieve_account(dataverse_client):
"""Test creating and retrieving account (against real Dataverse)."""
# Create
account_id = dataverse_client.create("account", {
"name": "Test Account"
})[0]
# Retrieve
account = dataverse_client.get("account", account_id)
# Verify
assert account["name"] == "Test Account"
# Cleanup
dataverse_client.delete("account", account_id)
```
### Test Isolation
```python
# tests/conftest.py
import pytest
@pytest.fixture(scope="function")
def test_account(dataverse_client):
"""Create test account, cleanup after test."""
account_id = dataverse_client.create("account", {
"name": "Test Account"
})[0]
yield account_id
# Cleanup
try:
dataverse_client.delete("account", account_id)
    except Exception:
        pass  # Already deleted
# Usage
def test_update_account(dataverse_client, test_account):
"""Test updating account."""
dataverse_client.update("account", test_account, {"telephone1": "555-0100"})
account = dataverse_client.get("account", test_account)
assert account["telephone1"] == "555-0100"
```
---
## 5. Pytest Configuration
### pytest.ini
```ini
[pytest]
# Skip integration tests by default
addopts = -m "not integration"
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
markers =
integration: marks tests as integration (run with -m integration)
slow: marks tests as slow
unit: marks tests as unit tests
```
### Run Tests
```bash
# Unit tests only
pytest
# Unit + integration
pytest -m "unit or integration"
# Integration only
pytest -m integration
# With coverage
pytest --cov=my_app tests/
# Specific test
pytest tests/test_operations.py::test_create_account
```
---
## 6. Coverage Analysis
### Generate Coverage Report
```bash
# Run tests with coverage
pytest --cov=my_app --cov-report=html tests/
# View coverage
open htmlcov/index.html # macOS
start htmlcov/index.html # Windows
```
### Coverage Configuration (.coveragerc)
```ini
[run]
branch = True
source = my_app
[report]
exclude_lines =
pragma: no cover
def __repr__
raise AssertionError
raise NotImplementedError
if __name__ == .__main__.:
[html]
directory = htmlcov
```
---
## 7. Debugging with print/logging
### Enable Debug Logging
```python
import logging
import sys
# Configure logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('debug.log')
]
)
# Enable SDK logging
logging.getLogger('PowerPlatform').setLevel(logging.DEBUG)
logging.getLogger('azure').setLevel(logging.DEBUG)
# In test
def test_with_logging(mock_client):
logger = logging.getLogger(__name__)
logger.debug("Starting test")
result = my_function(mock_client)
logger.debug(f"Result: {result}")
```
### Pytest Capturing Output
```bash
# Show print/logging output in tests
pytest -s tests/
# Capture and show on failure only
pytest --tb=short tests/
```
---
## 8. Performance Testing
### Measure Operation Duration
```python
import pytest
import time
def test_bulk_create_performance(dataverse_client):
"""Test bulk create performance."""
payloads = [{"name": f"Account {i}"} for i in range(1000)]
start = time.time()
ids = dataverse_client.create("account", payloads)
duration = time.time() - start
assert len(ids) == 1000
assert duration < 10 # Should complete in under 10 seconds
print(f"Created 1000 records in {duration:.2f}s ({1000/duration:.0f} records/s)")
```
### Pytest Benchmark Plugin
```bash
pip install pytest-benchmark
```
```python
def test_query_performance(benchmark, dataverse_client):
"""Benchmark query performance."""
def get_accounts():
return list(dataverse_client.get("account", top=100))
result = benchmark(get_accounts)
assert len(result) <= 100
```
---
## 9. Common Testing Patterns
### Testing Retry Logic
```python
def test_retry_on_transient_error(mock_client):
"""Test retry on transient error."""
from PowerPlatform.Dataverse.core.errors import DataverseError
error = DataverseError(
message="Timeout",
code="http_error",
status_code=408,
is_transient=True
)
# Fail then succeed
mock_client.create.side_effect = [error, ["id-123"]]
from my_app import create_with_retry
result = create_with_retry(mock_client, "account", {})
assert result == "id-123"
```
### Testing Filter Building
```python
def test_filter_builder():
"""Test OData filter generation."""
from my_app import build_account_filter
# Test cases
assert build_account_filter(status="active") == "statecode eq 0"
assert build_account_filter(name="Acme") == "contains(name, 'Acme')"
assert build_account_filter(status="active", name="Acme") \
== "statecode eq 0 and contains(name, 'Acme')"
```
### Testing Error Handling
```python
def test_handles_missing_record(mock_client):
"""Test handling 404 errors."""
from PowerPlatform.Dataverse.core.errors import DataverseError
mock_client.get.side_effect = DataverseError(
message="Not found",
code="http_error",
status_code=404
)
from my_app import get_account_safe
result = get_account_safe(mock_client, "invalid-id")
assert result is None # Returns None instead of raising
```
---
## 10. Debugging Checklist
| Issue | Debug Steps |
|-------|-------------|
| Test fails unexpectedly | Add `-s` flag to see print output |
| Mock not called | Check method name/parameters match exactly (see the sketch below) |
| Real API failing | Check credentials, URL, permissions |
| Rate limiting in tests | Add delays or use smaller batches |
| Data not found | Verify record created and not cleaned up |
| Assertion errors | Print actual vs expected values |
---
## 11. See Also
- [Pytest Documentation](https://docs.pytest.org/)
- [unittest.mock Reference](https://docs.python.org/3/library/unittest.mock.html)
- [Azure Functions Testing](https://learn.microsoft.com/en-us/azure/azure-functions/functions-reference-python#unit-testing)
- [Dataverse SDK Examples](https://github.com/microsoft/PowerPlatform-DataverseClient-Python/tree/main/examples)
View File
@@ -0,0 +1,32 @@
---
applyTo: '**'
---
# Dataverse SDK for Python — Getting Started
- Install the Dataverse Python SDK and prerequisites.
- Configure environment variables for Dataverse tenant, client ID, secret, and resource URL.
- Use the SDK to authenticate via OAuth and perform CRUD operations.
## Setup
- Python 3.10+
- Recommended: virtual environment
## Install
```bash
pip install PowerPlatform-Dataverse-Client
```
## Auth Basics
- Use OAuth with Azure AD app registration.
- Store secrets in `.env` and load via `python-dotenv`.
## Common Tasks
- Query tables
- Create/update rows
- Batch operations
- Handle pagination and throttling
## Tips
- Reuse clients; avoid frequent re-auth.
- Add retries for transient failures.
- Log requests for troubleshooting.