netsky: the code

The netsky source is in-tree now. The file paths and commit hashes are narrative anchors, not placeholders. What follows is the discipline from part 1 rendered as Rust.

atomic envelope write

src/crates/netsky-core/src/envelope.rs:17 - the write path for every inter-agent message.

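```rust
use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};

use anyhow::{anyhow, Result};

pub fn write_envelope(dir: &Path, env: &Envelope) -> Result<PathBuf> {
    let name = canonical_filename(env);
    let dest = dir.join(&name);
    // Dot-prefixed temp name: invisible to readers that skip dotfiles.
    let temp = dir.join(format!(".{}.tmp", &name));

    // create_new refuses to reuse a temp file that already exists.
    let mut f = OpenOptions::new()
        .write(true).create_new(true).open(&temp)?;
    f.write_all(&serde_json::to_vec(env)?)?;
    f.sync_all()?; // payload reaches disk before it becomes visible
    drop(f);

    // Atomic publish: the envelope appears fully written or not at all.
    // Unlike rename, hard_link never replaces an existing destination.
    fs::hard_link(&temp, &dest)
        .map_err(|e| anyhow!("hardlink {} -> {}: {e}", temp.display(), dest.display()))?;
    let _ = fs::remove_file(&temp);
    Ok(dest)
}
```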

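The temp file gets the payload. The hard link promotes it into the inbox atomically, so a reader never sees a half-written envelope. Because link(2) refuses to replace an existing name, a filename collision fails loudly instead of overwriting a pending message. A crash after write_all but before hard_link leaves a .tmp file that the next drain can garbage-collect. A crash after hard_link but before remove_file leaves a harmless second link to the already-published envelope under the temp name, which the same sweep removes.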

That pattern cost under twenty lines and it makes the bus crash-consistent.
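write_envelope leans on netsky's own Envelope and canonical_filename, and the reader side (list_pending) and the drain-time garbage collection are not shown. The sketch below is a std-only approximation of all three, not netsky's code. It assumes readers skip dot-prefixed names and treats a temp file as stale once it is older than a caller-supplied age; the function names write_atomic and gc_stale_temps are hypothetical.

```rust
use std::fs::{self, OpenOptions};
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::time::{Duration, SystemTime};

/// Publish `payload` as `dir/name` via a dot-prefixed temp file and a hard link.
fn write_atomic(dir: &Path, name: &str, payload: &[u8]) -> io::Result<PathBuf> {
    let dest = dir.join(name);
    let temp = dir.join(format!(".{name}.tmp"));
    let mut f = OpenOptions::new().write(true).create_new(true).open(&temp)?;
    f.write_all(payload)?;
    f.sync_all()?;
    drop(f);
    // Fails with ErrorKind::AlreadyExists instead of clobbering `dest`.
    let linked = fs::hard_link(&temp, &dest);
    // The temp name is scratch whether or not the link succeeded.
    let _ = fs::remove_file(&temp);
    linked.map(|_| dest)
}

/// Reader side: only non-dot regular files are envelopes, sorted by name.
fn list_pending(dir: &Path) -> io::Result<Vec<PathBuf>> {
    let mut out = Vec::new();
    for entry in fs::read_dir(dir)? {
        let entry = entry?;
        let hidden = entry.file_name().to_string_lossy().starts_with('.');
        if !hidden && entry.file_type()?.is_file() {
            out.push(entry.path());
        }
    }
    out.sort();
    Ok(out)
}

/// Drain-time sweep: remove `.*.tmp` leftovers at least `min_age` old, so an
/// in-flight writer's temp file is not deleted out from under it.
fn gc_stale_temps(dir: &Path, min_age: Duration) -> io::Result<usize> {
    let now = SystemTime::now();
    let mut removed = 0;
    for entry in fs::read_dir(dir)? {
        let entry = entry?;
        let name = entry.file_name().to_string_lossy().into_owned();
        if !(name.starts_with('.') && name.ends_with(".tmp")) {
            continue;
        }
        let modified = entry.metadata()?.modified()?;
        // A clock that moved backwards yields an age of zero.
        let age = now.duration_since(modified).unwrap_or_default();
        if age >= min_age {
            fs::remove_file(entry.path())?;
            removed += 1;
        }
    }
    Ok(removed)
}
```

Using a hard link rather than rename is what makes a duplicate filename an error instead of a silent overwrite; the age threshold is the knob that keeps the sweep from racing a live writer.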

three-state discriminator

src/crates/netsky-cli/src/cmd/watchdog.rs:1406 - per-clone idle-by-design proof.

```mermaid
stateDiagram-v2
    [*] --> UnknownEmpty: first observed empty
    [*] --> Nonempty: envelope present
    UnknownEmpty --> UnknownEmpty: still empty
    UnknownEmpty --> Nonempty: envelope appears
    Nonempty --> Drained: observed nonempty to empty
    Drained --> Drained: still empty
    Drained --> Nonempty: new envelope

    Drained --> Suppress: pane_stable_since >= drain_since
    Drained --> Page: pane_stable_since < drain_since
    UnknownEmpty --> Page: no proof of work
    Nonempty --> Page: work waiting
```

The implementation is a single-line state file at ~/.netsky/state/agent<N>-inbox.state, re-evaluated on every watchdog tick:

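```rust
// One value per clone, persisted as a single line between watchdog ticks.
enum InboxState {
    // At least one envelope is waiting.
    Nonempty,
    // Observed going from nonempty to empty at `since_s`: proof of work.
    Drained { since_s: u64 },
    // Empty since the first observation at `since_s`: proves nothing.
    UnknownEmpty { since_s: u64 },
}

fn advance(previous: Option<InboxState>, now_s: u64, is_empty: bool) -> InboxState {
    match (previous, is_empty) {
        // Any envelope resets to Nonempty.
        (_, false) => InboxState::Nonempty,
        // First empty observation with no history.
        (None, true) => InboxState::UnknownEmpty { since_s: now_s },
        // The only transition that records a drain time.
        (Some(InboxState::Nonempty), true) => InboxState::Drained { since_s: now_s },
        // Still empty: keep the original timestamp.
        (Some(prev @ (InboxState::Drained { .. } | InboxState::UnknownEmpty { .. })), true) => prev,
    }
}

// Suppress the idle page only when the pane went quiet at or after the drain.
fn suppress_idle(state: &InboxState, pane_stable_since_s: u64) -> bool {
    match state {
        InboxState::Drained { since_s } => pane_stable_since_s >= *since_s,
        _ => false,
    }
}
```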

The gate has one job: distinguish “clone finished work and went quiet” from “clone wedged mid-work.” The first version treated any empty inbox as proof of idleness. A review swarm caught it the same day. Empty-inbox-from-the-start is not proof of work. Neither is an empty inbox observed to drain after the pane already stopped moving. Both paths fire the page.
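The source says the state lives in a one-line file re-read on every tick, but does not show the line format. The sketch below wires advance and suppress_idle into a tick function over a hypothetical encoding ("nonempty", "drained <ts>", "unknown-empty <ts>"); the encoding, to_line, from_line, and tick are all assumptions made for illustration.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum InboxState {
    Nonempty,
    Drained { since_s: u64 },
    UnknownEmpty { since_s: u64 },
}

fn advance(previous: Option<InboxState>, now_s: u64, is_empty: bool) -> InboxState {
    match (previous, is_empty) {
        (_, false) => InboxState::Nonempty,
        (None, true) => InboxState::UnknownEmpty { since_s: now_s },
        (Some(InboxState::Nonempty), true) => InboxState::Drained { since_s: now_s },
        (Some(prev @ (InboxState::Drained { .. } | InboxState::UnknownEmpty { .. })), true) => prev,
    }
}

fn suppress_idle(state: &InboxState, pane_stable_since_s: u64) -> bool {
    match state {
        InboxState::Drained { since_s } => pane_stable_since_s >= *since_s,
        _ => false,
    }
}

// Hypothetical single-line encoding of the state file.
fn to_line(state: &InboxState) -> String {
    match state {
        InboxState::Nonempty => "nonempty".to_string(),
        InboxState::Drained { since_s } => format!("drained {since_s}"),
        InboxState::UnknownEmpty { since_s } => format!("unknown-empty {since_s}"),
    }
}

// Unparseable lines decode to None, i.e. "no history".
fn from_line(line: &str) -> Option<InboxState> {
    let mut parts = line.split_whitespace();
    let tag = parts.next()?;
    let since = parts.next().and_then(|s| s.parse::<u64>().ok());
    match (tag, since) {
        ("nonempty", None) => Some(InboxState::Nonempty),
        ("drained", Some(since_s)) => Some(InboxState::Drained { since_s }),
        ("unknown-empty", Some(since_s)) => Some(InboxState::UnknownEmpty { since_s }),
        _ => None,
    }
}

/// One watchdog tick: decode the previous line, advance, and return the line
/// to write back plus whether the idle page should be suppressed.
fn tick(previous_line: Option<&str>, now_s: u64, is_empty: bool, pane_stable_since_s: u64) -> (String, bool) {
    let next = advance(previous_line.and_then(from_line), now_s, is_empty);
    (to_line(&next), suppress_idle(&next, pane_stable_since_s))
}
```

A corrupted state file decodes as "no history", which lands in UnknownEmpty and pages. That is the conservative direction: a broken record can cause an extra page but never hide a wedged clone.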

mutation ledger

src/crates/netsky-io/src/mcp.rs:643 - closing the MCP 60-second timeout race.

The JSON-RPC dispatcher runs every tool call in spawn_blocking with a 60-second timeout. Tokio cannot cancel a blocking task, so a call that times out keeps running in the background. A client retry could then duplicate an email send, a drive upload, or a calendar event. The fix:

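```rust
use std::time::Duration;

use anyhow::Result;
use serde_json::Value as JsonValue;

async fn dispatch_mutation(
    source: &str,
    tool: &str,
    args: &JsonValue,
    request_id: Option<String>,
) -> Result<DispatchOutcome> {
    // Same source, tool, canonical args, and request id => same ledger key.
    let args_hash = canonical_hash(args);
    let key = LedgerKey { source, tool, args_hash: args_hash.clone(), request_id };

    // A completed call with this key is replayed, never re-executed.
    if let Some(prior) = ledger_lookup(&key)? {
        return Ok(DispatchOutcome::Replayed(prior));
    }

    // spawn_blocking requires a 'static closure, so move owned copies in.
    let (source_owned, tool_owned, args_owned) = (source.to_owned(), tool.to_owned(), args.clone());
    let handle = tokio::task::spawn_blocking(move || run_tool(&source_owned, &tool_owned, &args_owned));
    match tokio::time::timeout(Duration::from_secs(60), handle).await {
        Ok(Ok(result)) => {
            ledger_record(&key, &result)?;
            Ok(DispatchOutcome::Completed(result))
        }
        Ok(Err(join_err)) => Err(join_err.into()),
        // The task may still finish; its result is not ledgered here,
        // so the caller must read before retrying.
        Err(_elapsed) => Ok(DispatchOutcome::TimeoutRace { args_hash }),
    }
}
```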

TimeoutRace becomes a typed field in the JSON response. The caller is told to read before retrying: query the source for the current state, then decide whether to resubmit with the same request id. The rule is in the docstring at src/crates/netsky-io/src/mcp.rs:548: read before retrying any mutation.

Coverage is every mutating tool across email (send, create_draft, send_draft, archive, trash), drive (upload, copy, share, folder, move, rename, delete, empty-trash), calendar (create, delete), and tasks (create, complete, delete).
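To see the ledger and the timeout race without tokio or a JSON library, here is a std-only sketch, not netsky's dispatcher. A plain thread stands in for spawn_blocking (neither can be cancelled), mpsc::Receiver::recv_timeout stands in for tokio::time::timeout, a BTreeMap of string arguments stands in for canonical JSON, and DefaultHasher stands in for canonical_hash. Ledger, Outcome, and the Failed variant are hypothetical; DefaultHasher output is not guaranteed stable across Rust releases, so it is only suitable for an in-memory ledger like this one.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeMap, HashMap};
use std::hash::{Hash, Hasher};
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

#[derive(Debug, Clone, PartialEq)]
enum Outcome {
    Completed(String),
    Replayed(String),
    TimeoutRace { args_hash: u64 },
    Failed, // the worker died without reporting
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct LedgerKey {
    source: String,
    tool: String,
    args_hash: u64,
    request_id: Option<String>,
}

/// BTreeMap iterates in key order, so equal arguments hash equally
/// regardless of insertion order.
fn canonical_hash(args: &BTreeMap<String, String>) -> u64 {
    let mut h = DefaultHasher::new();
    args.hash(&mut h);
    h.finish()
}

#[derive(Default)]
struct Ledger {
    entries: HashMap<LedgerKey, String>,
}

impl Ledger {
    fn dispatch<F>(
        &mut self,
        source: &str,
        tool: &str,
        args: &BTreeMap<String, String>,
        request_id: Option<&str>,
        timeout: Duration,
        run: F,
    ) -> Outcome
    where
        F: FnOnce() -> String + Send + 'static,
    {
        let args_hash = canonical_hash(args);
        let key = LedgerKey {
            source: source.to_owned(),
            tool: tool.to_owned(),
            args_hash,
            request_id: request_id.map(str::to_owned),
        };
        if let Some(prior) = self.entries.get(&key) {
            return Outcome::Replayed(prior.clone());
        }
        let (tx, rx) = mpsc::channel();
        // Like spawn_blocking, this thread keeps running after a timeout.
        thread::spawn(move || {
            let _ = tx.send(run());
        });
        match rx.recv_timeout(timeout) {
            Ok(result) => {
                self.entries.insert(key, result.clone());
                Outcome::Completed(result)
            }
            Err(RecvTimeoutError::Timeout) => Outcome::TimeoutRace { args_hash },
            Err(RecvTimeoutError::Disconnected) => Outcome::Failed,
        }
    }
}
```

The request id is part of the key, so a deliberate resubmission under a new id executes again, while a blind retry under the same id replays. A timed-out call leaves no ledger entry, which is exactly why the caller has to read the source before retrying.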

channel watch

src/crates/netsky-cli/src/cmd/channel.rs:107 - the tmux-paste delivery path that replaced the Codex CLI drain hack.

```rust
use std::time::Duration;

use anyhow::Result;

pub fn watch(agent: &str, tmux_session: &str) -> Result<()> {
    let inbox = channel_inbox_dir(agent)?;
    loop {
        // Canonical filenames sort into delivery order.
        let mut pending = list_pending(&inbox)?;
        pending.sort();
        for path in pending {
            let env = claim(&path)?;
            // Only the body is escaped; from and ts are interpolated as-is.
            let text = xml_escape_body(&env.text);
            let pane_line = format!(
                "<channel source=\"agent\" chat_id=\"{}\" from=\"{}\" ts=\"{}\">\n{}\n</channel>",
                agent, env.from, env.ts, text,
            );
            tmux_paste(tmux_session, &pane_line)?;
            tmux_submit(tmux_session)?;
            // Ack only after the paste was submitted, then archive.
            channel_ack(agent, &env, AckStatus::Received, None)?;
            archive(&path)?;
        }
        // Any `?` failure above exits the loop and ends the watcher.
        std::thread::sleep(Duration::from_millis(750));
    }
}
```

The Codex CLI resident REPL does not subscribe to filesystem inboxes. Before this, agent0 had to use tmux send-keys to type netsky channel drain <agent> into every Codex CLI pane after every dispatch. netsky channel watch runs as a long-lived process on the receiving side. It polls the inbox every 750 ms, pastes each envelope into the pane wrapped in a <channel> frame, and fires the submit key. The ack JSONL records every delivery for forensic replay.

It is the mechanical answer to a problem that had been solved with convention. Conventions rot. Mechanical gates do not.
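The helpers xml_escape_body, tmux_paste, and tmux_submit are not shown in the source. One plausible shape, sketched under assumptions: escape every interpolated value (attributes as well as the body), push the frame into a named tmux buffer through stdin with load-buffer, paste it with paste-buffer, and press Enter with send-keys. These are real tmux commands, but whether netsky uses them is unknown, and the buffer name "netsky-channel" plus the functions xml_escape, channel_frame, tmux_delivery_argv, and deliver are hypothetical.

```rust
use std::io::{self, Write};
use std::process::{Command, Stdio};

/// Escape the five XML special characters; safe in element text and in
/// double-quoted attribute values.
fn xml_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '&' => out.push_str("&amp;"),
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            '"' => out.push_str("&quot;"),
            '\'' => out.push_str("&apos;"),
            _ => out.push(c),
        }
    }
    out
}

fn channel_frame(chat_id: &str, from: &str, ts: &str, body: &str) -> String {
    format!(
        "<channel source=\"agent\" chat_id=\"{}\" from=\"{}\" ts=\"{}\">\n{}\n</channel>",
        xml_escape(chat_id), xml_escape(from), xml_escape(ts), xml_escape(body),
    )
}

fn argv(parts: &[&str]) -> Vec<String> {
    parts.iter().map(|p| p.to_string()).collect()
}

fn tmux_delivery_argv(session: &str, buffer: &str) -> Vec<Vec<String>> {
    vec![
        // Read the frame from stdin into a named buffer: no argv quoting issues.
        argv(&["tmux", "load-buffer", "-b", buffer, "-"]),
        // -p: bracketed paste if the pane's app asked for it; -d: delete the buffer after.
        argv(&["tmux", "paste-buffer", "-p", "-d", "-b", buffer, "-t", session]),
        // The submit key.
        argv(&["tmux", "send-keys", "-t", session, "Enter"]),
    ]
}

fn deliver(session: &str, frame: &str) -> io::Result<()> {
    for (i, cmd) in tmux_delivery_argv(session, "netsky-channel").iter().enumerate() {
        let mut child = Command::new(&cmd[0])
            .args(&cmd[1..])
            .stdin(if i == 0 { Stdio::piped() } else { Stdio::null() })
            .spawn()?;
        if i == 0 {
            // The stdin handle is dropped at the end of this statement, sending EOF.
            child.stdin.take().expect("stdin was piped").write_all(frame.as_bytes())?;
        }
        let status = child.wait()?;
        if !status.success() {
            return Err(io::Error::new(io::ErrorKind::Other, format!("{} exited with {status}", cmd[1])));
        }
    }
    Ok(())
}
```

Splitting command construction from execution keeps the tmux argv testable on a machine without tmux; deliver is the only part that needs a live server.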

unified events reader

src/crates/netsky-cli/src/cmd/events.rs:78 - the merged timeline.

```rust
use anyhow::Result;

pub fn run_events(filter: EventsFilter) -> Result<()> {
    // File-backed sources are required: a read error aborts.
    let mut events: Vec<Event> = Vec::new();
    events.extend(read_watchdog_events_jsonl(&filter)?);
    events.extend(read_channel_acks_jsonl(&filter)?);
    events.extend(read_restart_status_jsons(&filter)?);
    events.extend(read_escalate_failed_markers(&filter)?);
    // meta.db is optional: a failed open or a failed query degrades to file sources.
    if let Ok(db) = Db::open_readonly() {
        match db.list_communication_events(&filter) {
            Ok(rows) => events.extend(rows),
            Err(e) => eprintln!("events: meta.db query failed, skipping: {e}"),
        }
    }
    events.sort_by_key(|e| e.ts);
    if let Some(limit) = filter.limit {
        events.truncate(limit); // keeps the oldest `limit` events
    }
    if filter.json {
        println!("{}", serde_json::to_string_pretty(&events)?);
    } else {
        for e in events {
            println!("{} | {} | {} | {}", e.ts, e.kind, e.agent.as_deref().unwrap_or("-"), e.detail_short());
        }
    }
    Ok(())
}
```

Five sources. Five schemas. One Event type to merge them. The meta.db read is wrapped in a fallible Db::open_readonly() so the reader still returns useful output when the analytical backend is down.

That is the shape of a tool designed for the actual failure mode - a file-based system where the database is not a dependency, just the preferred index.
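The merge itself is small enough to isolate. Below is a std-only sketch, not the netsky reader: the Event fields mirror the ones run_events prints, but the concrete types, merge_timeline, and render_line are assumptions. It makes one deliberate change: run_events truncates after an ascending sort, which keeps the oldest events, while this sketch keeps the most recent limit events, a common choice for a "show me the tail" command.

```rust
#[derive(Debug, Clone, PartialEq)]
struct Event {
    ts: u64, // assumed: seconds since the Unix epoch
    kind: String,
    agent: Option<String>,
    detail: String,
}

/// Merge required file-backed sources with an optional database source.
/// A database error becomes a warning instead of failing the whole read.
fn merge_timeline(
    file_sources: Vec<Vec<Event>>,
    db_source: Result<Vec<Event>, String>,
    limit: Option<usize>,
) -> (Vec<Event>, Option<String>) {
    let mut events: Vec<Event> = file_sources.into_iter().flatten().collect();
    let warning = match db_source {
        Ok(rows) => {
            events.extend(rows);
            None
        }
        Err(e) => Some(format!("meta.db unavailable, showing file sources only: {e}")),
    };
    // Stable sort: events with equal timestamps keep their source order.
    events.sort_by_key(|e| e.ts);
    if let Some(limit) = limit {
        // Keep the newest `limit` events, still in ascending order.
        let excess = events.len().saturating_sub(limit);
        events.drain(..excess);
    }
    (events, warning)
}

fn render_line(e: &Event) -> String {
    format!("{} | {} | {} | {}", e.ts, e.kind, e.agent.as_deref().unwrap_or("-"), e.detail)
}
```

Returning the warning alongside the events, instead of printing inside the merge, lets the caller decide whether a missing database belongs on stderr or in the JSON output.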

turso backend open

src/crates/netsky-db/src/lib.rs - the meta.db setup that replaced redb with turso.

```rust
pub fn open_or_create(path: &Path) -> Result<Db> {
    let conn = turso::Connection::open(path)?;
    // WAL: readers proceed alongside a single writer.
    // busy_timeout: a blocked writer waits up to 5 s instead of failing at once.
    conn.execute_batch("
        PRAGMA journal_mode = WAL;
        PRAGMA busy_timeout = 5000;
        PRAGMA synchronous = NORMAL;
    ")?;
    ensure_schema(&conn)?;
    Ok(Db { conn: Mutex::new(conn) })
}

impl Db {
    pub fn record_communication_event(&self, ev: &CommunicationEventRecord) -> Result<()> {
        if let Err(e) = self.try_record(ev) {
            // Best effort at the top layer: record the miss, never propagate it.
            // Resolving the log path is allowed to fail silently too.
            if let Ok(log) = errors_log_path() {
                let _ = append_jsonl(log, &JsonlError::from(ev, &e));
            }
        }
        Ok(())
    }
}
```

WAL mode plus a 5-second busy timeout resolves the concurrent-writer contention that produced 762 lock errors in one 24-hour window on the prior redb backend. Post-swap: zero.

The error path matters. Every write is wrapped in a best-effort layer that appends to a JSONL error log when the primary write fails. The analytical store can be locked, corrupted, or simply absent. The operational layer keeps running, with a durable trail of what the database missed.

That is cybernetic architecture. The analytical subsystem is a probe, not a dependency.
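The probe-not-dependency rule can be shown without turso at all. In this std-only sketch, a hypothetical EventStore trait stands in for the Db, any io::Write stands in for the JSONL error log, and json_str and record_best_effort are illustrative names, not netsky's. The only contract it demonstrates is the one above: try the store, append one JSON line on failure, and never surface either failure to the caller.

```rust
use std::io::Write;

/// Anything that can persist a communication event (stands in for the turso-backed Db).
trait EventStore {
    fn try_record(&mut self, event: &str) -> Result<(), String>;
}

/// Minimal JSON string literal: quotes, backslashes, newlines, other control chars.
fn json_str(s: &str) -> String {
    let mut out = String::from("\"");
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out.push('"');
    out
}

/// Returns whether the primary write landed. A failure appends one JSON line
/// to `fallback`; a failing fallback is ignored as well.
fn record_best_effort(store: &mut dyn EventStore, fallback: &mut dyn Write, event: &str) -> bool {
    match store.try_record(event) {
        Ok(()) => true,
        Err(err) => {
            let line = format!("{{\"event\":{},\"error\":{}}}\n", json_str(event), json_str(&err));
            let _ = fallback.write_all(line.as_bytes());
            false
        }
    }
}
```

Returning a bool instead of a Result makes the best-effort contract visible in the signature: callers cannot accidentally bubble a database failure up into the operational path.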

escalate - the algedonic path

src/crates/netsky-cli/src/cmd/escalate.rs:1 - pain signals to the owner.

```rust
use std::fs;
use std::process::Command;
use std::time::Duration;

use anyhow::{anyhow, bail, Result};

pub fn escalate(subject: &str, body: Option<&str>) -> Result<()> {
    let attempt1 = try_send(&compose(subject, body));
    if attempt1.is_ok() {
        return Ok(());
    }
    // One fixed backoff, one retry.
    std::thread::sleep(Duration::from_millis(1000));
    let attempt2 = try_send(&compose(subject, body));
    if attempt2.is_ok() {
        return Ok(());
    }
    // Both failed: leave a durable marker for diagnostics, then fail loudly.
    let marker = escalate_failed_marker(chrono::Utc::now())?;
    fs::write(&marker, format_failure(subject, body, &attempt1, &attempt2))?;
    bail!("escalate: both attempts failed; marker at {}", marker.display())
}

fn try_send(text: &str) -> Result<()> {
    let script = format!(
        r#"tell application "Messages" to send "{}" to buddy "{}""#,
        escape_applescript(text), OWNER_HANDLE,
    );
    // output() blocks until osascript exits; this call has no timeout.
    let out = Command::new("osascript").arg("-e").arg(&script).output()?;
    if out.status.success() {
        Ok(())
    } else {
        Err(anyhow!("osascript: {}", String::from_utf8_lossy(&out.stderr)))
    }
}
```

No model. No MCP. No netsky-io. This path is osascript against Messages.app directly, because the algedonic channel has to survive the collapse of every higher layer. If the watchdog ticks but the MCP server is dead, escalate still reaches the owner’s phone. If the first osascript call fails, the 1-second backoff and one retry give the OS a chance to recover. A call that hangs outright is a different case: output() waits for osascript to exit, so the retry never runs. If both attempts fail, the durable marker under ~/.netsky/state/escalate-failed-<ts> records the failure so the next diagnostic tool can see it.

Beer’s rule: pain must reach policy faster than routine regulation. netsky escalate is thirteen lines of Rust and one shell-out, because the whole point is to minimize the surface between the failure and the owner’s attention.
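try_send above calls output(), which waits for osascript to exit, so a hung process would stall the first attempt and the retry would never run. A std-only sketch of bounding each attempt, under stated assumptions: output_with_timeout polls Child::try_wait and kills the child at a deadline; escalate_with takes the sender as a closure so the two-attempt policy is testable without Messages.app; escape_applescript handles backslashes and double quotes for an AppleScript string literal. All of these names and the 10-second timeout in osascript_send are hypothetical, not netsky's code.

```rust
use std::process::{Command, Output, Stdio};
use std::thread;
use std::time::{Duration, Instant};

/// Escape text for an AppleScript double-quoted string literal.
fn escape_applescript(s: &str) -> String {
    s.replace('\\', "\\\\").replace('"', "\\\"")
}

/// Run a command, killing it if it has not exited within `timeout`.
/// Output is read only after exit, which is fine for osascript's small output.
fn output_with_timeout(mut cmd: Command, timeout: Duration) -> Result<Output, String> {
    let mut child = cmd
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .map_err(|e| format!("spawn: {e}"))?;
    let deadline = Instant::now() + timeout;
    loop {
        match child.try_wait() {
            Ok(Some(_)) => return child.wait_with_output().map_err(|e| e.to_string()),
            Ok(None) if Instant::now() >= deadline => {
                let _ = child.kill();
                let _ = child.wait(); // reap the killed process
                return Err(format!("timed out after {timeout:?}"));
            }
            Ok(None) => thread::sleep(Duration::from_millis(20)),
            Err(e) => return Err(e.to_string()),
        }
    }
}

/// Same AppleScript as try_send, but each attempt is bounded (macOS only).
fn osascript_send(text: &str, handle: &str) -> Result<(), String> {
    let script = format!(
        r#"tell application "Messages" to send "{}" to buddy "{}""#,
        escape_applescript(text),
        escape_applescript(handle),
    );
    let mut cmd = Command::new("osascript");
    cmd.arg("-e").arg(script);
    let out = output_with_timeout(cmd, Duration::from_secs(10))?;
    if out.status.success() {
        Ok(())
    } else {
        Err(String::from_utf8_lossy(&out.stderr).into_owned())
    }
}

/// Two attempts with a fixed backoff; returns both errors if neither lands.
fn escalate_with<F>(mut send: F, backoff: Duration) -> Result<(), (String, String)>
where
    F: FnMut() -> Result<(), String>,
{
    let first = match send() {
        Ok(()) => return Ok(()),
        Err(e) => e,
    };
    thread::sleep(backoff);
    match send() {
        Ok(()) => Ok(()),
        Err(second) => Err((first, second)),
    }
}
```

With a bound on each attempt, the worst case for the whole algedonic path is two timeouts plus one backoff, after which the failure marker is written. That keeps the promise that pain either reaches the owner or leaves a durable trace.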

what the code wants to be

Every one of these snippets has the same pattern. The gate lives right next to the failure it catches. There is no central “safety engine.” There is no universal middleware. Each failure class gets the smallest mechanism that closes it and no more.

That is the practical version of cybernetic design. It is not a framework. It is a habit of writing the check at the scene of the crime.

Part 1 framed this week's recurring failure as theory. Part 2 laid out the architecture. This part is where that architecture shows up as files you can grep.


Part 3 of 3. Part 1: netsky: the cybernetics. Part 2: netsky: the system.