Operations Layer — Proof

Real software. Real tests (just ran). Real live evidence. No claims.

Honest tally: 8 modules total. 5 are wired and doing real work right now. 1 (Ingestion Planner) is built and tested but has no live caller yet — it's a library waiting for integration. 1 (Security Layer) is built, tested (101 tests pass), and dormant — nothing flows through it yet because there's no JSON-RPC traffic to secure. 1 (Integration Test Suite) isn't a service — it's the 578-test suite that proves the other 7 work together.
5
Wired & Running
2
Built, Not Wired
578
Tests Passing

#1 — Task Scheduler (AgentRM)

Wired & Running
What it is (plain English)

A priority queue that decides which jobs run first. Health checks go before background work. Every recurring task (cleanup, snapshots, credential sync) gets submitted here and runs in priority order.

The actual code (core function)
def submit_task(self, task_id, priority, payload, max_runtime_s=30.0):
    """Submit a task to the MLFQ.
    priority: 0 (interactive), 1 (standard), 2 (background)"""
    if priority not in (0, 1, 2):
        raise ValueError(f"Invalid priority {priority}")
    if self._find_task(task_id) is not None:
        raise ValueError(f"Task {task_id} already exists")
    task = Task(task_id=task_id, priority=priority,
                original_priority=priority, payload=payload,
                max_runtime_s=max_runtime_s, ...)
    self.queues[priority].append(task)
    self._persist()
    return {"task_id": task_id, "queue": priority, ...}
Tests (just ran)
$ python3 -m pytest tests/test_agent_rm.py -q
43 passed in 0.05s
Live evidence (from data/agent-rm-state.json, read just now)
5 tasks submitted, 5 completed, 0 failed, 0 still queued.
Last completed: zombie-cleanup at Q0 (health priority), health monitor-health at Q1, git-snapshot at Q2 (background).
The scheduler daemon submits every recurring job through this queue by priority.

#2 — Stuck-Process Cleaner (Zombie Reaper)

Wired & Running
What it is (plain English)

A background watchdog that checks every running process, detects ones that are stuck or exceeded their time limit, and kills them cleanly. Prevents the system from accumulating zombie processes that waste resources.

The actual code (core function)
def check_all(self) -> List[ZombieReport]:
    """Check all registered PIDs for zombie status.
    1. Check if process still exists
    2. Check if PID was recycled (cmdline changed)
    3. Check if runtime exceeds max_runtime_s"""
    self._total_checked += 1
    for task_id in list(self._registry.keys()):
        proc = self._registry.get(task_id)
        pid = proc.pid
        runtime_s = now - proc.registered_at
        if not _process_exists(pid):
            self.unregister_pid(task_id)  # already dead
        elif current_cmdline != proc.cmdline_snapshot:
            self.unregister_pid(task_id)  # PID recycled
        elif runtime_s > proc.max_runtime_s:
            self._reap_process(task_id, pid)  # SIGTERM then SIGKILL
Tests (just ran)
$ python3 -m pytest tests/test_zombie_reaper.py -q
56 passed in 1.43s
Live evidence (from data/zombie-reaper-status.json, read just now)
Running right now. Uptime: 63 minutes.
757 process checks completed this session. 0 stuck processes found (clean system).
Last heartbeat: a few seconds ago. Runs a sweep every 5 seconds.

#3 — Memory Manager (Context Lifecycle Manager)

Wired & Running
What it is (plain English)

Manages the AI's conversation memory. Archives old conversations, compresses them into searchable knowledge, and flushes context when it gets too full. Prevents the AI from running out of thinking space mid-task.

The actual code (architecture from the file header)
"""Context Lifecycle Manager (CLM)

Intelligence layer on top of ctx_watchdog (sensing layer).
Manages the full context lifecycle:

  Tier 2 (Cold)  : JSONL archives of raw conversation messages
  Compression    : Rule-based semantic compression
  Tier 1 (Warm)  : SQLite database of compressed knowledge
  Flush Protocol : Orchestrates archive -> compress -> store
"""

# Commands:
#   status   - show current memory usage
#   archive  - archive a session's messages
#   query    - search compressed knowledge
#   flush    - orchestrate full archive->compress->store
#   check    - read watchdog status and act if needed
Tests (just ran)
$ python3 -m pytest tests/test_clm.py -q
58 passed in 0.08s
Live evidence (from data/ctx-metrics.json, read just now)
Running as a daemon (clm-daemon tmux session).
Monitoring context usage continuously. Last recorded context level at 90% on June 6 — triggered compression cycle to free up space.

#4 — Work Planner (Ingestion Planner)

Built, Not Wired
What it is (plain English)

Takes a high-level objective (like "research competitors, then analyze, then report") and breaks it into a dependency graph of sub-tasks. Classifies each sub-task as light/medium/heavy and figures out which can run in parallel. Then submits the plan to the Task Scheduler (#1) for execution.

The actual code (core class)
class IngestionPlanner:
    """Ingest user objectives, build execution plan DAGs,
    and submit to AgentRM for scheduling."""

    def ingest_objective(self, objective):
        # 1. Split objective into sub-tasks
        # 2. Detect dependencies between them
        # 3. Classify each as light/medium/heavy
        # 4. Build an ExecutionPlan (DAG)
        return plan

    def submit_plan(self, plan_id):
        # Submit ready tasks to AgentRM queue
        return result
Tests (just ran)
$ python3 -m pytest tests/test_ingestion_planner.py -q
129 passed in 0.53s
Honest status
Built and tested, but not wired yet. No part of the system currently sends objectives to this planner. It's ready for the day we want the AI to automatically break down complex requests into scheduled sub-tasks. The code works (129 tests prove it), it just doesn't have a live caller.

#5 — Leader Lock (Dual-System Lease Manager)

Wired & Running
What it is (plain English)

Two AI systems (Secondary node and Primary node) share the same infrastructure. This module decides which one is "in charge" at any moment. It uses a lock (lease) with a heartbeat — if the leader stops heartbeating, the other one can take over. Prevents both AIs from stepping on each other's work.

The actual code (core function)
def try_acquire_lease(force=False):
    """Attempt to claim leadership.
    Checks if existing lease is expired (heartbeat + ttl < now).
    If expired or no lease, claims it with a new term number.
    Uses file locking to prevent race conditions."""
    lock_fd = open(lock_path, "w")
    fcntl.flock(lock_fd.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
    lease = _load_lease()
    if lease["holder"] is not None and not force:
        if not _is_lease_expired(lease):
            return {"acquired": True, ...}  # already ours
    # Expired or empty -- claim it
    new_term = lease.get("term", 0) + 1
    _write_lease({"holder": CIV_NAME, "term": new_term, ...})
Tests (just ran)
$ python3 -m pytest tests/test_lease_manager.py -q
57 passed in 0.10s
Live evidence (from data/lease-state.json, read just now)
Secondary node holds the lease right now. Term: 2. Acquired at 2:00 AM today.
Heartbeat is live — last beat a few seconds ago (every 0.5s).
7,658 events logged in the event ledger since this term started.

#6 — Backup & Recovery (State Recovery Engine)

Wired & Running
What it is (plain English)

Creates point-in-time snapshots of system state every 2 hours. If the leader fails over (#5), this module loads the latest snapshot, checks its integrity (SHA-256 hash), replays any events that happened after the snapshot, and re-queues unfinished work. It's the disaster recovery system.

The actual code (core function)
def create_checkpoint(self, checkpoint_id=None, extra_state=None):
    """Create a validated state checkpoint.
    Captures task summary and any extra state.
    Stores as JSON with SHA-256 integrity hash."""
    state_snapshot = {"system": CIV_NAME, "checkpoint_id": checkpoint_id}
    task_summary = self._get_task_summary()
    state_snapshot["task_summary"] = task_summary
    term = self._get_current_term()
    integrity_hash = _compute_hash(state_snapshot)
    checkpoint = Checkpoint(
        checkpoint_id=checkpoint_id,
        state_snapshot=state_snapshot,
        integrity_hash=integrity_hash,
        term=term, ...)
Tests (just ran)
$ python3 -m pytest tests/test_state_recovery.py -q
88 passed in 0.74s
Live evidence (from data/recovery-checkpoints/, read just now)
2 checkpoints on disk right now (live-checkpoint-1.json, scheduled.json).
Last checkpoint created at 2:52 AM today by the scheduler daemon.
New checkpoints are created every 2 hours automatically.

#7 — Security Gate (MCP/JSON-RPC Security Layer)

Built, Dormant
What it is (plain English)

A 3-tier security system that controls who can do what. Tier 1 (Trusted Core) gets full access. Tier 2 (Department Leads) gets schema-locked access. Tier 3 (External) gets read-only, air-gapped access. Also scans for prompt injection attacks and logs every access attempt.

The actual code (core class)
class SecurityLayer:
    """Unified facade for the MCP/JSON-RPC security layer.
    Combines entity registry, JSON-RPC validation,
    injection scanning, access control, and audit logging."""

    def __init__(self, registry_path, audit_dir, risk_threshold):
        self.audit = AuditLogger(log_dir=audit_dir)
        self.registry = EntityRegistry(path=registry_path)
        self.validator = JSONRPCValidator()
        self.scanner = InjectionScanner(threshold=risk_threshold)
        self.access = AccessController(self.registry, self.audit)

    def validate_message(self, message, sender_id):
        """Validate a JSON-RPC message from sender.
        Resolves sender's tier, then validates."""
        entity = self.registry.get_entity(sender_id)
        if entity is None:
            return ValidationResult(valid=False,
                errors=[f"Unknown sender: '{sender_id}'"])
Tests (just ran)
$ python3 -m pytest tests/test_security_layer.py -q
101 passed in 0.08s
Honest status
Built and fully tested, but dormant. The 101 tests prove the security model works — tier enforcement, injection scanning, access control, audit logging. But no JSON-RPC traffic flows through the system yet, so there's nothing to secure. When inter-agent communication starts using JSON-RPC pipes, this gates it. Until then it sits ready.

#8 — Integration Test Suite (proves the other 7 work together)

Test Suite
What it is (plain English)

Not a service — this is the test suite that proves modules 1–7 work correctly, individually and together. 578 tests across all modules. Tests things like: "does the scheduler correctly submit to the priority queue?", "does the reaper actually kill stuck processes?", "does recovery correctly restore from a checkpoint?"

Full test run (just ran, all 578 tests)
$ python3 -m pytest tests/test_agent_rm.py tests/test_zombie_reaper.py tests/test_clm.py tests/test_lease_manager.py tests/test_ingestion_planner.py tests/test_state_recovery.py tests/test_security_layer.py tests/test_ops_layer_integration.py -q

578 passed in 3.12s
Breakdown by module
Task Scheduler (agent_rm)43 passed
Stuck-Process Cleaner (zombie_reaper)56 passed
Memory Manager (clm)58 passed
Leader Lock (lease_manager)57 passed
Work Planner (ingestion_planner)129 passed
Backup & Recovery (state_recovery)88 passed
Security Gate (security_layer)101 passed
Integration (cross-module)46 passed
Total578 passed

The Wiring: Scheduler Daemon

Running
What it is (plain English)

This is the glue. It runs 24/7 in the background and submits every recurring job (zombie cleanup, credential sync, snapshots, health checks, checkpoint creation) through the Task Scheduler (#1) on a timed schedule. It's the reason modules 1, 2, 5, and 6 are doing real work — the scheduler daemon is what keeps calling them.

Live evidence (from data/scheduler-log.json + config/scheduler-state.json)
500 jobs executed in the log. 492 succeeded, 8 failed (98.4% success rate).

Last runs (all succeeded):
• zombie-cleanup: 2:56 AM today
• credential-sync: 2:59 AM today
• health-check: 2:51 AM today
• state-checkpoint: 2:52 AM today
• git-snapshot: 2:26 AM today
• ops-layer-refresh: 2:52 AM today
📚Library