add mini-context-graph skill (#1580)

* add mini-context-graph skill

* remove pycache files

* filename case update to SKILL.md

* update readme
Author: Nixon Kurian
Date: 2026-05-05 09:34:37 +05:30 (committed by GitHub)
Parent: 1f96bce626
Commit: 746ba555b6
16 changed files with 2343 additions and 0 deletions
documents_store.py
@@ -0,0 +1,191 @@
"""
documents_store.py — Persistent storage for raw documents and chunks (RAG layer).
Inspired by Karpathy's LLM Wiki pattern: raw sources are immutable and stored
as the ground truth. Chunks are the retrieval unit; provenance links tie graph
nodes/edges back to specific chunks.
Handles:
- Storing raw documents with metadata
- Chunking documents into overlapping text windows
- Retrieving chunks by id or by keyword search
- Persisting to data/documents.json
"""
from __future__ import annotations
import json
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
import config
_DATA_DIR = Path(os.environ.get("MINI_CONTEXT_GRAPH_DATA_DIR", str(config.DATA_DIR)))
_DOCS_FILE = _DATA_DIR / "documents.json"
_CHUNK_SIZE = 500 # characters per chunk
_CHUNK_OVERLAP = 100 # overlap between consecutive chunks
_STOPWORDS = frozenset([
"a", "an", "the", "is", "are", "was", "were", "be", "been", "being",
"have", "has", "had", "do", "does", "did", "will", "would", "could",
"should", "may", "might", "shall", "can", "to", "of", "in", "on",
"at", "by", "for", "with", "from", "and", "or", "but", "not", "it",
"its", "this", "that", "these", "those", "i", "you", "he", "she",
"we", "they", "what", "which", "who", "how", "why", "when", "where",
])
def _load() -> dict:
if _DOCS_FILE.exists():
with open(_DOCS_FILE, "r") as f:
return json.load(f)
return {"documents": {}}
def _save(store: dict) -> None:
_DATA_DIR.mkdir(parents=True, exist_ok=True)
with open(_DOCS_FILE, "w") as f:
json.dump(store, f, indent=2)
def _tokenize(text: str) -> list[str]:
tokens = re.findall(r"[a-z0-9]+", text.lower())
return [t for t in tokens if t not in _STOPWORDS and len(t) > 1]
def _chunk_text(content: str, chunk_size: int = _CHUNK_SIZE, overlap: int = _CHUNK_OVERLAP) -> list[str]:
"""Split content into overlapping character windows."""
chunks = []
start = 0
while start < len(content):
end = start + chunk_size
chunks.append(content[start:end].strip())
if end >= len(content):
break
start += chunk_size - overlap
return [c for c in chunks if c]
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def add_document(
doc_id: str,
title: str,
source: str,
content: str,
) -> dict:
"""
Store a raw document and auto-generate chunks.
Args:
doc_id: Caller-supplied stable identifier (e.g. "doc_001" or a filename).
title: Human-readable title.
source: Origin path/URL (immutable provenance pointer).
content: Full raw text to store and chunk.
Returns:
The stored document dict including generated chunk_ids.
"""
store = _load()
# Idempotent: return existing doc if already stored
if doc_id in store["documents"]:
return store["documents"][doc_id]
raw_chunks = _chunk_text(content)
chunks = []
for i, text in enumerate(raw_chunks):
chunks.append({
"chunk_id": f"{doc_id}_chunk_{i:03d}",
"index": i,
"text": text,
})
doc = {
"id": doc_id,
"title": title,
"source": source,
"content": content,
"chunks": chunks,
"ingestion_date": datetime.now(timezone.utc).isoformat(),
}
store["documents"][doc_id] = doc
_save(store)
return doc
def get_document(doc_id: str) -> dict | None:
"""Return the full document record or None if not found."""
store = _load()
return store["documents"].get(doc_id)
def get_chunk(chunk_id: str) -> dict | None:
"""Return a specific chunk by its chunk_id (searches across all documents)."""
store = _load()
for doc in store["documents"].values():
for chunk in doc["chunks"]:
if chunk["chunk_id"] == chunk_id:
return chunk
return None
def get_chunks_for_document(doc_id: str) -> list[dict]:
"""Return all chunks for a document."""
doc = get_document(doc_id)
if doc is None:
return []
return doc["chunks"]
def search_chunks(query: str, top_k: int = 5) -> list[dict]:
"""
Keyword search over chunk text. Returns the top_k best-matching chunks, scored
by the fraction of query tokens present in the chunk (no embeddings required).
Returns a list of dicts with keys: chunk_id, doc_id, doc_title, score, text.
"""
store = _load()
query_tokens = set(_tokenize(query))
if not query_tokens:
return []
scored: list[tuple[float, dict]] = []
for doc in store["documents"].values():
for chunk in doc["chunks"]:
chunk_tokens = set(_tokenize(chunk["text"]))
overlap = len(query_tokens & chunk_tokens)
if overlap > 0:
score = overlap / len(query_tokens)
scored.append((score, {
"chunk_id": chunk["chunk_id"],
"doc_id": doc["id"],
"doc_title": doc["title"],
"score": round(score, 4),
"text": chunk["text"],
}))
scored.sort(key=lambda x: x[0], reverse=True)
return [item for _, item in scored[:top_k]]
def list_documents() -> list[dict]:
"""Return a summary list of all stored documents (no content, no chunks)."""
store = _load()
return [
{
"id": doc["id"],
"title": doc["title"],
"source": doc["source"],
"chunk_count": len(doc["chunks"]),
"ingestion_date": doc["ingestion_date"],
}
for doc in store["documents"].values()
]
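A minimal usage sketch for this module (assuming it is importable as tools.documents_store, as the retrieval_engine import below suggests; the doc id, title, source path, content, and query are all illustrative):

from tools import documents_store

# Ingest a raw document; ~500-char chunks with 100-char overlap are generated.
doc = documents_store.add_document(
    doc_id="doc_001",                          # hypothetical caller-chosen id
    title="Payment service postmortem",
    source="notes/postmortem.md",              # illustrative provenance pointer
    content="The payment service crashed after a connection pool leak ...",
)
print(len(doc["chunks"]))                      # number of generated chunks

# Keyword retrieval over all stored chunks (no embeddings involved).
for hit in documents_store.search_chunks("connection pool leak", top_k=3):
    print(hit["chunk_id"], hit["score"], hit["text"][:60])

Calling add_document again with the same doc_id is idempotent: the stored record is returned unchanged.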
graph_store.py
@@ -0,0 +1,202 @@
"""
graph_store.py — Persistent storage for graph nodes and edges.
Handles:
- Adding/deduplicating nodes
- Adding edges with confidence
- Fetching neighbors
- Persisting to graph.json
"""
from __future__ import annotations
import json
import os
import sys
import uuid
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
import config
_DATA_DIR = Path(os.environ.get("MINI_CONTEXT_GRAPH_DATA_DIR", str(config.DATA_DIR)))
_GRAPH_FILE = _DATA_DIR / "graph.json"
def _load() -> dict:
if _GRAPH_FILE.exists():
with open(_GRAPH_FILE, "r") as f:
return json.load(f)
return {"nodes": {}, "edges": []}
def _save(graph: dict) -> None:
_DATA_DIR.mkdir(parents=True, exist_ok=True)
with open(_GRAPH_FILE, "w") as f:
json.dump(graph, f, indent=2)
def add_node(
name: str,
node_type: str,
source_document: str | None = None,
source_chunks: list[str] | None = None,
) -> str:
"""
Add a node if it doesn't exist (deduplicated by normalized name); if it does,
merge any new provenance into the existing node. Returns node_id.
Args:
source_document: doc_id from documents_store (provenance pointer).
source_chunks: list of chunk_ids that mention this entity.
"""
graph = _load()
name_lower = name.strip().lower()
# Deduplication: search by normalized name
for node_id, node in graph["nodes"].items():
if node["name"] == name_lower:
# Merge provenance if new info provided
changed = False
if source_document and node.get("source_document") is None:
node["source_document"] = source_document
changed = True
if source_chunks:
existing = set(node.get("source_chunks") or [])
merged = list(existing | set(source_chunks))
if merged != list(existing):
node["source_chunks"] = merged
changed = True
if changed:
_save(graph)
return node_id
node_id = str(uuid.uuid4())[:8]
graph["nodes"][node_id] = {
"name": name_lower,
"type": node_type.strip().lower(),
"source_document": source_document,
"source_chunks": source_chunks or [],
}
_save(graph)
return node_id
def add_edge(
source_id: str,
target_id: str,
relation: str,
confidence: float,
source_document: str | None = None,
supporting_text: str | None = None,
chunk_id: str | None = None,
) -> None:
"""
Add a directed edge between two nodes.
Args:
source_document: doc_id from documents_store (provenance pointer).
supporting_text: The exact text span that supports this relation.
chunk_id: The specific chunk_id the supporting text came from.
"""
graph = _load()
# Deduplicate edges by source + target + relation
relation_lower = relation.strip().lower()
for edge in graph["edges"]:
if (
edge["source"] == source_id
and edge["target"] == target_id
and edge["type"] == relation_lower
):
changed = False
if confidence > edge["confidence"]:
edge["confidence"] = confidence
changed = True
if source_document and edge.get("source_document") is None:
edge["source_document"] = source_document
changed = True
if supporting_text and edge.get("supporting_text") is None:
edge["supporting_text"] = supporting_text
changed = True
if chunk_id and edge.get("chunk_id") is None:
edge["chunk_id"] = chunk_id
changed = True
if changed:
_save(graph)
return
graph["edges"].append({
"source": source_id,
"target": target_id,
"type": relation_lower,
"confidence": confidence,
"source_document": source_document,
"supporting_text": supporting_text,
"chunk_id": chunk_id,
})
_save(graph)
def get_neighbors(node_id: str, min_confidence: float = 0.0) -> list[str]:
"""Return node_ids of all neighbors reachable from node_id."""
graph = _load()
neighbors = []
for edge in graph["edges"]:
if edge["confidence"] < min_confidence:
continue
if edge["source"] == node_id:
neighbors.append(edge["target"])
elif edge["target"] == node_id:
neighbors.append(edge["source"])
return list(set(neighbors))
def get_node(node_id: str) -> dict | None:
"""Fetch a single node by ID."""
graph = _load()
return graph["nodes"].get(node_id)
def get_subgraph(node_ids: list[str]) -> dict:
"""Return nodes and edges induced by the given node_ids."""
graph = _load()
node_id_set = set(node_ids)
nodes = {nid: graph["nodes"][nid] for nid in node_ids if nid in graph["nodes"]}
edges = [
e
for e in graph["edges"]
if e["source"] in node_id_set and e["target"] in node_id_set
]
return {"nodes": nodes, "edges": edges}
def find_node_by_name(name: str) -> str | None:
"""Return node_id for a given normalized name, or None."""
graph = _load()
name_lower = name.strip().lower()
for node_id, node in graph["nodes"].items():
if node["name"] == name_lower:
return node_id
return None
def link_node_to_source(node_id: str, doc_id: str, chunk_ids: list[str]) -> None:
"""Attach provenance (doc_id + chunk_ids) to an existing node."""
graph = _load()
if node_id not in graph["nodes"]:
return
node = graph["nodes"][node_id]
node["source_document"] = doc_id
existing = set(node.get("source_chunks") or [])
node["source_chunks"] = list(existing | set(chunk_ids))
_save(graph)
def get_node_sources(node_id: str) -> dict:
"""Return provenance info (source_document + source_chunks) for a node."""
graph = _load()
node = graph["nodes"].get(node_id, {})
return {
"source_document": node.get("source_document"),
"source_chunks": node.get("source_chunks", []),
}
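A hedged sketch of how the node/edge API fits together (again assuming a tools package; the node names, relation, confidence, and doc/chunk ids are invented for illustration):

from tools import graph_store

leak = graph_store.add_node("connection pool leak", "issue",
                            source_document="doc_001",
                            source_chunks=["doc_001_chunk_002"])
svc = graph_store.add_node("payment service", "software",
                           source_document="doc_001")

# Directed edge with provenance; re-adding the same source/target/relation merges
# into the existing edge instead of duplicating it.
graph_store.add_edge(leak, svc, "affects", confidence=0.9,
                     source_document="doc_001",
                     supporting_text="the leak degraded the payment service",
                     chunk_id="doc_001_chunk_002")

print(graph_store.get_neighbors(svc, min_confidence=0.5))   # -> [leak]
print(graph_store.get_node_sources(leak))                    # provenance dict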
index_store.py
@@ -0,0 +1,90 @@
"""
index_store.py — Maintains entity and keyword indexes for fast lookup.
Handles:
- Entity index: name → [node_ids]
- Keyword index: token → [node_ids]
- Persist to index.json
"""
from __future__ import annotations
import json
import os
import re
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
import config
_DATA_DIR = Path(os.environ.get("MINI_CONTEXT_GRAPH_DATA_DIR", str(config.DATA_DIR)))
_INDEX_FILE = _DATA_DIR / "index.json"
_STOPWORDS = frozenset(
[
"a", "an", "the", "is", "are", "was", "were", "be", "been", "being",
"have", "has", "had", "do", "does", "did", "will", "would", "could",
"should", "may", "might", "shall", "can", "to", "of", "in", "on",
"at", "by", "for", "with", "from", "and", "or", "but", "not", "it",
"its", "this", "that", "these", "those", "i", "you", "he", "she",
"we", "they", "what", "which", "who", "how", "why", "when", "where",
]
)
def _load() -> dict:
if _INDEX_FILE.exists():
with open(_INDEX_FILE, "r") as f:
return json.load(f)
return {"entity_index": {}, "keyword_index": {}}
def _save(index: dict) -> None:
_DATA_DIR.mkdir(parents=True, exist_ok=True)
with open(_INDEX_FILE, "w") as f:
json.dump(index, f, indent=2)
def _tokenize(text: str) -> list[str]:
"""Split text into lowercase tokens, removing stopwords and short tokens."""
tokens = re.findall(r"[a-z0-9]+", text.lower())
return [t for t in tokens if t not in _STOPWORDS and len(t) > 1]
def add_entity(name: str, node_id: str) -> None:
"""Register an entity name → node_id in both entity and keyword indexes."""
index = _load()
name_lower = name.strip().lower()
# Entity index
if name_lower not in index["entity_index"]:
index["entity_index"][name_lower] = []
if node_id not in index["entity_index"][name_lower]:
index["entity_index"][name_lower].append(node_id)
# Keyword index
for token in _tokenize(name_lower):
if token not in index["keyword_index"]:
index["keyword_index"][token] = []
if node_id not in index["keyword_index"][token]:
index["keyword_index"][token].append(node_id)
_save(index)
def search(query: str) -> list[str]:
"""Search for node_ids matching the query via entity name or keywords."""
index = _load()
query_lower = query.strip().lower()
matched_ids: set[str] = set()
# Exact entity name match
if query_lower in index["entity_index"]:
matched_ids.update(index["entity_index"][query_lower])
# Keyword match
for token in _tokenize(query_lower):
if token in index["keyword_index"]:
matched_ids.update(index["keyword_index"][token])
return list(matched_ids)
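A small sketch tying the index to graph nodes (the convention of registering an entity right after add_node is an assumption; names and ids are illustrative):

from tools import graph_store, index_store

node_id = graph_store.add_node("connection pool leak", "issue")
index_store.add_entity("connection pool leak", node_id)

print(index_store.search("connection pool leak"))   # exact entity-name hit
print(index_store.search("pool"))                    # keyword-token hit -> same node_id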
ontology_store.py
@@ -0,0 +1,175 @@
"""
ontology_store.py — Tracks entity types and relation types.
Handles:
- Registering types and relations with usage counts
- Normalizing types and relations via synonym mapping
- Persisting to ontology.json
NOTE: No LLM logic here. Normalization is rule-based (lowercase + synonym map).
"""
import json
import os
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
import config
_DATA_DIR = Path(os.environ.get("MINI_CONTEXT_GRAPH_DATA_DIR", str(config.DATA_DIR)))
_ONTOLOGY_FILE = _DATA_DIR / "ontology.json"
# Synonym maps — lowercase variants map to canonical forms
_ENTITY_TYPE_MAP: dict[str, str] = {
"component": "component",
"module": "component",
"class": "component",
"function": "component",
"method": "component",
"bug": "issue",
"defect": "issue",
"fault": "issue",
"error": "issue",
"failure": "issue",
"problem": "issue",
"crash": "issue",
"server": "infrastructure",
"host": "infrastructure",
"machine": "infrastructure",
"node": "infrastructure",
"user": "actor",
"person": "actor",
"operator": "actor",
"admin": "actor",
"administrator": "actor",
"actor": "actor",
"app": "software",
"application": "software",
"service": "software",
"program": "software",
"software": "software",
"database": "storage",
"datastore": "storage",
"db": "storage",
"storage": "storage",
"api": "interface",
"endpoint": "interface",
"interface": "interface",
"connection": "interface",
"event": "event",
"incident": "event",
"occurrence": "event",
"trigger": "event",
"concept": "concept",
"idea": "concept",
"principle": "concept",
"theory": "concept",
"process": "process",
"thread": "process",
"task": "process",
"job": "process",
"workflow": "process",
"object": "component",
"resource": "component",
"memory": "resource",
"cpu": "resource",
"system": "system",
"platform": "system",
"framework": "system",
"library": "software",
"package": "software",
}
_RELATION_TYPE_MAP: dict[str, str] = {
"causes": "causes",
"triggers": "causes",
"leads to": "causes",
"results in": "causes",
"produces": "causes",
"is part of": "contains",
"belongs to": "contains",
"lives in": "contains",
"sits in": "contains",
"contains": "contains",
"depends on": "depends on",
"requires": "depends on",
"needs": "depends on",
"uses": "uses",
"calls": "uses",
"invokes": "uses",
"consumes": "uses",
"affects": "affects",
"impacts": "affects",
"influences": "affects",
"creates": "creates",
"instantiates": "creates",
"spawns": "creates",
"connects to": "connects to",
"links to": "connects to",
"references": "connects to",
"inherits from": "extends",
"extends": "extends",
"subclasses": "extends",
"reads from": "reads from",
"queries": "reads from",
"fetches": "reads from",
"writes to": "writes to",
"stores in": "writes to",
"persists to": "writes to",
"contributes to": "contributes to",
"allocated by": "allocated by",
"released by": "released by",
"not released": "not released",
}
def _load() -> dict:
if _ONTOLOGY_FILE.exists():
with open(_ONTOLOGY_FILE, "r") as f:
return json.load(f)
return {"entity_types": {}, "relation_types": {}}
def _save(ontology: dict) -> None:
_DATA_DIR.mkdir(parents=True, exist_ok=True)
with open(_ONTOLOGY_FILE, "w") as f:
json.dump(ontology, f, indent=2)
def normalize_type(type_name: str) -> str:
"""Return the canonical form of an entity type."""
key = type_name.strip().lower().replace("-", " ").replace("_", " ")
return _ENTITY_TYPE_MAP.get(key, key)
def normalize_relation(relation_name: str) -> str:
"""Return the canonical form of a relation type."""
key = relation_name.strip().lower().replace("-", " ").replace("_", " ")
return _RELATION_TYPE_MAP.get(key, key)
def add_type(type_name: str) -> None:
"""Register an entity type, incrementing its usage count."""
ontology = _load()
canonical = normalize_type(type_name)
ontology["entity_types"][canonical] = ontology["entity_types"].get(canonical, 0) + 1
_save(ontology)
def add_relation(relation_name: str) -> None:
"""Register a relation type, incrementing its usage count."""
ontology = _load()
canonical = normalize_relation(relation_name)
ontology["relation_types"][canonical] = ontology["relation_types"].get(canonical, 0) + 1
_save(ontology)
def get_all_types() -> dict[str, int]:
"""Return all registered entity types with counts."""
return _load()["entity_types"]
def get_all_relations() -> dict[str, int]:
"""Return all registered relation types with counts."""
return _load()["relation_types"]
retrieval_engine.py
@@ -0,0 +1,58 @@
"""
retrieval_engine.py — BFS-based graph traversal for context retrieval.
Input: seed node_ids + depth
Output: list of node_ids within traversal depth filtered by min_confidence
"""
from __future__ import annotations
import sys
from pathlib import Path
from collections import deque
# Allow imports from parent package
sys.path.insert(0, str(Path(__file__).parent.parent))
from tools import graph_store
import config
def retrieve(
seed_node_ids: list[str],
depth: int = config.MAX_GRAPH_DEPTH,
min_confidence: float = config.MIN_CONFIDENCE,
max_nodes: int = config.MAX_NODES,
) -> list[str]:
"""
BFS from seed nodes up to `depth` hops.
Returns a list of node_ids (including seeds) within the traversal,
filtered by min_confidence on edges and capped at max_nodes.
"""
visited: set[str] = set()
# Queue items: (node_id, current_depth)
queue: deque[tuple[str, int]] = deque()
for seed in seed_node_ids:
if seed not in visited:
visited.add(seed)
queue.append((seed, 0))
while queue:
if len(visited) >= max_nodes:
break
node_id, current_depth = queue.popleft()
if current_depth >= depth:
continue
neighbors = graph_store.get_neighbors(node_id, min_confidence=min_confidence)
for neighbor in neighbors:
if neighbor not in visited:
visited.add(neighbor)
queue.append((neighbor, current_depth + 1))
if len(visited) >= max_nodes:
break
return list(visited)
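An end-to-end retrieval sketch (assuming retrieval_engine is importable from wherever it lives relative to the tools package; the query and limits are illustrative):

from tools import graph_store, index_store
import retrieval_engine   # assumption: module is on the import path

seeds = index_store.search("connection pool leak")            # node_ids matching the query
node_ids = retrieval_engine.retrieve(seeds, depth=2,
                                     min_confidence=0.6, max_nodes=25)
subgraph = graph_store.get_subgraph(node_ids)                 # nodes + induced edges
print(len(subgraph["nodes"]), len(subgraph["edges"]))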
wiki_store.py
@@ -0,0 +1,294 @@
"""
wiki_store.py — Manages the persistent wiki layer.
Inspired by Karpathy's LLM Wiki pattern: the wiki is a directory of LLM-generated
markdown pages that the agent writes and maintains. This module provides the
deterministic file I/O and index/log management so the agent can focus on
reasoning, not bookkeeping.
Wiki structure (relative to project root):
wiki/
index.md ← content-oriented catalog of all pages
log.md ← chronological append-only operation log
entities/ ← one page per entity (person, concept, system, etc.)
summaries/ ← source document summary pages
topics/ ← cross-cutting synthesis and topic pages
The agent WRITES pages; this module handles the filesystem + index + log.
"""
from __future__ import annotations
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
import config
_WIKI_DIR = Path(os.environ.get("MINI_CONTEXT_GRAPH_WIKI_DIR", str(config.WIKI_DIR)))
_INDEX_FILE = _WIKI_DIR / "index.md"
_LOG_FILE = _WIKI_DIR / "log.md"
_CATEGORY_DIRS = {
"entity": _WIKI_DIR / "entities",
"summary": _WIKI_DIR / "summaries",
"topic": _WIKI_DIR / "topics",
}
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _ensure_dirs() -> None:
_WIKI_DIR.mkdir(parents=True, exist_ok=True)
for d in _CATEGORY_DIRS.values():
d.mkdir(parents=True, exist_ok=True)
def _now_iso() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%d")
def _slug(title: str) -> str:
"""Convert a title to a filesystem-safe slug."""
slug = title.lower().strip()
slug = re.sub(r"[^a-z0-9]+", "-", slug)
return slug.strip("-")
def _page_path(category: str, slug: str) -> Path:
base = _CATEGORY_DIRS.get(category, _WIKI_DIR)
return base / f"{slug}.md"
# ---------------------------------------------------------------------------
# Index management
# ---------------------------------------------------------------------------
def _load_index() -> list[dict]:
"""Parse index.md into a list of entry dicts."""
if not _INDEX_FILE.exists():
return []
entries = []
for line in _INDEX_FILE.read_text().splitlines():
# Expected table row: | [[slug]] | category | summary | date |
if line.startswith("| [["):
parts = [p.strip() for p in line.split("|") if p.strip()]
if len(parts) >= 3:
link = parts[0] # [[slug]]
category = parts[1] if len(parts) > 1 else ""
summary = parts[2] if len(parts) > 2 else ""
date = parts[3] if len(parts) > 3 else ""
slug = re.sub(r"\[\[|\]\]", "", link)
entries.append({
"slug": slug,
"category": category,
"summary": summary,
"date": date,
})
return entries
def _save_index(entries: list[dict]) -> None:
"""Rewrite index.md from the entries list."""
_ensure_dirs()
lines = [
"# Wiki Index\n",
"_Auto-managed by wiki_store. Do not edit the table manually._\n\n",
"| Page | Category | Summary | Date |\n",
"|------|----------|---------|------|\n",
]
for e in entries:
lines.append(
f"| [[{e['slug']}]] | {e['category']} | {e['summary']} | {e['date']} |\n"
)
_INDEX_FILE.write_text("".join(lines))
def _append_log(operation: str, detail: str) -> None:
"""Append a timestamped entry to log.md."""
_ensure_dirs()
timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d")
entry = f"\n## [{timestamp}] {operation} | {detail}\n"
with open(_LOG_FILE, "a") as f:
f.write(entry)
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def write_page(
category: str,
title: str,
content: str,
summary: str = "",
) -> str:
"""
Write (or overwrite) a wiki page.
The agent provides the full markdown content. This function handles:
- Writing the .md file to the appropriate category subfolder.
- Updating index.md with a one-line entry.
- Appending an entry to log.md.
Args:
category: One of "entity", "summary", "topic".
title: Human-readable page title (used for slug + index).
content: Full markdown content the agent wrote.
summary: One-line summary for the index (optional; auto-extracted if empty).
Returns:
Relative path from wiki root (e.g. "entities/memory-leak.md").
"""
_ensure_dirs()
slug = _slug(title)
path = _page_path(category, slug)
# Auto-extract first non-heading, non-empty line as summary if not provided
if not summary:
for line in content.splitlines():
stripped = line.strip()
if stripped and not stripped.startswith("#"):
summary = stripped[:100]
break
path.write_text(content)
# Update index
entries = _load_index()
existing = next((e for e in entries if e["slug"] == slug), None)
if existing:
existing["summary"] = summary
existing["date"] = _now_iso()
else:
entries.append({
"slug": slug,
"category": category,
"summary": summary,
"date": _now_iso(),
})
_save_index(entries)
_append_log("write", title)
return str(path.relative_to(_WIKI_DIR))
def read_page(category: str, title: str) -> str | None:
"""Read a wiki page's content. Returns None if not found."""
slug = _slug(title)
path = _page_path(category, slug)
if not path.exists():
return None
return path.read_text()
def read_page_by_slug(slug: str) -> str | None:
"""Read a wiki page by slug, searching across all categories."""
for d in list(_CATEGORY_DIRS.values()) + [_WIKI_DIR]:
path = d / f"{slug}.md"
if path.exists():
return path.read_text()
return None
def search_wiki(query: str) -> list[dict]:
"""
Simple keyword search over all wiki pages.
Returns list of {slug, category, path, snippet} sorted by relevance.
"""
query_tokens = set(re.findall(r"[a-z0-9]+", query.lower()))
if not query_tokens:
return []
results = []
for category, base_dir in _CATEGORY_DIRS.items():
if not base_dir.exists():
continue
for page_path in base_dir.glob("*.md"):
content = page_path.read_text().lower()
content_tokens = set(re.findall(r"[a-z0-9]+", content))
overlap = len(query_tokens & content_tokens)
if overlap > 0:
# Extract a short snippet around first match
first_token = next(iter(query_tokens & content_tokens), "")
idx = content.find(first_token)
snippet = content[max(0, idx - 30):idx + 80].replace("\n", " ").strip()
results.append({
"slug": page_path.stem,
"category": category,
"path": str(page_path.relative_to(_WIKI_DIR)),
"score": overlap,
"snippet": snippet,
})
results.sort(key=lambda x: x["score"], reverse=True)
return results
def list_pages(category: str | None = None) -> list[dict]:
"""List all wiki pages, optionally filtered by category."""
entries = _load_index()
if category:
return [e for e in entries if e["category"] == category]
return entries
def get_log(last_n: int = 20) -> list[str]:
"""Return the last N log entries from log.md."""
if not _LOG_FILE.exists():
return []
lines = _LOG_FILE.read_text().splitlines()
entries = [l for l in lines if l.startswith("## [")]
return entries[-last_n:]
def lint_wiki() -> dict:
"""
Health-check the wiki as described in Karpathy's LLM Wiki pattern.
Checks for:
- Orphan pages (in directory but not in index)
- Missing pages (in index but file deleted)
- Broken wikilinks ([[slug]] pointing to non-existent file)
- Pages with no wikilinks (isolated pages)
Returns:
{
"orphan_pages": [...],
"missing_pages": [...],
"broken_wikilinks": {slug: [broken_links]},
"isolated_pages": [...],
}
"""
index_entries = {e["slug"] for e in _load_index()}
file_slugs: dict[str, Path] = {}
for d in _CATEGORY_DIRS.values():
if d.exists():
for p in d.glob("*.md"):
file_slugs[p.stem] = p
orphans = [s for s in file_slugs if s not in index_entries]
missing = [s for s in index_entries if s not in file_slugs]
broken_wikilinks: dict[str, list[str]] = {}
isolated: list[str] = []
all_slugs = set(file_slugs.keys())
for slug, path in file_slugs.items():
content = path.read_text()
links = re.findall(r"\[\[([^\]]+)\]\]", content)
if not links:
isolated.append(slug)
broken = [lnk for lnk in links if _slug(lnk) not in all_slugs]
if broken:
broken_wikilinks[slug] = broken
return {
"orphan_pages": orphans,
"missing_pages": missing,
"broken_wikilinks": broken_wikilinks,
"isolated_pages": isolated,
}
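Finally, a hedged sketch of the wiki layer (assuming tools.wiki_store; the page title, content, and wikilink target are invented):

from tools import wiki_store

rel_path = wiki_store.write_page(
    category="entity",
    title="Connection Pool Leak",
    content="# Connection Pool Leak\n\nSeen in the payment service; see [[payment-service]].\n",
)
print(rel_path)                               # e.g. "entities/connection-pool-leak.md"

print(wiki_store.search_wiki("payment")[:1])  # keyword hit with snippet
print(wiki_store.lint_wiki())                 # flags [[payment-service]] if that page is missing
print(wiki_store.get_log(last_n=5))           # recent "## [date] write | ..." entries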