mirror of
https://github.com/github/awesome-copilot.git
synced 2026-02-20 18:35:14 +00:00
Add a new skill that analyzes Terraform plan JSON output for AzureRM Provider to distinguish between false-positive diffs (order-only changes in Set-type attributes) and actual resource changes. This skill helps users identify 'noise' in terraform plan output caused by Azure API returning Set elements in different order, making plan reviews easier and reducing confusion in CI/CD pipelines. Bundled assets: - references/azurerm_set_attributes.json - references/azurerm_set_attributes.md - scripts/analyze_plan.py
941 lines
30 KiB
Python
Executable File
941 lines
30 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Terraform Plan Analyzer for AzureRM Set-type Attributes
|
|
|
|
Analyzes terraform plan JSON output to distinguish between:
|
|
- Order-only changes (false positives) in Set-type attributes
|
|
- Actual additions/deletions/modifications
|
|
|
|
Usage:
|
|
terraform show -json plan.tfplan | python analyze_plan.py
|
|
python analyze_plan.py plan.json
|
|
python analyze_plan.py plan.json --format json --exit-code
|
|
|
|
For CI/CD pipeline usage, see README.md in this directory.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Set
|
|
|
|
# Exit codes for --exit-code option
|
|
EXIT_NO_CHANGES = 0
|
|
EXIT_ORDER_ONLY = 0 # Order-only changes are not real changes
|
|
EXIT_SET_CHANGES = 1 # Actual Set attribute changes
|
|
EXIT_RESOURCE_REPLACE = 2 # Resource replacement (most severe)
|
|
EXIT_ERROR = 3
|
|
|
|
# Default path to the external attributes JSON file (relative to this script)
|
|
DEFAULT_ATTRIBUTES_PATH = (
|
|
Path(__file__).parent.parent / "references" / "azurerm_set_attributes.json"
|
|
)
|
|
|
|
|
|
# Global configuration
|
|
class Config:
|
|
"""Global configuration for the analyzer."""
|
|
|
|
ignore_case: bool = False
|
|
quiet: bool = False
|
|
verbose: bool = False
|
|
warnings: List[str] = []
|
|
|
|
|
|
CONFIG = Config()
|
|
|
|
|
|
def warn(message: str) -> None:
|
|
"""Add a warning message."""
|
|
CONFIG.warnings.append(message)
|
|
if CONFIG.verbose:
|
|
print(f"Warning: {message}", file=sys.stderr)
|
|
|
|
|
|
def load_set_attributes(path: Optional[Path] = None) -> Dict[str, Dict[str, Any]]:
|
|
"""Load Set-type attributes from external JSON file."""
|
|
attributes_path = path or DEFAULT_ATTRIBUTES_PATH
|
|
|
|
try:
|
|
with open(attributes_path, "r", encoding="utf-8") as f:
|
|
data = json.load(f)
|
|
return data.get("resources", {})
|
|
except FileNotFoundError:
|
|
warn(f"Attributes file not found: {attributes_path}")
|
|
return {}
|
|
except json.JSONDecodeError as e:
|
|
print(f"Error: Invalid JSON in attributes file: {e}", file=sys.stderr)
|
|
sys.exit(EXIT_ERROR)
|
|
|
|
|
|
# Global variable to hold loaded attributes (initialized in main)
|
|
AZURERM_SET_ATTRIBUTES: Dict[str, Any] = {}
|
|
|
|
|
|
def get_attr_config(attr_def: Any) -> tuple:
|
|
"""
|
|
Parse attribute definition and return (key_attr, nested_attrs).
|
|
|
|
Attribute definition can be:
|
|
- str: simple key attribute (e.g., "name")
|
|
- None/null: no key attribute
|
|
- dict: nested structure with "_key" and nested attributes
|
|
"""
|
|
if attr_def is None:
|
|
return (None, {})
|
|
if isinstance(attr_def, str):
|
|
return (attr_def, {})
|
|
if isinstance(attr_def, dict):
|
|
key_attr = attr_def.get("_key")
|
|
nested_attrs = {k: v for k, v in attr_def.items() if k != "_key"}
|
|
return (key_attr, nested_attrs)
|
|
return (None, {})
|
|
|
|
|
|
@dataclass
|
|
class SetAttributeChange:
|
|
"""Represents a change in a Set-type attribute."""
|
|
|
|
attribute_name: str
|
|
path: str = (
|
|
"" # Full path for nested attributes (e.g., "rewrite_rule_set.rewrite_rule")
|
|
)
|
|
order_only_count: int = 0
|
|
added: List[str] = field(default_factory=list)
|
|
removed: List[str] = field(default_factory=list)
|
|
modified: List[tuple] = field(default_factory=list)
|
|
nested_changes: List["SetAttributeChange"] = field(default_factory=list)
|
|
# For primitive sets (string/number arrays)
|
|
is_primitive: bool = False
|
|
primitive_added: List[Any] = field(default_factory=list)
|
|
primitive_removed: List[Any] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
|
|
class ResourceChange:
|
|
"""Represents changes to a single resource."""
|
|
|
|
address: str
|
|
resource_type: str
|
|
actions: List[str] = field(default_factory=list)
|
|
set_changes: List[SetAttributeChange] = field(default_factory=list)
|
|
other_changes: List[str] = field(default_factory=list)
|
|
is_replace: bool = False
|
|
is_create: bool = False
|
|
is_delete: bool = False
|
|
|
|
|
|
@dataclass
|
|
class AnalysisResult:
|
|
"""Overall analysis result."""
|
|
|
|
resources: List[ResourceChange] = field(default_factory=list)
|
|
order_only_count: int = 0
|
|
actual_set_changes_count: int = 0
|
|
replace_count: int = 0
|
|
create_count: int = 0
|
|
delete_count: int = 0
|
|
other_changes_count: int = 0
|
|
warnings: List[str] = field(default_factory=list)
|
|
|
|
|
|
def get_element_key(element: Dict[str, Any], key_attr: Optional[str]) -> str:
|
|
"""Extract the key value from a Set element."""
|
|
if key_attr and key_attr in element:
|
|
val = element[key_attr]
|
|
if CONFIG.ignore_case and isinstance(val, str):
|
|
return val.lower()
|
|
return str(val)
|
|
# Fall back to hash of sorted items for elements without a key attribute
|
|
return str(hash(json.dumps(element, sort_keys=True)))
|
|
|
|
|
|
def normalize_value(val: Any) -> Any:
|
|
"""Normalize values for comparison (treat empty string and None as equivalent)."""
|
|
if val == "" or val is None:
|
|
return None
|
|
if isinstance(val, list) and len(val) == 0:
|
|
return None
|
|
# Normalize numeric types (int vs float)
|
|
if isinstance(val, float) and val.is_integer():
|
|
return int(val)
|
|
return val
|
|
|
|
|
|
def normalize_for_comparison(val: Any) -> Any:
|
|
"""Normalize value for comparison, including case-insensitive option."""
|
|
val = normalize_value(val)
|
|
if CONFIG.ignore_case and isinstance(val, str):
|
|
return val.lower()
|
|
return val
|
|
|
|
|
|
def values_equivalent(before_val: Any, after_val: Any) -> bool:
|
|
"""Check if two values are effectively equivalent."""
|
|
return normalize_for_comparison(before_val) == normalize_for_comparison(after_val)
|
|
|
|
|
|
def compare_elements(
|
|
before: Dict[str, Any], after: Dict[str, Any], nested_attrs: Dict[str, Any] = None
|
|
) -> tuple:
|
|
"""
|
|
Compare two elements and return (simple_diffs, nested_set_attrs).
|
|
|
|
simple_diffs: differences in non-Set attributes
|
|
nested_set_attrs: list of (attr_name, before_val, after_val, attr_def) for nested Sets
|
|
"""
|
|
nested_attrs = nested_attrs or {}
|
|
simple_diffs = {}
|
|
nested_set_attrs = []
|
|
|
|
all_keys = set(before.keys()) | set(after.keys())
|
|
|
|
for key in all_keys:
|
|
before_val = before.get(key)
|
|
after_val = after.get(key)
|
|
|
|
# Check if this is a nested Set attribute
|
|
if key in nested_attrs:
|
|
if before_val != after_val:
|
|
nested_set_attrs.append((key, before_val, after_val, nested_attrs[key]))
|
|
elif not values_equivalent(before_val, after_val):
|
|
simple_diffs[key] = {"before": before_val, "after": after_val}
|
|
|
|
return (simple_diffs, nested_set_attrs)
|
|
|
|
|
|
def analyze_primitive_set(
|
|
before_list: Optional[List[Any]],
|
|
after_list: Optional[List[Any]],
|
|
attr_name: str,
|
|
path: str = "",
|
|
) -> SetAttributeChange:
|
|
"""Analyze changes in a primitive Set (string/number array)."""
|
|
full_path = f"{path}.{attr_name}" if path else attr_name
|
|
change = SetAttributeChange(
|
|
attribute_name=attr_name, path=full_path, is_primitive=True
|
|
)
|
|
|
|
before_set = set(before_list) if before_list else set()
|
|
after_set = set(after_list) if after_list else set()
|
|
|
|
# Apply case-insensitive comparison if configured
|
|
if CONFIG.ignore_case:
|
|
before_normalized = {v.lower() if isinstance(v, str) else v for v in before_set}
|
|
after_normalized = {v.lower() if isinstance(v, str) else v for v in after_set}
|
|
else:
|
|
before_normalized = before_set
|
|
after_normalized = after_set
|
|
|
|
removed = before_normalized - after_normalized
|
|
added = after_normalized - before_normalized
|
|
|
|
if removed:
|
|
change.primitive_removed = list(removed)
|
|
if added:
|
|
change.primitive_added = list(added)
|
|
|
|
# Elements that exist in both (order change only)
|
|
common = before_normalized & after_normalized
|
|
if common and not removed and not added:
|
|
change.order_only_count = len(common)
|
|
|
|
return change
|
|
|
|
|
|
def analyze_set_attribute(
|
|
before_list: Optional[List[Dict[str, Any]]],
|
|
after_list: Optional[List[Dict[str, Any]]],
|
|
key_attr: Optional[str],
|
|
attr_name: str,
|
|
nested_attrs: Dict[str, Any] = None,
|
|
path: str = "",
|
|
after_unknown: Optional[Dict[str, Any]] = None,
|
|
) -> SetAttributeChange:
|
|
"""Analyze changes in a Set-type attribute, including nested Sets."""
|
|
full_path = f"{path}.{attr_name}" if path else attr_name
|
|
change = SetAttributeChange(attribute_name=attr_name, path=full_path)
|
|
nested_attrs = nested_attrs or {}
|
|
|
|
before_list = before_list or []
|
|
after_list = after_list or []
|
|
|
|
# Handle non-list values (single element)
|
|
if not isinstance(before_list, list):
|
|
before_list = [before_list] if before_list else []
|
|
if not isinstance(after_list, list):
|
|
after_list = [after_list] if after_list else []
|
|
|
|
# Check if this is a primitive set (non-dict elements)
|
|
has_primitive_before = any(
|
|
not isinstance(e, dict) for e in before_list if e is not None
|
|
)
|
|
has_primitive_after = any(
|
|
not isinstance(e, dict) for e in after_list if e is not None
|
|
)
|
|
|
|
if has_primitive_before or has_primitive_after:
|
|
# Handle primitive sets
|
|
return analyze_primitive_set(before_list, after_list, attr_name, path)
|
|
|
|
# Build maps keyed by the key attribute
|
|
before_map: Dict[str, Dict[str, Any]] = {}
|
|
after_map: Dict[str, Dict[str, Any]] = {}
|
|
|
|
# Detect duplicate keys
|
|
for e in before_list:
|
|
if isinstance(e, dict):
|
|
key = get_element_key(e, key_attr)
|
|
if key in before_map:
|
|
warn(f"Duplicate key '{key}' in before state for {full_path}")
|
|
before_map[key] = e
|
|
|
|
for e in after_list:
|
|
if isinstance(e, dict):
|
|
key = get_element_key(e, key_attr)
|
|
if key in after_map:
|
|
warn(f"Duplicate key '{key}' in after state for {full_path}")
|
|
after_map[key] = e
|
|
|
|
before_keys = set(before_map.keys())
|
|
after_keys = set(after_map.keys())
|
|
|
|
# Find removed elements
|
|
for key in before_keys - after_keys:
|
|
display_key = key if key_attr else "(element)"
|
|
change.removed.append(display_key)
|
|
|
|
# Find added elements
|
|
for key in after_keys - before_keys:
|
|
display_key = key if key_attr else "(element)"
|
|
change.added.append(display_key)
|
|
|
|
# Compare common elements
|
|
for key in before_keys & after_keys:
|
|
before_elem = before_map[key]
|
|
after_elem = after_map[key]
|
|
|
|
if before_elem == after_elem:
|
|
# Exact match - this is just an order change
|
|
change.order_only_count += 1
|
|
else:
|
|
# Content changed - check for meaningful differences
|
|
simple_diffs, nested_set_list = compare_elements(
|
|
before_elem, after_elem, nested_attrs
|
|
)
|
|
|
|
# Process nested Set attributes recursively
|
|
for nested_name, nested_before, nested_after, nested_def in nested_set_list:
|
|
nested_key, sub_nested = get_attr_config(nested_def)
|
|
nested_change = analyze_set_attribute(
|
|
nested_before,
|
|
nested_after,
|
|
nested_key,
|
|
nested_name,
|
|
sub_nested,
|
|
full_path,
|
|
)
|
|
if (
|
|
nested_change.order_only_count > 0
|
|
or nested_change.added
|
|
or nested_change.removed
|
|
or nested_change.modified
|
|
or nested_change.nested_changes
|
|
or nested_change.primitive_added
|
|
or nested_change.primitive_removed
|
|
):
|
|
change.nested_changes.append(nested_change)
|
|
|
|
if simple_diffs:
|
|
# Has actual differences in non-nested attributes
|
|
display_key = key if key_attr else "(element)"
|
|
change.modified.append((display_key, simple_diffs))
|
|
elif not nested_set_list:
|
|
# Only null/empty differences - treat as order change
|
|
change.order_only_count += 1
|
|
|
|
return change
|
|
|
|
|
|
def analyze_resource_change(
|
|
resource_change: Dict[str, Any],
|
|
include_filter: Optional[List[str]] = None,
|
|
exclude_filter: Optional[List[str]] = None,
|
|
) -> Optional[ResourceChange]:
|
|
"""Analyze a single resource change from terraform plan."""
|
|
resource_type = resource_change.get("type", "")
|
|
address = resource_change.get("address", "")
|
|
change = resource_change.get("change", {})
|
|
actions = change.get("actions", [])
|
|
|
|
# Skip if no change or not an AzureRM resource
|
|
if actions == ["no-op"] or not resource_type.startswith("azurerm_"):
|
|
return None
|
|
|
|
# Apply filters
|
|
if include_filter:
|
|
if not any(f in resource_type for f in include_filter):
|
|
return None
|
|
if exclude_filter:
|
|
if any(f in resource_type for f in exclude_filter):
|
|
return None
|
|
|
|
before = change.get("before") or {}
|
|
after = change.get("after") or {}
|
|
after_unknown = change.get("after_unknown") or {}
|
|
before_sensitive = change.get("before_sensitive") or {}
|
|
after_sensitive = change.get("after_sensitive") or {}
|
|
|
|
# Determine action type
|
|
is_create = actions == ["create"]
|
|
is_delete = actions == ["delete"]
|
|
is_replace = "delete" in actions and "create" in actions
|
|
|
|
result = ResourceChange(
|
|
address=address,
|
|
resource_type=resource_type,
|
|
actions=actions,
|
|
is_replace=is_replace,
|
|
is_create=is_create,
|
|
is_delete=is_delete,
|
|
)
|
|
|
|
# Skip detailed Set analysis for create/delete (all elements are new/removed)
|
|
if is_create or is_delete:
|
|
return result
|
|
|
|
# Get Set attributes for this resource type
|
|
set_attrs = AZURERM_SET_ATTRIBUTES.get(resource_type, {})
|
|
|
|
# Analyze Set-type attributes
|
|
analyzed_attrs: Set[str] = set()
|
|
for attr_name, attr_def in set_attrs.items():
|
|
before_val = before.get(attr_name)
|
|
after_val = after.get(attr_name)
|
|
|
|
# Warn about sensitive attributes
|
|
if attr_name in before_sensitive or attr_name in after_sensitive:
|
|
if before_sensitive.get(attr_name) or after_sensitive.get(attr_name):
|
|
warn(
|
|
f"Attribute '{attr_name}' in {address} contains sensitive values (comparison may be incomplete)"
|
|
)
|
|
|
|
# Skip if attribute is not present or unchanged
|
|
if before_val is None and after_val is None:
|
|
continue
|
|
if before_val == after_val:
|
|
continue
|
|
|
|
# Only analyze if it's a list (Set in Terraform) or has changed
|
|
if not isinstance(before_val, list) and not isinstance(after_val, list):
|
|
continue
|
|
|
|
# Parse attribute definition for key and nested attrs
|
|
key_attr, nested_attrs = get_attr_config(attr_def)
|
|
|
|
# Get after_unknown for this attribute
|
|
attr_after_unknown = after_unknown.get(attr_name)
|
|
|
|
set_change = analyze_set_attribute(
|
|
before_val,
|
|
after_val,
|
|
key_attr,
|
|
attr_name,
|
|
nested_attrs,
|
|
after_unknown=attr_after_unknown,
|
|
)
|
|
|
|
# Only include if there are actual findings
|
|
if (
|
|
set_change.order_only_count > 0
|
|
or set_change.added
|
|
or set_change.removed
|
|
or set_change.modified
|
|
or set_change.nested_changes
|
|
or set_change.primitive_added
|
|
or set_change.primitive_removed
|
|
):
|
|
result.set_changes.append(set_change)
|
|
analyzed_attrs.add(attr_name)
|
|
|
|
# Find other (non-Set) changes
|
|
all_keys = set(before.keys()) | set(after.keys())
|
|
for key in all_keys:
|
|
if key in analyzed_attrs:
|
|
continue
|
|
if key.startswith("_"): # Skip internal attributes
|
|
continue
|
|
before_val = before.get(key)
|
|
after_val = after.get(key)
|
|
if before_val != after_val:
|
|
result.other_changes.append(key)
|
|
|
|
return result
|
|
|
|
|
|
def collect_all_changes(set_change: SetAttributeChange, prefix: str = "") -> tuple:
|
|
"""
|
|
Recursively collect order-only and actual changes from nested structure.
|
|
Returns (order_only_list, actual_change_list)
|
|
"""
|
|
order_only = []
|
|
actual = []
|
|
|
|
display_name = (
|
|
f"{prefix}{set_change.attribute_name}" if prefix else set_change.attribute_name
|
|
)
|
|
|
|
has_actual_change = (
|
|
set_change.added
|
|
or set_change.removed
|
|
or set_change.modified
|
|
or set_change.primitive_added
|
|
or set_change.primitive_removed
|
|
)
|
|
|
|
if set_change.order_only_count > 0 and not has_actual_change:
|
|
order_only.append((display_name, set_change))
|
|
elif has_actual_change:
|
|
actual.append((display_name, set_change))
|
|
|
|
# Process nested changes
|
|
for nested in set_change.nested_changes:
|
|
nested_order, nested_actual = collect_all_changes(nested, f"{display_name}.")
|
|
order_only.extend(nested_order)
|
|
actual.extend(nested_actual)
|
|
|
|
return (order_only, actual)
|
|
|
|
|
|
def format_set_change(change: SetAttributeChange, indent: int = 0) -> List[str]:
|
|
"""Format a single SetAttributeChange for output."""
|
|
lines = []
|
|
prefix = " " * indent
|
|
|
|
# Handle primitive sets
|
|
if change.is_primitive:
|
|
if change.primitive_added:
|
|
lines.append(f"{prefix}**Added:**")
|
|
for item in change.primitive_added:
|
|
lines.append(f"{prefix} - {item}")
|
|
if change.primitive_removed:
|
|
lines.append(f"{prefix}**Removed:**")
|
|
for item in change.primitive_removed:
|
|
lines.append(f"{prefix} - {item}")
|
|
if change.order_only_count > 0:
|
|
lines.append(f"{prefix}**Order-only:** {change.order_only_count} elements")
|
|
return lines
|
|
|
|
if change.added:
|
|
lines.append(f"{prefix}**Added:**")
|
|
for item in change.added:
|
|
lines.append(f"{prefix} - {item}")
|
|
|
|
if change.removed:
|
|
lines.append(f"{prefix}**Removed:**")
|
|
for item in change.removed:
|
|
lines.append(f"{prefix} - {item}")
|
|
|
|
if change.modified:
|
|
lines.append(f"{prefix}**Modified:**")
|
|
for item_key, diffs in change.modified:
|
|
lines.append(f"{prefix} - {item_key}:")
|
|
for diff_key, diff_val in diffs.items():
|
|
before_str = json.dumps(diff_val["before"], ensure_ascii=False)
|
|
after_str = json.dumps(diff_val["after"], ensure_ascii=False)
|
|
lines.append(f"{prefix} - {diff_key}: {before_str} → {after_str}")
|
|
|
|
if change.order_only_count > 0:
|
|
lines.append(f"{prefix}**Order-only:** {change.order_only_count} elements")
|
|
|
|
# Format nested changes
|
|
for nested in change.nested_changes:
|
|
if (
|
|
nested.added
|
|
or nested.removed
|
|
or nested.modified
|
|
or nested.nested_changes
|
|
or nested.primitive_added
|
|
or nested.primitive_removed
|
|
):
|
|
lines.append(f"{prefix}**Nested attribute `{nested.attribute_name}`:**")
|
|
lines.extend(format_set_change(nested, indent + 1))
|
|
|
|
return lines
|
|
|
|
|
|
def format_markdown_output(result: AnalysisResult) -> str:
|
|
"""Format analysis results as Markdown."""
|
|
lines = ["# Terraform Plan Analysis Results", ""]
|
|
lines.append(
|
|
'Analyzes AzureRM Set-type attribute changes and identifies order-only "false-positive diffs".'
|
|
)
|
|
lines.append("")
|
|
|
|
# Categorize changes (including nested)
|
|
order_only_changes: List[tuple] = []
|
|
actual_set_changes: List[tuple] = []
|
|
replace_resources: List[ResourceChange] = []
|
|
create_resources: List[ResourceChange] = []
|
|
delete_resources: List[ResourceChange] = []
|
|
other_changes: List[tuple] = []
|
|
|
|
for res in result.resources:
|
|
if res.is_replace:
|
|
replace_resources.append(res)
|
|
elif res.is_create:
|
|
create_resources.append(res)
|
|
elif res.is_delete:
|
|
delete_resources.append(res)
|
|
|
|
for set_change in res.set_changes:
|
|
order_only, actual = collect_all_changes(set_change)
|
|
for name, change in order_only:
|
|
order_only_changes.append((res.address, name, change))
|
|
for name, change in actual:
|
|
actual_set_changes.append((res.address, name, change))
|
|
|
|
if res.other_changes:
|
|
other_changes.append((res.address, res.other_changes))
|
|
|
|
# Section: Order-only changes (false positives)
|
|
lines.append("## 🟢 Order-only Changes (No Impact)")
|
|
lines.append("")
|
|
if order_only_changes:
|
|
lines.append(
|
|
"The following changes are internal reordering of Set-type attributes only, with no actual resource changes."
|
|
)
|
|
lines.append("")
|
|
for address, name, change in order_only_changes:
|
|
lines.append(
|
|
f"- `{address}`: **{name}** ({change.order_only_count} elements)"
|
|
)
|
|
else:
|
|
lines.append("None")
|
|
lines.append("")
|
|
|
|
# Section: Actual Set changes
|
|
lines.append("## 🟡 Actual Set Attribute Changes")
|
|
lines.append("")
|
|
if actual_set_changes:
|
|
for address, name, change in actual_set_changes:
|
|
lines.append(f"### `{address}` - {name}")
|
|
lines.append("")
|
|
lines.extend(format_set_change(change))
|
|
lines.append("")
|
|
else:
|
|
lines.append("None")
|
|
lines.append("")
|
|
|
|
# Section: Resource replacements
|
|
lines.append("## 🔴 Resource Replacement (Caution)")
|
|
lines.append("")
|
|
if replace_resources:
|
|
lines.append(
|
|
"The following resources will be deleted and recreated. This may cause downtime."
|
|
)
|
|
lines.append("")
|
|
for res in replace_resources:
|
|
lines.append(f"- `{res.address}`")
|
|
else:
|
|
lines.append("None")
|
|
lines.append("")
|
|
|
|
# Section: Warnings
|
|
if result.warnings:
|
|
lines.append("## ⚠️ Warnings")
|
|
lines.append("")
|
|
for warning in result.warnings:
|
|
lines.append(f"- {warning}")
|
|
lines.append("")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def format_json_output(result: AnalysisResult) -> str:
|
|
"""Format analysis results as JSON."""
|
|
|
|
def set_change_to_dict(change: SetAttributeChange) -> dict:
|
|
d = {
|
|
"attribute_name": change.attribute_name,
|
|
"path": change.path,
|
|
"order_only_count": change.order_only_count,
|
|
"is_primitive": change.is_primitive,
|
|
}
|
|
if change.added:
|
|
d["added"] = change.added
|
|
if change.removed:
|
|
d["removed"] = change.removed
|
|
if change.modified:
|
|
d["modified"] = [{"key": k, "diffs": v} for k, v in change.modified]
|
|
if change.primitive_added:
|
|
d["primitive_added"] = change.primitive_added
|
|
if change.primitive_removed:
|
|
d["primitive_removed"] = change.primitive_removed
|
|
if change.nested_changes:
|
|
d["nested_changes"] = [set_change_to_dict(n) for n in change.nested_changes]
|
|
return d
|
|
|
|
def resource_to_dict(res: ResourceChange) -> dict:
|
|
return {
|
|
"address": res.address,
|
|
"resource_type": res.resource_type,
|
|
"actions": res.actions,
|
|
"is_replace": res.is_replace,
|
|
"is_create": res.is_create,
|
|
"is_delete": res.is_delete,
|
|
"set_changes": [set_change_to_dict(c) for c in res.set_changes],
|
|
"other_changes": res.other_changes,
|
|
}
|
|
|
|
output = {
|
|
"summary": {
|
|
"order_only_count": result.order_only_count,
|
|
"actual_set_changes_count": result.actual_set_changes_count,
|
|
"replace_count": result.replace_count,
|
|
"create_count": result.create_count,
|
|
"delete_count": result.delete_count,
|
|
"other_changes_count": result.other_changes_count,
|
|
},
|
|
"has_real_changes": (
|
|
result.actual_set_changes_count > 0
|
|
or result.replace_count > 0
|
|
or result.create_count > 0
|
|
or result.delete_count > 0
|
|
or result.other_changes_count > 0
|
|
),
|
|
"resources": [resource_to_dict(r) for r in result.resources],
|
|
"warnings": result.warnings,
|
|
}
|
|
return json.dumps(output, indent=2, ensure_ascii=False)
|
|
|
|
|
|
def format_summary_output(result: AnalysisResult) -> str:
|
|
"""Format analysis results as a single-line summary."""
|
|
parts = []
|
|
|
|
if result.order_only_count > 0:
|
|
parts.append(f"🟢 {result.order_only_count} order-only")
|
|
if result.actual_set_changes_count > 0:
|
|
parts.append(f"🟡 {result.actual_set_changes_count} set changes")
|
|
if result.replace_count > 0:
|
|
parts.append(f"🔴 {result.replace_count} replacements")
|
|
|
|
if not parts:
|
|
return "✅ No changes detected"
|
|
|
|
return " | ".join(parts)
|
|
|
|
|
|
def analyze_plan(
|
|
plan_json: Dict[str, Any],
|
|
include_filter: Optional[List[str]] = None,
|
|
exclude_filter: Optional[List[str]] = None,
|
|
) -> AnalysisResult:
|
|
"""Analyze a terraform plan JSON and return results."""
|
|
result = AnalysisResult()
|
|
|
|
resource_changes = plan_json.get("resource_changes", [])
|
|
|
|
for rc in resource_changes:
|
|
res = analyze_resource_change(rc, include_filter, exclude_filter)
|
|
if res:
|
|
result.resources.append(res)
|
|
|
|
# Count statistics
|
|
if res.is_replace:
|
|
result.replace_count += 1
|
|
elif res.is_create:
|
|
result.create_count += 1
|
|
elif res.is_delete:
|
|
result.delete_count += 1
|
|
|
|
if res.other_changes:
|
|
result.other_changes_count += len(res.other_changes)
|
|
|
|
for set_change in res.set_changes:
|
|
order_only, actual = collect_all_changes(set_change)
|
|
result.order_only_count += len(order_only)
|
|
result.actual_set_changes_count += len(actual)
|
|
|
|
# Add warnings from global config
|
|
result.warnings = CONFIG.warnings.copy()
|
|
|
|
return result
|
|
|
|
|
|
def determine_exit_code(result: AnalysisResult) -> int:
|
|
"""Determine exit code based on analysis results."""
|
|
if result.replace_count > 0:
|
|
return EXIT_RESOURCE_REPLACE
|
|
if (
|
|
result.actual_set_changes_count > 0
|
|
or result.create_count > 0
|
|
or result.delete_count > 0
|
|
):
|
|
return EXIT_SET_CHANGES
|
|
return EXIT_NO_CHANGES
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
|
|
"""Parse command line arguments."""
|
|
parser = argparse.ArgumentParser(
|
|
description="Analyze Terraform plan JSON for AzureRM Set-type attribute changes.",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""
|
|
Examples:
|
|
# Basic usage
|
|
python analyze_plan.py plan.json
|
|
|
|
# From stdin
|
|
terraform show -json plan.tfplan | python analyze_plan.py
|
|
|
|
# CI/CD with exit code
|
|
python analyze_plan.py plan.json --exit-code
|
|
|
|
# JSON output for programmatic processing
|
|
python analyze_plan.py plan.json --format json
|
|
|
|
# Summary for CI logs
|
|
python analyze_plan.py plan.json --format summary
|
|
|
|
Exit codes (with --exit-code):
|
|
0 - No changes or order-only changes
|
|
1 - Actual Set attribute changes
|
|
2 - Resource replacement detected
|
|
3 - Error
|
|
""",
|
|
)
|
|
|
|
parser.add_argument(
|
|
"plan_file",
|
|
nargs="?",
|
|
help="Path to terraform plan JSON file (reads from stdin if not provided)",
|
|
)
|
|
parser.add_argument(
|
|
"--format",
|
|
"-f",
|
|
choices=["markdown", "json", "summary"],
|
|
default="markdown",
|
|
help="Output format (default: markdown)",
|
|
)
|
|
parser.add_argument(
|
|
"--exit-code",
|
|
"-e",
|
|
action="store_true",
|
|
help="Return exit code based on change severity",
|
|
)
|
|
parser.add_argument(
|
|
"--quiet",
|
|
"-q",
|
|
action="store_true",
|
|
help="Suppress warnings and verbose output",
|
|
)
|
|
parser.add_argument(
|
|
"--verbose",
|
|
"-v",
|
|
action="store_true",
|
|
help="Show detailed warnings and debug info",
|
|
)
|
|
parser.add_argument(
|
|
"--ignore-case",
|
|
action="store_true",
|
|
help="Ignore case when comparing string values",
|
|
)
|
|
parser.add_argument(
|
|
"--attributes", type=Path, help="Path to custom attributes JSON file"
|
|
)
|
|
parser.add_argument(
|
|
"--include",
|
|
action="append",
|
|
help="Only analyze resources matching this pattern (can be repeated)",
|
|
)
|
|
parser.add_argument(
|
|
"--exclude",
|
|
action="append",
|
|
help="Exclude resources matching this pattern (can be repeated)",
|
|
)
|
|
|
|
return parser.parse_args()
|
|
|
|
|
|
def main():
|
|
"""Main entry point."""
|
|
global AZURERM_SET_ATTRIBUTES
|
|
|
|
args = parse_args()
|
|
|
|
# Configure global settings
|
|
CONFIG.ignore_case = args.ignore_case
|
|
CONFIG.quiet = args.quiet
|
|
CONFIG.verbose = args.verbose
|
|
CONFIG.warnings = []
|
|
|
|
# Load Set attributes from external JSON
|
|
AZURERM_SET_ATTRIBUTES = load_set_attributes(args.attributes)
|
|
|
|
# Read plan input
|
|
if args.plan_file:
|
|
try:
|
|
with open(args.plan_file, "r") as f:
|
|
plan_json = json.load(f)
|
|
except FileNotFoundError:
|
|
print(f"Error: File not found: {args.plan_file}", file=sys.stderr)
|
|
sys.exit(EXIT_ERROR)
|
|
except json.JSONDecodeError as e:
|
|
print(f"Error: Invalid JSON: {e}", file=sys.stderr)
|
|
sys.exit(EXIT_ERROR)
|
|
else:
|
|
try:
|
|
plan_json = json.load(sys.stdin)
|
|
except json.JSONDecodeError as e:
|
|
print(f"Error: Invalid JSON from stdin: {e}", file=sys.stderr)
|
|
sys.exit(EXIT_ERROR)
|
|
|
|
# Check for empty plan
|
|
resource_changes = plan_json.get("resource_changes", [])
|
|
if not resource_changes:
|
|
if args.format == "json":
|
|
print(
|
|
json.dumps(
|
|
{
|
|
"summary": {},
|
|
"has_real_changes": False,
|
|
"resources": [],
|
|
"warnings": [],
|
|
}
|
|
)
|
|
)
|
|
elif args.format == "summary":
|
|
print("✅ No changes detected")
|
|
else:
|
|
print("# Terraform Plan Analysis Results\n")
|
|
print("No resource changes detected.")
|
|
sys.exit(EXIT_NO_CHANGES)
|
|
|
|
# Analyze the plan
|
|
result = analyze_plan(plan_json, args.include, args.exclude)
|
|
|
|
# Format output
|
|
if args.format == "json":
|
|
output = format_json_output(result)
|
|
elif args.format == "summary":
|
|
output = format_summary_output(result)
|
|
else:
|
|
output = format_markdown_output(result)
|
|
|
|
print(output)
|
|
|
|
# Determine exit code
|
|
if args.exit_code:
|
|
sys.exit(determine_exit_code(result))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|