Skip to content

Fleet Wire Design — Central Ledger Architecture

Overview

The hwLedger fleet ledger is the central coordination point for hobbyist-scale infrastructure (tens of devices, hundreds of models). v1 needs no distributed queueing: agents talk to the server over mTLS-authenticated JSON/HTTPS, and hosts without an agent are probed agentlessly over SSH.

Architecture Diagram

┌──────────────────────────────────┐
│  hwledger-server (Axum + SQLite) │
│  ├─ mTLS + JSON/HTTPS endpoint   │
│  ├─ event-sourcing audit log     │
│  ├─ device registry              │
│  └─ cost ledger                  │
└──────────────────────────────────┘
       ▲              ▲              ▲
       │ mTLS/JSON    │ russh SSH    │ reqwest HTTP
       │ (agents)     │ (agentless)  │ (rentals)
       │              │              │
   ┌───┴────┐   ┌─────┴─────┐   ┌───┴──────┐
   │ Agent  │   │ Host      │   │ Rental   │
   │(LAN)   │   │ (SSH)     │   │ (Vast)   │
   │tsnet   │   │ nvidia-smi│   │ API      │
   └────────┘   └───────────┘   └──────────┘

1. Central Server: Axum + SQLite + mTLS

Why Axum Over gRPC?

| Aspect | Axum + JSON | gRPC |
|---|---|---|
| Protocol | HTTP/1.1 + JSON | HTTP/2 + Protobuf |
| Scale | Tens to hundreds of devices (hobbyist) | Thousands+ (production) |
| Complexity | Minimal | protoc, `.proto` files, codec overhead |
| Latency | Slightly higher (JSON parsing) | Lower (binary) |
| Multiplexing | Per-connection (fine for a fleet of tens) | Built-in (useful at scale) |
| mTLS | rustls + rcgen (simple) | tonic requires explicit cert management |
| Observability | Standard HTTP middleware | Custom gRPC middleware |

Decision: Axum + JSON for MVP. Switch to gRPC in v2 if fleet exceeds 500 devices.

Server Structure

crates/hwledger-server/Cargo.toml:

toml
[dependencies]
axum = "0.7"
# Provides `axum_server::bind_rustls`, which main.rs uses to serve over TLS.
# The 0.7 line targets rustls 0.23, matching the `rustls` pin below.
axum-server = { version = "0.7", features = ["tls-rustls"] }
tokio = { version = "1", features = ["full"] }
rustls = "0.23"
rcgen = "0.12"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
# sqlx needs an async runtime feature to use `SqlitePool`; `migrate` backs `sqlx::migrate!`.
sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "macros", "migrate"] }
phenotype-event-sourcing = { path = "../../phenotype-shared/crates/phenotype-event-sourcing" }

Handler Implementation

crates/hwledger-server/src/handlers.rs:

rust
use axum::{
    extract::{ConnectInfo, State},
    http::StatusCode,
    response::IntoResponse,
    Json,
};
use std::sync::Arc;

#[derive(Clone)]
pub struct AppState {
    pub db: sqlx::SqlitePool,
    pub event_log: Arc<phenotype_event_sourcing::Store>,
}

/// JSON body of `POST /api/devices`, deserialized by `register_device`.
#[derive(serde::Deserialize)]
pub struct RegisterDeviceRequest {
    /// Caller-chosen unique identifier; becomes the `devices.device_id` primary key.
    pub device_id: String,
    /// Host name stored in `devices.hostname`.
    pub hostname: String,
    pub device_type: String, // "local", "rental", "ssh"
    /// GPUs reported by the device.
    /// NOTE(review): `register_device` does not persist this field — confirm intended storage.
    pub gpus: Vec<GpuInfo>,
}

/// Success response body of `POST /api/devices`.
#[derive(serde::Serialize)]
pub struct DeviceRegistered {
    /// Echo of the registered device's id.
    pub device_id: String,
    /// Registration time in whole seconds since the Unix epoch.
    pub registered_at: u64,
}

pub async fn register_device(
    State(state): State<AppState>,
    Json(req): Json<RegisterDeviceRequest>,
) -> impl IntoResponse {
    // Record event
    let event = serde_json::json!({
        "type": "DeviceRegistered",
        "device_id": req.device_id,
        "hostname": req.hostname,
        "device_type": req.device_type,
        "timestamp": std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs(),
    });

    if let Err(e) = state.event_log.append(&req.device_id, &event).await {
        return (StatusCode::INTERNAL_SERVER_ERROR, format!("Event log error: {}", e)).into_response();
    }

    // Insert into registry
    let registered_at = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_secs();

    if let Err(e) = sqlx::query(
        "INSERT INTO devices (device_id, hostname, device_type, registered_at) VALUES (?, ?, ?, ?)"
    )
    .bind(&req.device_id)
    .bind(&req.hostname)
    .bind(&req.device_type)
    .bind(registered_at)
    .execute(&state.db)
    .await
    {
        return (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {}", e)).into_response();
    }

    (StatusCode::OK, Json(DeviceRegistered {
        device_id: req.device_id,
        registered_at,
    })).into_response()
}

pub async fn list_devices(
    State(state): State<AppState>,
) -> impl IntoResponse {
    let rows = sqlx::query_as::<_, (String, String, String)>(
        "SELECT device_id, hostname, device_type FROM devices"
    )
    .fetch_all(&state.db)
    .await;

    match rows {
        Ok(devices) => {
            let devices: Vec<_> = devices
                .into_iter()
                .map(|(id, hostname, device_type)| {
                    serde_json::json!({
                        "device_id": id,
                        "hostname": hostname,
                        "device_type": device_type,
                    })
                })
                .collect();
            (StatusCode::OK, Json(serde_json::json!({"devices": devices}))).into_response()
        }
        Err(e) => {
            (StatusCode::INTERNAL_SERVER_ERROR, format!("DB error: {}", e)).into_response()
        }
    }
}

Router Setup

crates/hwledger-server/src/main.rs:

rust
use axum::{
    routing::{get, post},
    Router,
};
use std::sync::Arc;
use tokio::net::TcpListener;

#[tokio::main]
async fn main() {
    // Database setup
    let db = sqlx::sqlite::SqlitePool::connect("sqlite:///var/lib/hwledger/ledger.db")
        .await
        .expect("Failed to connect to database");

    sqlx::migrate!("./migrations")
        .run(&db)
        .await
        .expect("Failed to run migrations");

    // Event sourcing store
    let event_log = Arc::new(
        phenotype_event_sourcing::Store::new("/var/lib/hwledger/events")
            .expect("Failed to init event log")
    );

    let state = AppState {
        db,
        event_log,
    };

    // Routes
    let app = Router::new()
        .route("/api/devices", post(handlers::register_device))
        .route("/api/devices", get(handlers::list_devices))
        .route("/api/dispatch", post(handlers::dispatch_job))
        .route("/health", get(handlers::health))
        .with_state(state)
        .into_make_service_with_connect_info::<std::net::SocketAddr>();

    // mTLS configuration
    let certs = rustls::RootCertStore::empty();
    let client_auth = rustls::server::WebPkiClientVerifier::new(certs);
    
    let config = rustls::ServerConfig::builder()
        .with_client_verifier(client_auth)
        .with_single_cert(
            vec![load_cert("server.crt").unwrap()],
            load_private_key("server.key").unwrap(),
        )
        .unwrap();

    let listener = TcpListener::bind("127.0.0.1:9443")
        .await
        .expect("Failed to bind");

    println!("Server listening on https://127.0.0.1:9443");
    
    axum_server::bind_rustls("127.0.0.1:9443", config)
        .serve(app)
        .await
        .expect("Server error");
}

2. Agent-Side Client: mTLS + JSON

crates/hwledger-agent/src/client.rs:

rust
use reqwest::Client;
use std::sync::Arc;

/// HTTPS client an agent uses to talk to `hwledger-server`, authenticating with a
/// per-device client certificate (mTLS).
///
/// NOTE(review): the server's certificate is checked against reqwest's default
/// trust roots. If the server uses a private CA, add it with
/// `ClientBuilder::add_root_certificate`.
pub struct LedgerClient {
    http: Client,
    /// Base URL of the server, stored without a trailing `/`.
    server_url: String,
}

impl LedgerClient {
    /// Per-request timeout so an unresponsive server cannot stall the agent forever.
    const REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);

    /// Builds a client that presents the PEM certificate at `cert_path` and the PEM
    /// private key at `key_path` to the server at `server_url`.
    ///
    /// # Errors
    /// Fails if either file cannot be read, the PEM data is not a valid identity,
    /// or the HTTP client cannot be built.
    pub fn new(server_url: &str, cert_path: &str, key_path: &str) -> Result<Self> {
        // `reqwest::Identity::from_pem` takes ONE buffer holding both the
        // certificate(s) and the private key, so concatenate the two files.
        let mut identity_pem = std::fs::read(cert_path)?;
        if !identity_pem.ends_with(b"\n") {
            identity_pem.push(b'\n');
        }
        identity_pem.extend_from_slice(&std::fs::read(key_path)?);

        let identity = reqwest::Identity::from_pem(&identity_pem)?;
        let http = Client::builder()
            .identity(identity)
            .timeout(Self::REQUEST_TIMEOUT)
            .build()?;

        Ok(Self {
            http,
            // Avoid `//api/...` when callers pass a URL ending in `/`.
            server_url: server_url.trim_end_matches('/').to_string(),
        })
    }

    /// Registers this device via `POST {server_url}/api/devices`.
    ///
    /// # Errors
    /// Fails on transport errors or any non-2xx response.
    pub async fn register_device(&self, device_id: &str, info: DeviceInfo) -> Result<()> {
        let resp = self
            .http
            .post(format!("{}/api/devices", self.server_url))
            .json(&serde_json::json!({
                "device_id": device_id,
                "hostname": info.hostname,
                "device_type": info.device_type,
                "gpus": info.gpus,
            }))
            .send()
            .await?;

        if !resp.status().is_success() {
            return Err(format!("Registration failed: {}", resp.status()).into());
        }

        Ok(())
    }

    /// Reports a metrics sample via `POST {server_url}/api/metrics/{device_id}`.
    ///
    /// # Errors
    /// Fails on transport errors or any non-2xx response. Previously a rejected
    /// report (e.g. a 404) was silently treated as success.
    ///
    /// NOTE(review): the server router in this design registers no `/api/metrics`
    /// route; confirm the endpoint exists.
    pub async fn report_metrics(&self, device_id: &str, metrics: Metrics) -> Result<()> {
        let resp = self
            .http
            .post(format!("{}/api/metrics/{}", self.server_url, device_id))
            .json(&metrics)
            .send()
            .await?;

        if !resp.status().is_success() {
            return Err(format!("Metrics report failed: {}", resp.status()).into());
        }

        Ok(())
    }
}

3. Agentless SSH: russh + deadpool

For devices without the hwLedger agent installed, query GPU status over SSH:

crates/hwledger-server/src/ssh.rs:

rust
use russh::*;
use std::sync::Arc;

/// Connection details for probing a host that has no hwLedger agent installed.
pub struct SshProbe {
    hostname: String,
    username: String,
    /// Path to the private key used for public-key authentication.
    key_path: String,
}

impl SshProbe {
    /// GPU query run on the remote host. `nvidia-smi` has no `--json` option, so
    /// this asks for one CSV line per GPU (no header, no units). The fields line up
    /// with the `gpu_snapshots` columns: index, memory used/total (MiB),
    /// utilization (%), temperature (°C) and power draw (W).
    const GPU_QUERY: &'static str = "nvidia-smi \
        --query-gpu=index,memory.used,memory.total,utilization.gpu,temperature.gpu,power.draw \
        --format=csv,noheader,nounits";

    /// Runs [`Self::GPU_QUERY`] on the host over SSH and returns its raw stdout.
    ///
    /// Authenticates as `username` with the private key stored at `key_path`.
    ///
    /// # Errors
    /// Fails if the key cannot be loaded, the connection or command fails, or the
    /// server rejects the key.
    ///
    /// NOTE(review): the session/channel calls mirror the original sketch. Confirm
    /// them against the pinned russh version: recent releases connect via
    /// `client::connect` with a `client::Handler`, return an auth-result type rather
    /// than `bool`, and read output from `Channel::wait`.
    pub async fn query_gpu_status(&self) -> Result<String> {
        let config = Arc::new(Config::default());
        let mut session = client::Session::new(config, self.hostname.clone()).await?;

        // Use the configured key: a freshly generated key pair would never appear
        // in the host's authorized_keys, so authentication could not succeed.
        let key_pair = russh_keys::load_secret_key(&self.key_path, None)?;
        let authenticated = session
            .authenticate_publickey(self.username.clone(), Arc::new(key_pair))
            .await?;
        if !authenticated {
            return Err(format!(
                "SSH public-key authentication rejected for {}@{}",
                self.username, self.hostname
            )
            .into());
        }

        let mut channel = session.channel_session().await?;
        channel.exec(true, Self::GPU_QUERY).await?;

        let mut output = String::new();
        channel.read_to_string(&mut output).await?;

        Ok(output)
    }
}

// Connection pooling via deadpool
use deadpool::managed::{Object, Pool, PoolError};

/// Pool of reusable SSH probe objects backed by deadpool.
///
/// NOTE(review): deadpool's managed `Pool<M>` expects `M: deadpool::managed::Manager`.
/// Confirm a `Manager` impl exists for `SshProbe` (or introduce a dedicated manager type).
pub struct SshConnPool {
    pool: Pool<SshProbe>,
}

impl SshConnPool {
    /// Checks out a pooled object, waiting for one if the pool is exhausted.
    ///
    /// # Errors
    /// Converts deadpool's pool error into the crate's error type.
    pub async fn get_connection(&self) -> Result<Object<SshProbe>> {
        let checkout = self.pool.get().await;
        checkout.map_err(Into::into)
    }
}

4. Tailscale Integration

Query tailnet peer status via the `tailscale status --json` CLI:

rust
/// Lists peers in the local tailnet by running `tailscale status --json`.
///
/// Each peer's address is the first entry of its `TailscaleIPs` array; peers that
/// report no Tailscale IP are skipped. `hostname` falls back to `"Unknown"` and
/// `online` to `false` when those fields are missing.
///
/// # Errors
/// Fails if the `tailscale` binary cannot be spawned, exits unsuccessfully (for
/// example when the daemon is not running), or prints output that is not JSON.
pub async fn detect_tailnet_peers() -> Result<Vec<TailnetPeer>> {
    let output = tokio::process::Command::new("tailscale")
        .args(["status", "--json"])
        .output()
        .await?;

    // Without this check a stopped daemon surfaces as a confusing JSON parse error.
    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        return Err(format!("tailscale status failed ({}): {}", output.status, stderr.trim()).into());
    }

    let status: serde_json::Value = serde_json::from_slice(&output.stdout)?;

    // `Peer` is keyed by node public key, not by IP; addresses live in `TailscaleIPs`.
    let peers = status["Peer"]
        .as_object()
        .into_iter()
        .flat_map(|peer_map| peer_map.values())
        .filter_map(|peer_data| {
            let ip = peer_data["TailscaleIPs"].get(0)?.as_str()?;
            Some(TailnetPeer {
                ip: ip.to_string(),
                hostname: peer_data["HostName"].as_str().unwrap_or("Unknown").to_string(),
                online: peer_data["Online"].as_bool().unwrap_or(false),
            })
        })
        .collect();

    Ok(peers)
}

5. Rental Cloud Integration

Query rental providers (Vast.ai, RunPod, Lambda) via their REST APIs; the Vast.ai client is shown below:

rust
/// Lists the caller's Vast.ai instances.
///
/// Missing string fields become `""` (`status` becomes `"unknown"`) and a missing
/// VRAM figure becomes `0.0`. The instance `id` is accepted as either a JSON string
/// or a JSON number.
///
/// # Errors
/// Fails on transport errors, non-2xx responses (for example an invalid API key,
/// which previously produced a silent empty list), or a non-JSON body.
///
/// NOTE(review): confirm the `status_text` and `vram_gb` field names against the
/// current Vast.ai instance schema.
pub async fn query_vast_ai(api_key: &str) -> Result<Vec<RentalInstance>> {
    let client = reqwest::Client::new();

    let resp = client
        .get("https://api.vast.ai/api/v0/instances/")
        // Same `Authorization: Bearer ...` header, but marked sensitive by reqwest.
        .bearer_auth(api_key)
        .send()
        .await?
        .error_for_status()?;

    let data: serde_json::Value = resp.json().await?;

    let instances = data["instances"]
        .as_array()
        .into_iter()
        .flatten()
        .map(|inst| RentalInstance {
            instance_id: match &inst["id"] {
                serde_json::Value::String(id) => id.clone(),
                serde_json::Value::Number(id) => id.to_string(),
                _ => String::new(),
            },
            gpu_name: inst["gpu_name"].as_str().unwrap_or("").to_string(),
            status: inst["status_text"].as_str().unwrap_or("unknown").to_string(),
            vram_gb: inst["vram_gb"].as_f64().unwrap_or(0.0),
        })
        .collect();

    Ok(instances)
}

6. Event Sourcing — Audit Log

Using the phenotype-event-sourcing crate:

rust
// Event log schema: SHA-256 hash chain, immutable append-only

/// Appends a `JobDispatched` event for `dispatch` to the audit log under its job id.
///
/// The event records the job id, model, target device, requested VRAM (MB) and
/// the current time in whole seconds since the Unix epoch.
///
/// # Errors
/// Propagates any error from the event store's `append`.
///
/// # Panics
/// Panics if the system clock reads earlier than the Unix epoch.
pub async fn record_dispatch(
    event_log: &Arc<phenotype_event_sourcing::Store>,
    dispatch: &DispatchJob,
) -> Result<()> {
    let now_secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_secs();

    let dispatched = serde_json::json!({
        "type": "JobDispatched",
        "job_id": dispatch.job_id,
        "model": dispatch.model_id,
        "device": dispatch.target_device,
        "vram_requested_mb": dispatch.vram_mb,
        "timestamp": now_secs,
    });

    // Append creates SHA-256 hash chain
    event_log.append(&dispatch.job_id, &dispatched).await?;
    Ok(())
}

/// Tamper detection: asks the event store to verify its hash chain.
///
/// Returns whatever the store's `verify_chain` reports, unchanged.
pub async fn verify_ledger_integrity(event_log: &Arc<phenotype_event_sourcing::Store>) -> Result<bool> {
    let store: &phenotype_event_sourcing::Store = event_log;
    store.verify_chain().await
}

Database Schema

sql
-- SQLite quirk: a non-INTEGER PRIMARY KEY column accepts NULL unless it is also
-- declared NOT NULL, so the text keys below say so explicitly.
CREATE TABLE devices (
    device_id TEXT NOT NULL PRIMARY KEY,
    hostname TEXT NOT NULL,
    device_type TEXT NOT NULL
        CHECK (device_type IN ('local', 'rental', 'ssh')),
    registered_at INTEGER NOT NULL -- seconds since the Unix epoch
);

CREATE TABLE dispatch_jobs (
    job_id TEXT NOT NULL PRIMARY KEY,
    device_id TEXT NOT NULL,
    model_id TEXT NOT NULL,
    vram_requested_mb INTEGER NOT NULL,
    status TEXT NOT NULL
        CHECK (status IN ('pending', 'running', 'completed', 'failed')),
    created_at INTEGER NOT NULL,
    completed_at INTEGER,
    cost_usd REAL,
    FOREIGN KEY (device_id) REFERENCES devices(device_id)
);

-- SQLite does not index foreign-key child columns automatically; this keeps
-- per-device job lookups and FK checks on device deletion from scanning the table.
CREATE INDEX idx_dispatch_jobs_device_id ON dispatch_jobs (device_id);

CREATE TABLE gpu_snapshots (
    device_id TEXT NOT NULL,
    timestamp INTEGER NOT NULL,
    gpu_id INTEGER NOT NULL,
    vram_used_mb INTEGER,
    vram_total_mb INTEGER,
    utilization_percent INTEGER,
    temperature_celsius REAL,
    power_watts REAL,
    PRIMARY KEY (device_id, timestamp, gpu_id),
    FOREIGN KEY (device_id) REFERENCES devices(device_id)
);

Security: mTLS Certificate Management

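Use rcgen to generate a client certificate per device. The server's client-certificate verifier accepts only certificates that chain to a root in its trust store, so device certificates should be issued by (or enrolled in) the CA the server trusts: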

rust
/// Certificate parameters for a device's client certificate.
///
/// `device_id` becomes both the subject common name and the subject alternative
/// name, and the certificate is restricted to TLS client authentication.
fn client_cert_params(device_id: &str) -> rcgen::CertificateParams {
    let mut params = rcgen::CertificateParams::new(vec![device_id.to_string()]);
    params
        .distinguished_name
        .push(rcgen::DnType::CommonName, device_id);
    params.extended_key_usages = vec![rcgen::ExtendedKeyUsagePurpose::ClientAuth];
    params
}

/// Generates a self-signed client certificate for `device_id`.
///
/// Returns `(certificate_pem, private_key_pem)` as bytes.
///
/// A self-signed certificate does not chain to any shared CA, so a server that
/// verifies clients against a root store must trust each such certificate
/// individually. To have one fleet root cover every device, use
/// [`generate_client_cert_signed_by`].
///
/// # Errors
/// Fails if rcgen cannot build or serialize the certificate.
pub fn generate_client_cert(device_id: &str) -> Result<(Vec<u8>, Vec<u8>)> {
    let cert = rcgen::Certificate::from_params(client_cert_params(device_id))?;

    let cert_pem = cert.serialize_pem()?;
    let key_pem = cert.serialize_private_key_pem();

    Ok((cert_pem.into_bytes(), key_pem.into_bytes()))
}

/// Generates a client certificate for `device_id` signed by the fleet CA `ca`.
///
/// Returns `(certificate_pem, private_key_pem)` as bytes. Only the device's own
/// private key is returned; the CA key never leaves `ca`.
///
/// # Errors
/// Fails if rcgen cannot build the certificate or sign it with `ca`.
pub fn generate_client_cert_signed_by(
    device_id: &str,
    ca: &rcgen::Certificate,
) -> Result<(Vec<u8>, Vec<u8>)> {
    let cert = rcgen::Certificate::from_params(client_cert_params(device_id))?;

    let cert_pem = cert.serialize_pem_with_signer(ca)?;
    let key_pem = cert.serialize_private_key_pem();

    Ok((cert_pem.into_bytes(), key_pem.into_bytes()))
}

See also

  • ADR-0003: Fleet Wire Axum + mTLS
  • Brief 01: oMlx Analysis
  • Brief 03: Inference Engine Matrix
  • crates/hwledger-server/
  • crates/hwledger-agent/

Sources

Released under the Apache 2.0 License.