
Architecture

System Overview

See the rendered system diagram: architecture.svg.

At a high level:

  • firmware application code runs on top of xylolabs-sdk
  • xylolabs-protocol provides the shared XMBP wire format
  • xylolabs-hal-* crates implement target-specific platform adapters
  • xylolabs-server accepts HTTP/WebSocket ingest, enforces auth, and coordinates persistence
  • PostgreSQL stores metadata/index state while MinIO stores audio/blob payloads

Data Flow

  1. MCU captures audio (I2S) and sensor data, feeds them into XylolabsClient.
  2. XylolabsClient encodes audio via XAP or ADPCM into a ring buffer and accumulates metadata samples.
  3. On each tick(), the client batches metadata into an XMBP packet and sends audio + metadata over HTTP POST (or WebSocket) to the server.
  4. Server authenticates via API key, decodes XMBP, buffers samples in IngestManager, and flushes metadata records to PostgreSQL and audio payloads plus XMCH-encoded metadata chunks to MinIO.

Crate Dependency Graph

Dependency flow summary:

  • xylolabs-server depends on xylolabs-core, xylolabs-db, xylolabs-storage, and xylolabs-transcode
  • xylolabs-transcode also depends on xylolabs-core and xylolabs-protocol
  • xylolabs-sdk depends on xylolabs-protocol
  • xylolabs-hal-* crates sit on top of xylolabs-sdk

Crate Roles

Crate | Role | no_std
xylolabs-core | Domain models, DTOs, error types, chunk format (XMCH), XMBP server-side codec, downsampling | No
xylolabs-db | SQLx repository layer + PostgreSQL migrations | No
xylolabs-storage | S3/MinIO client abstraction | No
xylolabs-transcode | FFmpeg worker, XAP server-side decoder, stale job reaper, transcoding pipeline | No
xylolabs-server | Axum web server, routes, middleware, ingest engine | No
xylolabs-protocol | XMBP wire protocol encoder/decoder (shared with SDK) | Yes
xylolabs-sdk | Embedded SDK core: client, session, codecs, transport | Yes
xylolabs-hal-* | Per-target Platform trait implementations (not in workspace) | Yes

Proprietary Protocols (Summary)

XMBP -- Xylolabs Metadata Batch Protocol

Binary big-endian wire protocol for batched sensor/metadata transmission from MCU to server. Shared between the no_std SDK encoder and the server decoder.

  • Magic: 0x584D4250 ("XMBP"), Version: 1
  • Supports 11 value types (F64, F32, I64, I32, Bool, String, Bytes, F64Array, F32Array, I32Array, Json)
  • Up to 256 streams per batch, microsecond timestamps
  • Encoder: zero-copy, zero-alloc into borrowed buffer with sticky overflow
  • Decoder: validates all lengths before allocation (DoS-safe)
  • Batch sequence wraps at u16::MAX with forward/backward gap detection
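A minimal sketch of the encoder and decoder safety properties above, assuming a u32 length prefix for byte strings (the real XMBP field layout is defined in XMBP-SPECIFICATION.md and may differ). BeWriter, BeReader, and DecodeError are hypothetical names. The writer records overflow in a flag that is never cleared, so the caller checks once at the end instead of after every field; the reader compares each declared length with the bytes actually remaining before it allocates.

```rust
/// Big-endian writer over a caller-provided buffer (hypothetical sketch).
/// Overflow is sticky: once a write does not fit, the flag stays set and
/// every later write is ignored, so the caller checks once in `finish`.
pub struct BeWriter<'a> {
    buf: &'a mut [u8],
    pos: usize,
    overflowed: bool,
}

impl<'a> BeWriter<'a> {
    pub fn new(buf: &'a mut [u8]) -> Self {
        BeWriter { buf, pos: 0, overflowed: false }
    }

    fn put(&mut self, bytes: &[u8]) {
        if self.overflowed || self.buf.len() - self.pos < bytes.len() {
            self.overflowed = true; // never cleared
            return;
        }
        self.buf[self.pos..self.pos + bytes.len()].copy_from_slice(bytes);
        self.pos += bytes.len();
    }

    pub fn put_u16(&mut self, v: u16) { self.put(&v.to_be_bytes()); }
    pub fn put_u32(&mut self, v: u32) { self.put(&v.to_be_bytes()); }
    pub fn put_u64(&mut self, v: u64) { self.put(&v.to_be_bytes()); }

    /// Length-prefixed bytes (the u32 prefix is an assumption).
    pub fn put_bytes(&mut self, data: &[u8]) {
        self.put_u32(data.len() as u32);
        self.put(data);
    }

    /// Number of bytes written, or None if any write overflowed.
    pub fn finish(self) -> Option<usize> {
        if self.overflowed { None } else { Some(self.pos) }
    }
}

#[derive(Debug, PartialEq)]
pub enum DecodeError {
    Truncated,
    LengthTooLarge,
}

/// Big-endian reader that never trusts a declared length (hypothetical sketch).
pub struct BeReader<'a> {
    data: &'a [u8],
    pos: usize,
}

impl<'a> BeReader<'a> {
    pub fn new(data: &'a [u8]) -> Self {
        BeReader { data, pos: 0 }
    }

    fn remaining(&self) -> usize {
        self.data.len() - self.pos
    }

    fn take(&mut self, n: usize) -> Result<&'a [u8], DecodeError> {
        if n > self.remaining() {
            return Err(DecodeError::Truncated);
        }
        let data = self.data;
        let out = &data[self.pos..self.pos + n];
        self.pos += n;
        Ok(out)
    }

    pub fn read_u32(&mut self) -> Result<u32, DecodeError> {
        let b = self.take(4)?;
        Ok(u32::from_be_bytes([b[0], b[1], b[2], b[3]]))
    }

    /// Checks the declared length against the remaining input before
    /// allocating, so a forged length cannot trigger a huge allocation.
    pub fn read_bytes(&mut self) -> Result<Vec<u8>, DecodeError> {
        let len = self.read_u32()? as usize;
        if len > self.remaining() {
            return Err(DecodeError::LengthTooLarge);
        }
        Ok(self.take(len)?.to_vec())
    }
}
```

With an 8-byte buffer, writing a u32 and then a u64 sets the overflow flag, and a following u16 is ignored even though it would fit; finish() then reports None.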

Full specification: XMBP-SPECIFICATION.md

XAP -- Xylolabs Audio Protocol

Proprietary MDCT-based spectral audio codec providing ~8:1 to 10:1 compression of 16-bit PCM.

  • Sample rates: 8, 16, 24, 32, 48, 96 kHz
  • Channels: 1–4 (encoded independently)
  • Frame durations: 7.5 ms, 10 ms
  • ~10 MIPS per channel (with DSP acceleration), ~8 KB RAM per channel
  • Requires FPU; platforms without FPU use IMA-ADPCM instead
  • Codec ID in XMBP: 0x03

Server-side decoder: the xylolabs-transcode crate includes an XAP decoder (xap_decode.rs) that performs dequantization followed by an inverse MDCT. When a .xap file is uploaded, the transcode pipeline automatically decodes it to PCM WAV before passing it to FFmpeg for transcoding to the target format. The sample rate is inferred from the frame_samples field of the frame header.
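A sketch of the last two decode steps, under stated assumptions. Because XAP allows both 7.5 ms and 10 ms frames, frame_samples alone is not always unambiguous (240 samples fits both 24 kHz at 10 ms and 32 kHz at 7.5 ms), so the hypothetical infer_sample_rate_10ms below assumes 10 ms frames; the real decoder may consult other header fields. wav_header writes the standard 44-byte RIFF/WAVE header for 16-bit little-endian PCM, a layout FFmpeg reads as plain PCM WAV.

```rust
/// Supported XAP sample rates in Hz (from the list above).
const SUPPORTED_RATES_HZ: [u32; 6] = [8_000, 16_000, 24_000, 32_000, 48_000, 96_000];

/// Hypothetical helper: assumes 10 ms frames, so rate = frame_samples * 100.
pub fn infer_sample_rate_10ms(frame_samples: u32) -> Option<u32> {
    let rate = frame_samples.checked_mul(100)?;
    if SUPPORTED_RATES_HZ.contains(&rate) { Some(rate) } else { None }
}

/// Canonical 44-byte RIFF/WAVE header for interleaved 16-bit PCM.
pub fn wav_header(sample_rate: u32, channels: u16, frames: u32) -> [u8; 44] {
    let bits_per_sample: u16 = 16;
    let block_align = channels * (bits_per_sample / 8); // bytes per frame
    let byte_rate = sample_rate * block_align as u32;
    let data_len = frames * block_align as u32;

    let mut h = [0u8; 44];
    h[0..4].copy_from_slice(b"RIFF");
    h[4..8].copy_from_slice(&(36 + data_len).to_le_bytes()); // RIFF chunk size
    h[8..12].copy_from_slice(b"WAVE");
    h[12..16].copy_from_slice(b"fmt ");
    h[16..20].copy_from_slice(&16u32.to_le_bytes()); // fmt chunk size
    h[20..22].copy_from_slice(&1u16.to_le_bytes()); // format 1 = PCM
    h[22..24].copy_from_slice(&channels.to_le_bytes());
    h[24..28].copy_from_slice(&sample_rate.to_le_bytes());
    h[28..32].copy_from_slice(&byte_rate.to_le_bytes());
    h[32..34].copy_from_slice(&block_align.to_le_bytes());
    h[34..36].copy_from_slice(&bits_per_sample.to_le_bytes());
    h[36..40].copy_from_slice(b"data");
    h[40..44].copy_from_slice(&data_len.to_le_bytes());
    h
}
```

The decoded PCM samples would follow the header directly, interleaved by channel.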

Full specification: XAP-SPECIFICATION.md

XMCH -- Xylolabs Metadata Chunk Format

Server-side binary format for storing flushed metadata chunks in S3/MinIO.

Layout:

  • 32-byte header:
      ◦ magic: 0x584D4348 ("XMCH")
      ◦ version: 1
      ◦ value type: wire tag
      ◦ reserved: 2 bytes
      ◦ sample count: u32
      ◦ start timestamp: u64 microseconds
      ◦ end timestamp: u64 microseconds
      ◦ data size: u32
  • timestamps column: N × u64 big-endian
  • values column: type-dependent payload
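A sketch of the uncompressed chunk encoding. It assumes version and value type are one byte each (which the 32-byte total implies: 4+1+1+2+4+8+8+4), that header integers are big-endian like the timestamps column, and that data size covers both columns. XmchHeader and encode_chunk_sketch are hypothetical names, not the xylolabs-core API, and zstd compression is left out.

```rust
pub const XMCH_MAGIC: u32 = 0x584D_4348; // "XMCH"
pub const XMCH_VERSION: u8 = 1;
pub const XMCH_HEADER_LEN: usize = 32;

/// Hypothetical in-memory form of the 32-byte header.
pub struct XmchHeader {
    pub value_type: u8, // wire tag of the values column
    pub sample_count: u32,
    pub start_us: u64,
    pub end_us: u64,
    pub data_size: u32,
}

impl XmchHeader {
    /// Assumed offsets: 0 magic, 4 version, 5 value type, 6..8 reserved,
    /// 8 sample count, 12 start, 20 end, 28 data size.
    pub fn encode(&self) -> [u8; XMCH_HEADER_LEN] {
        let mut h = [0u8; XMCH_HEADER_LEN];
        h[0..4].copy_from_slice(&XMCH_MAGIC.to_be_bytes());
        h[4] = XMCH_VERSION;
        h[5] = self.value_type;
        // h[6..8] stays zero (reserved)
        h[8..12].copy_from_slice(&self.sample_count.to_be_bytes());
        h[12..20].copy_from_slice(&self.start_us.to_be_bytes());
        h[20..28].copy_from_slice(&self.end_us.to_be_bytes());
        h[28..32].copy_from_slice(&self.data_size.to_be_bytes());
        h
    }
}

/// Header, then the timestamps column (N x u64 big-endian), then the values
/// column, which the caller passes in already encoded for its value type.
pub fn encode_chunk_sketch(value_type: u8, timestamps_us: &[u64], values: &[u8]) -> Vec<u8> {
    let ts_len = timestamps_us.len() * 8;
    let header = XmchHeader {
        value_type,
        sample_count: timestamps_us.len() as u32,
        start_us: timestamps_us.first().copied().unwrap_or(0),
        end_us: timestamps_us.last().copied().unwrap_or(0),
        data_size: (ts_len + values.len()) as u32, // assumed: both columns
    };
    let mut out = Vec::with_capacity(XMCH_HEADER_LEN + ts_len + values.len());
    out.extend_from_slice(&header.encode());
    for ts in timestamps_us {
        out.extend_from_slice(&ts.to_be_bytes());
    }
    out.extend_from_slice(values);
    out
}
```

Storing timestamps and values as separate columns keeps each column homogeneous, which typically helps a general-purpose compressor such as zstd.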

Chunks are optionally zstd-compressed via encode_chunk_compressed() before S3 upload.

Server Ingestion Pipeline

IngestManager

Located at crates/xylolabs-server/src/ingest/manager.rs.

  • Maintains per-session state with per-stream StreamBuffers.
  • Buffers up to 500,000 samples per stream before flush.
  • Flush writes encoded XMCH chunks to S3 and metadata records to PostgreSQL.
  • Concurrent flush guard (flushing flag) prevents double-flush. Samples arriving during flush are accumulated separately and merged on completion.
  • On flush failure, original samples are prepended back for retry.
  • Supports live event broadcasting via tokio::sync::broadcast for real-time WebSocket subscribers.
  • Stale session eviction based on last activity timestamp.
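The flush guard and retry rules can be sketched as a small state machine. StreamBuffer here is a hypothetical, single-threaded simplification (the real type lives in manager.rs and is shared across async tasks); only the 500,000 threshold and the guard, merge, and prepend-on-failure behavior come from the list above.

```rust
/// Flush threshold from the list above.
pub const MAX_SAMPLES_BEFORE_FLUSH: usize = 500_000;

/// Hypothetical, single-threaded simplification of a per-stream buffer.
pub struct StreamBuffer<T> {
    samples: Vec<T>,
    arrived_during_flush: Vec<T>,
    flushing: bool,
}

impl<T> StreamBuffer<T> {
    pub fn new() -> Self {
        StreamBuffer { samples: Vec::new(), arrived_during_flush: Vec::new(), flushing: false }
    }

    /// Samples that arrive mid-flush go to a side buffer so the batch
    /// being written is never mutated.
    pub fn push(&mut self, sample: T) {
        if self.flushing {
            self.arrived_during_flush.push(sample);
        } else {
            self.samples.push(sample);
        }
    }

    pub fn should_flush(&self) -> bool {
        self.samples.len() >= MAX_SAMPLES_BEFORE_FLUSH
    }

    /// Takes the buffered samples, or returns None if a flush is already
    /// running (the double-flush guard).
    pub fn begin_flush(&mut self) -> Option<Vec<T>> {
        if self.flushing {
            return None;
        }
        self.flushing = true;
        Some(std::mem::take(&mut self.samples))
    }

    /// Ends the flush. On failure the original batch is put back in front of
    /// the samples that arrived meanwhile, preserving arrival order for retry.
    pub fn complete_flush(&mut self, batch: Vec<T>, succeeded: bool) {
        let mut merged = if succeeded { Vec::new() } else { batch };
        merged.append(&mut self.samples);
        merged.append(&mut self.arrived_during_flush);
        self.samples = merged;
        self.flushing = false;
    }

    pub fn samples(&self) -> &[T] {
        &self.samples
    }
}
```

For example, pushing 1 and 2, starting a flush, pushing 3 mid-flush, and then failing the flush leaves the buffer as [1, 2, 3], ready for the next attempt.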

Batch Sequence Tracking

The server uses classify_batch_sequence() with forward/backward gap heuristics to distinguish reordering from genuine gaps at u16 wraparound boundaries.
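A minimal sketch of wraparound-aware classification, assuming the common half-range rule: the forward distance is computed with wrapping_sub, values below 32768 count as forward (a gap if larger than one), and anything else counts as a late or duplicate batch. The enum and function name are hypothetical; the actual heuristics in classify_batch_sequence() may use different thresholds.

```rust
#[derive(Debug, PartialEq)]
pub enum SeqClass {
    /// Exactly one ahead of the last sequence (mod 2^16).
    Next,
    /// Ahead by more than one; the value is how many batches are missing.
    Gap(u16),
    /// Equal to or behind the last sequence (duplicate or reordered);
    /// the value is how far behind.
    Late(u16),
}

/// Hypothetical sketch: wrapping arithmetic makes 65535 -> 0 an ordinary
/// step, and the lower half of the u16 range is treated as "forward".
pub fn classify_sequence_sketch(last: u16, current: u16) -> SeqClass {
    let forward = current.wrapping_sub(last);
    if forward == 1 {
        SeqClass::Next
    } else if forward != 0 && forward < 0x8000 {
        SeqClass::Gap(forward - 1)
    } else {
        SeqClass::Late(last.wrapping_sub(current))
    }
}
```

With this rule, 65534 followed by 1 is a gap of two missing batches (65535 and 0), while 2 followed by 65535 is a late batch three positions behind rather than a jump of 65533.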

Downsampling

LTTB (Largest Triangle Three Buckets), implemented in crates/xylolabs-core/src/downsample.rs, performs query-time visual downsampling of time-series data. It preserves the visual shape of a series while reducing its point count, and is applied server-side in the timeline endpoint before points are returned to the frontend.
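For reference, the standard LTTB algorithm looks like this in Rust. It is a generic sketch over (x, y) pairs sorted by x, not the code in downsample.rs: the first and last points are always kept, the remaining points are split into threshold - 2 buckets, and each bucket keeps the point that forms the largest triangle with the previously kept point and the average of the next bucket.

```rust
/// Largest-Triangle-Three-Buckets downsampling of points sorted by x.
/// Returns `threshold` points (or the input unchanged if it is already small).
pub fn lttb(data: &[(f64, f64)], threshold: usize) -> Vec<(f64, f64)> {
    let n = data.len();
    if threshold >= n || threshold < 3 {
        return data.to_vec();
    }
    let mut out = Vec::with_capacity(threshold);
    // Bucket width over the interior points (first and last are fixed).
    let every = (n - 2) as f64 / (threshold - 2) as f64;
    let mut a = 0; // index of the previously selected point
    out.push(data[0]);

    for i in 0..threshold - 2 {
        // Average of the next bucket: the third vertex of the triangle.
        let avg_start = ((i + 1) as f64 * every) as usize + 1;
        let avg_end = (((i + 2) as f64 * every) as usize + 1).min(n);
        let (mut avg_x, mut avg_y) = (0.0, 0.0);
        for p in &data[avg_start..avg_end] {
            avg_x += p.0;
            avg_y += p.1;
        }
        let avg_len = (avg_end - avg_start) as f64;
        avg_x /= avg_len;
        avg_y /= avg_len;

        // Current bucket: keep the point with the largest triangle area.
        let start = (i as f64 * every) as usize + 1;
        let end = ((i + 1) as f64 * every) as usize + 1;
        let (ax, ay) = data[a];
        let mut max_area = -1.0;
        let mut chosen = start;
        for j in start..end {
            // Twice the triangle area; the factor 1/2 does not change the argmax.
            let area = ((ax - avg_x) * (data[j].1 - ay) - (ax - data[j].0) * (avg_y - ay)).abs();
            if area > max_area {
                max_area = area;
                chosen = j;
            }
        }
        out.push(data[chosen]);
        a = chosen;
    }
    out.push(data[n - 1]);
    out
}
```

Each input point is visited a constant number of times, so the pass is linear in the number of points, which suits query-time use.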

Device Timeline Performance

The GET /api/v1/devices/{id}/timeseries endpoint uses a three-phase pipeline to minimize latency:

  1. Phase 1 — single batch DB query: repo::metadata_chunk::list_by_streams(stream_ids[]) fetches all chunks for all stream names in one query, eliminating N+1 per-stream DB roundtrips.
  2. Phase 2 — parallel S3 download: all numeric chunks across all stream names are downloaded in one buffered(128) concurrent pass. Cache lookups happen first: decoded chunks are cached by S3 key in AppState::timeline_chunk_cache (in-memory mini_moka, 24 h TTL, Arc<Vec<MetadataSample>> values for O(1) arc-clone hits). Decoding is offloaded to spawn_blocking.
  3. Phase 3 — anchor, filter, downsample: per-session clock-anchor correction is applied (device clocks may be ahead of server time), then the requested time range filter, then LTTB downsampling to the requested downsample target.

F32Array, F64Array, and I32Array samples are mean-aggregated to a single F64 value per sample before downsampling. This prevents out-of-memory conditions from multi-axis accelerometer streams, which would otherwise expand into 2 M-point series.
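A sketch of the aggregation step, using a hypothetical SampleValue enum that covers only a subset of the 11 XMBP value types. Returning None for empty arrays is an assumption; the server may handle them differently. Collapsing each multi-axis sample to one value keeps the point count equal to the sample count before LTTB runs.

```rust
/// Hypothetical subset of sample value types, for illustration only.
pub enum SampleValue {
    F64(f64),
    I64(i64),
    F32Array(Vec<f32>),
    F64Array(Vec<f64>),
    I32Array(Vec<i32>),
}

/// Arithmetic mean of an iterator, or None if it is empty.
fn mean<I: Iterator<Item = f64>>(values: I) -> Option<f64> {
    let (mut sum, mut count) = (0.0, 0usize);
    for v in values {
        sum += v;
        count += 1;
    }
    if count == 0 { None } else { Some(sum / count as f64) }
}

/// Reduces any sample to one f64 for plotting; arrays become their mean.
pub fn to_plot_value(v: &SampleValue) -> Option<f64> {
    match v {
        SampleValue::F64(x) => Some(*x),
        SampleValue::I64(x) => Some(*x as f64),
        SampleValue::F32Array(a) => mean(a.iter().map(|&x| x as f64)),
        SampleValue::F64Array(a) => mean(a.iter().copied()),
        SampleValue::I32Array(a) => mean(a.iter().map(|&x| x as f64)),
    }
}
```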

Frontend Charts

Both dashboards (the frontend/ admin app and the frontend-app/ operator app) use uPlot (canvas-based) for all time-series rendering: DeviceTimelineChart, StreamChart (numeric and array paths), and VectorTripletChart. uPlot handles large point counts efficiently on canvas. WaveformPlayer (WaveSurfer), the FFT spectrogram, boolean bar charts, and data tables have not been migrated to uPlot and keep their existing renderers.

Live Audio Streaming Pipeline

A second ingest path runs alongside IngestManager for continuous low-latency audio. Located at crates/xylolabs-server/src/live/manager.rs (LiveAudioManager).

  • Wire path: WebSocket ingest at /api/v1/live/streams/{stream_key}/ingest (API key + live:ingest) → per-stream tokio::sync::broadcast::Sender<Bytes> (1024-slot capacity) → WebSocket listener at /api/v1/live/streams/{id}/listen.ws (JWT or API key + live:listen). Browsers mint a 5-min listener JWT at POST /api/v1/auth/live-token.
  • Archive: every 60 s, raw LC3 frames are zstd-compressed inside tokio::task::spawn_blocking, written to s3://.../live/{facility}/{stream}/lc3/{start}_{end}.bin.zst, and indexed in live_archive_segments. A 5-minute mini_moka cache for (retention_days, codec) eliminates the per-flush DB roundtrip.
  • Retention: a worker running every 30 minutes prunes segments older than retention_days (default 30) and deletes their S3 objects with buffer_unordered(8) parallelism.
  • Cleanup: a worker running every 30 minutes removes broadcast senders that have no subscribers and no recent producer.
  • Orphan reaper: server startup closes any live_stream_connections left open by a prior process (mirrors the ingest_sessions cleanup).
  • Observability: atomic counters (LIVE_ARCHIVE_FLUSH_SUCCESSES/FAILURES, LIVE_ARCHIVE_FLUSH_LAST_US, LIVE_RETENTION_ROWS_PRUNED) surface at GET /api/v1/live/metrics (SuperAdmin) and as a dashboard panel.
  • Audit: every mutation on live_streams (POST/PATCH/DELETE) is written to audit_log with actor (user or API key) + before/after state.
  • Validation gate: channels 1–8, sample_rate_hz ∈ {8/16/24/32/48/96 kHz}, bitrate_per_channel_bps 16k–320k, display_name ≤ 200 B, channel_names length matches channels and each ≤ 32 B, control chars rejected, transcode_profile ∈ {default, low, high}, retention_days 1–3650.
  • Nginx log scrubbing: a dedicated log_format live_scrubbed logs only the request path for /listen.ws, so the listener JWT query parameter never reaches the access logs. The same format is applied defensively to the ingest endpoint.

Database tables: live_streams, live_stream_connections, live_archive_segments (migrations 20260522130000*).
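The validation gate maps naturally onto a single check function. The sketch below is hypothetical: LiveStreamConfig and validate_live_stream are illustrative names, and it assumes "k" means 1,000 for the bitrate bounds, measures display_name and channel names in UTF-8 bytes (str::len), and uses char::is_control to detect control characters. The field names follow the validation list.

```rust
/// Hypothetical request shape; field names follow the validation list.
pub struct LiveStreamConfig<'a> {
    pub channels: u8,
    pub sample_rate_hz: u32,
    pub bitrate_per_channel_bps: u32,
    pub display_name: &'a str,
    pub channel_names: &'a [&'a str],
    pub transcode_profile: &'a str,
    pub retention_days: u32,
}

const SAMPLE_RATES_HZ: [u32; 6] = [8_000, 16_000, 24_000, 32_000, 48_000, 96_000];

pub fn validate_live_stream(c: &LiveStreamConfig<'_>) -> Result<(), &'static str> {
    if !(1..=8).contains(&c.channels) {
        return Err("channels must be 1-8");
    }
    if !SAMPLE_RATES_HZ.contains(&c.sample_rate_hz) {
        return Err("unsupported sample_rate_hz");
    }
    if !(16_000..=320_000).contains(&c.bitrate_per_channel_bps) {
        return Err("bitrate_per_channel_bps out of range");
    }
    // str::len() counts UTF-8 bytes, matching the byte limits above.
    if c.display_name.len() > 200 {
        return Err("display_name too long");
    }
    if c.channel_names.len() != c.channels as usize {
        return Err("channel_names length must equal channels");
    }
    if c.channel_names.iter().any(|n| n.len() > 32) {
        return Err("channel name too long");
    }
    let has_control = |s: &str| s.chars().any(char::is_control);
    if has_control(c.display_name) || c.channel_names.iter().any(|&n| has_control(n)) {
        return Err("control characters are not allowed");
    }
    if !matches!(c.transcode_profile, "default" | "low" | "high") {
        return Err("unknown transcode_profile");
    }
    if !(1..=3650).contains(&c.retention_days) {
        return Err("retention_days must be 1-3650");
    }
    Ok(())
}
```

Checking every field before anything is written keeps invalid configurations out of live_streams and out of the audit trail.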

Auth Architecture

Dual Auth Model

Path | Auth method | Details
/api/v1/ingest/* | API key | X-Api-Key header, scoped to facility
/api/v1/auth/* | Public | Rate-limited (100 req/min per IP)
All other /api/v1/* | JWT | Bearer token, RBAC-enforced

RBAC Hierarchy

  • super_admin -- global access, manages all facilities
  • facility_admin -- manages own facility's data and users (read + write)
  • user -- read-only access to own facility's data

All JWT-authenticated endpoints enforce require_role() explicitly. Read endpoints require at least Role::User; mutating endpoints (create, update, delete, acknowledge, resolve, retranscode) require at least Role::FacilityAdmin.
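Because the three roles form a strict hierarchy, "at least Role::X" maps onto an ordered enum. A minimal sketch under that assumption: the SuperAdmin variant name, the Forbidden error type, the u64 facility IDs, and these require_role and can_access_facility signatures are hypothetical, not the server's actual API.

```rust
/// Roles declared from least to most privileged; the derived Ord follows
/// declaration order, so `>=` means "at least this role".
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Role {
    User,
    FacilityAdmin,
    SuperAdmin,
}

#[derive(Debug, PartialEq)]
pub struct Forbidden;

/// Hypothetical signature: succeeds when `actual` is at least `minimum`.
pub fn require_role(actual: Role, minimum: Role) -> Result<(), Forbidden> {
    if actual >= minimum { Ok(()) } else { Err(Forbidden) }
}

/// Only super_admin crosses facility boundaries.
pub fn can_access_facility(role: Role, own_facility: u64, target_facility: u64) -> bool {
    role == Role::SuperAdmin || own_facility == target_facility
}
```

A read handler would then call require_role(claims.role, Role::User)?, and a mutating handler would pass Role::FacilityAdmin instead (claims here is a hypothetical decoded JWT).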

Rate Limiting

Auth endpoints use in-memory rate limiting via mini_moka::sync::Cache with 60-second TTL windows. Max 100 attempts per IP per minute. Trusted proxy IPs can be configured for correct X-Forwarded-For extraction.
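A std-only sketch of the same behavior: a fixed 60-second window per IP (standing in for the mini_moka TTL cache) and X-Forwarded-For parsing that trusts the header only when the direct peer is a configured proxy, then takes the right-most address that is not itself a trusted proxy. The right-most rule is a common hardening choice assumed here; FixedWindowLimiter and client_ip are hypothetical names.

```rust
use std::collections::HashMap;
use std::net::IpAddr;
use std::time::{Duration, Instant};

/// Fixed-window counter per client IP (hypothetical). Unlike a TTL cache,
/// this map never evicts; expired windows are only reset on the next request.
pub struct FixedWindowLimiter {
    window: Duration,
    max_per_window: u32,
    counters: HashMap<IpAddr, (Instant, u32)>, // (window start, count)
}

impl FixedWindowLimiter {
    pub fn new(window: Duration, max_per_window: u32) -> Self {
        FixedWindowLimiter { window, max_per_window, counters: HashMap::new() }
    }

    /// Returns true if the request is allowed, and counts it.
    pub fn check(&mut self, ip: IpAddr, now: Instant) -> bool {
        let entry = self.counters.entry(ip).or_insert((now, 0));
        if now.saturating_duration_since(entry.0) >= self.window {
            *entry = (now, 0); // window expired: start a fresh one
        }
        if entry.1 >= self.max_per_window {
            return false;
        }
        entry.1 += 1;
        true
    }
}

/// Client IP for rate limiting. X-Forwarded-For is ignored unless the TCP
/// peer is a trusted proxy, because any client can send that header.
pub fn client_ip(peer: IpAddr, xff: Option<&str>, trusted: &[IpAddr]) -> IpAddr {
    if !trusted.contains(&peer) {
        return peer;
    }
    if let Some(header) = xff {
        // Walk from the right (closest hop) and skip our own proxies.
        for part in header.split(',').rev() {
            match part.trim().parse::<IpAddr>() {
                Ok(ip) if !trusted.contains(&ip) => return ip,
                Ok(_) => continue,
                Err(_) => break, // malformed entry: stop trusting the chain
            }
        }
    }
    peer
}
```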

Inference Pipeline Data Flow

External ML clients (GPU servers, cloud functions, or edge workers) fetch closed sessions, run local models, and post results back to the platform.

┌─────────────────────────────────────────────────────────────────────┐
│                        Inference Pipeline                           │
│                                                                     │
│  Device ──► Ingest endpoint ──► PostgreSQL + S3                    │
│                                        │                           │
│                    ┌───────────────────┘                           │
│                    ▼                                                │
│           Inference Client                                          │
│    GET /internal/sessions/{id}/inference-bundle                    │
│    GET /v1/metadata/sessions/{id}/streams/{sid}/data               │
│    GET /v1/metadata/sessions/{id}/streams/{sid}/audio              │
│                    │                                                │
│                    ▼                                                │
│           Local ML Model                                            │
│    (ONNX / TensorRT / PyTorch / custom)                            │
│                    │                                                │
│                    ▼                                                │
│    POST /internal/inference/results                                 │
│    POST /internal/inference/results/batch                          │
│                    │                                                │
│                    ▼                                                │
│    anomaly_reports row created                                      │
│    AnomalyEvent broadcast via tokio::sync::broadcast               │
│                    │                                                │
│         ┌──────────┴──────────┐                                    │
│         ▼                     ▼                                    │
│  GET /internal/anomaly/live   Alert bridge                         │
│  (SSE, operator app)          (email/SMS/push/webhook)             │
└─────────────────────────────────────────────────────────────────────┘

Authentication: all /api/internal/* endpoints require X-Api-Key with the internal scope. Every key is facility-scoped; cross-facility access is impossible.

Result submission (POST /api/internal/inference/results): the client submits a list of events, each carrying anomaly_type, severity, title, and optional confidence/details/time window. The handler creates one anomaly_reports row per event and broadcasts each as an AnomalyEvent on the facility's broadcast channel.

Batch submission (POST /api/internal/inference/results/batch): up to 100 result objects in a single request. Entries are processed independently; partial failures are reported per-entry without rolling back successes.

Report lifecycle:

  • Created with is_resolved: false.
  • POST /api/internal/anomaly/reports/{id}/resolve sets is_resolved: true and resolved_at.
  • PATCH /api/internal/anomaly/reports/{id} reclassifies anomaly_type, severity, confidence, or description for post-hoc human correction.

Inference bundle (GET /api/internal/sessions/{id}/inference-bundle): aggregates all numeric streams (LTTB-downsampled) and audio stream headers for a session in a single response, minimizing round trips for the common fetch-everything pattern.

Internal API & Inference Pipeline

The Internal API (/api/internal/*) provides machine-to-machine endpoints for GPU server management, inference model/job/proxy handling, and anomaly detection. Authentication is via API keys carrying the internal scope and a fixed facility_id; every response is scoped to that facility. See API Reference §24 for the full endpoint list and DTOs.

GPU Server State Model

Each gpu_servers row carries two orthogonal flags:

  • status (online / offline / draining / error) gates job scheduling and proxy selection.
  • health_status (healthy / degraded / unknown) is the observability signal driven by gpu_health_checker.

Direct status overrides via PATCH /gpu-servers/{id} bypass the health-checker state machine and are recorded in the audit log.

GPU Health Checker

Located at crates/xylolabs-server/src/services/gpu_health_checker.rs, the health checker runs every GPU_HEALTH_CHECK_INTERVAL_SECS seconds (default 60, minimum 10). It selects every gpu_servers row with status='online' and probes it via GET http://<ip>:<port>/health, with concurrency capped by buffer_unordered(10). Successful responses are parsed (≤64 KB) and persisted via update_health. HTTP 429 and 503 are treated as transient and ignored; other non-success statuses and network errors call mark_server_error. All error bodies are streamed under a 10 KB cap to prevent OOM from compromised upstreams.
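The size caps rely on reading only a bounded prefix of the body. A synchronous std::io sketch of the idea (the server streams asynchronously, which this does not show): read_bounded fails if the body exceeds the cap while never buffering more than cap + 1 bytes, and read_truncated keeps only the first cap bytes, which suits error messages. Treating KB as 1,024 bytes is an assumption, and both function names are hypothetical; the same pattern applies to the 1 MB cap used by the inference worker and proxy.

```rust
use std::io::{self, Read};

pub const HEALTH_BODY_CAP: usize = 64 * 1024; // assumed 1 KB = 1024 bytes
pub const ERROR_BODY_CAP: usize = 10 * 1024;

/// Reads the whole body if it is at most `cap` bytes, else returns an error.
/// Reading `cap + 1` bytes is enough to detect oversize input.
pub fn read_bounded<R: Read>(body: R, cap: usize) -> io::Result<Vec<u8>> {
    let mut buf = Vec::new();
    body.take(cap as u64 + 1).read_to_end(&mut buf)?;
    if buf.len() > cap {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "body exceeds cap"));
    }
    Ok(buf)
}

/// Keeps at most the first `cap` bytes and silently drops the rest.
pub fn read_truncated<R: Read>(body: R, cap: usize) -> io::Result<Vec<u8>> {
    let mut buf = Vec::new();
    body.take(cap as u64).read_to_end(&mut buf)?;
    Ok(buf)
}
```

Failing closed on oversize success bodies but truncating error bodies matches the split above: a health payload that is too large cannot be parsed reliably, while a clipped error message is still useful for last_error.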

Inference Worker

Located at crates/xylolabs-server/src/services/inference_worker.rs. During bootstrap, the server spawns INFERENCE_WORKER_CONCURRENCY (default 4) independent tokio tasks. Each task polls inference_jobs for facilities with queued work and claims a single job per loop iteration via repo::inference_job::claim_next_queued (SELECT ... FOR UPDATE SKIP LOCKED). Empty-queue backoff is exponential (2 → 30 s) and resets on a successful claim. Once a job is claimed, the worker proceeds as follows:

  1. Resolve the target GPU (the job's pinned gpu_server_id, otherwise gpu_server::find_first_available).
  2. Transition the job to running only if mark_running returns Some — if the job was cancelled between claim and start, the worker drops it without contacting the GPU.
  3. POST /v1/inference to the GPU. Read the response body through a streaming bounded reader (1 MB cap) to limit memory exposure to a compromised GPU, then persist the parsed result via mark_completed.
  4. On HTTP 429/503, mark the job failed but do not take the GPU offline. On any other failure, mark the job failed and the GPU server as error; failure messages are truncated to 256 characters before storage.
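The polling and failure rules above reduce to a few small pieces. This sketch is hypothetical (Backoff, FailureAction, classify_failure, and truncate_message are illustrative names): the backoff doubles from 2 s and caps at 30 s, a missing status stands for a network error, and truncation counts Unicode scalar values so it never splits a multi-byte character.

```rust
use std::time::Duration;

/// Exponential empty-queue backoff: 2 s, 4 s, 8 s, 16 s, then 30 s (capped).
pub struct Backoff {
    current: Duration,
}

impl Backoff {
    const MIN: Duration = Duration::from_secs(2);
    const MAX: Duration = Duration::from_secs(30);

    pub fn new() -> Self {
        Backoff { current: Self::MIN }
    }

    /// Delay to sleep after an empty poll; doubles the next delay.
    pub fn next_delay(&mut self) -> Duration {
        let delay = self.current;
        self.current = (self.current * 2).min(Self::MAX);
        delay
    }

    /// Called after a successful claim.
    pub fn reset(&mut self) {
        self.current = Self::MIN;
    }
}

#[derive(Debug, PartialEq)]
pub enum FailureAction {
    FailJobOnly,            // transient upstream pressure: GPU stays as-is
    FailJobAndMarkGpuError, // anything else, including network errors
}

/// `None` stands for a network-level failure with no HTTP status.
pub fn classify_failure(http_status: Option<u16>) -> FailureAction {
    match http_status {
        Some(429) | Some(503) => FailureAction::FailJobOnly,
        _ => FailureAction::FailJobAndMarkGpuError,
    }
}

/// Truncates to `max_chars` chars without splitting a UTF-8 sequence.
pub fn truncate_message(msg: &str, max_chars: usize) -> String {
    msg.chars().take(max_chars).collect()
}
```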

Inference Proxy

POST /api/internal/proxy/inference (routes/inference_proxy.rs) is the synchronous, low-latency path. The handler resolves (model_name, model_version) to an active model row, picks a GPU via find_first_available, and forwards the request with a 30 s timeout. The upstream response is read with the same 1 MB streaming bound; oversize responses fail closed with 400 Bad Request. Persistent upstream errors mark the GPU server with last_error so the health checker can rotate it out.

Real-time Anomaly Detection

IngestManager::process_batch performs inline anomaly detection. For every metadata sample, sensor values are checked against configurable thresholds. When a threshold trips:

  1. An anomaly_reports row is created with source='realtime', the offending stream, value, threshold, and timestamp.
  2. An AnomalyEvent is broadcast on a tokio::sync::broadcast channel sized by ANOMALY_BROADCAST_CAPACITY (default 10000). The GET /api/internal/anomaly/live SSE endpoint forwards events filtered by the subscriber's facility_id and, when the channel overflows and the subscriber misses events, emits an SSE event of type lag as a notice.

The hook runs synchronously inside the ingest path, so threshold checks must remain O(1). The retrospective batch path (POST /api/internal/anomaly/batch) creates a placeholder report and broadcasts a batch_analysis event; automated batch analysis itself is not yet wired up.
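To keep the inline hook O(1) per sample, thresholds can live in a hash map keyed by stream name, so each sample costs one lookup and at most two comparisons. Threshold, Breach, and check_sample are hypothetical; how thresholds are actually configured (per facility, per device, min and/or max) is not specified here and is assumed.

```rust
use std::collections::HashMap;

/// Hypothetical per-stream limits; either bound may be absent.
#[derive(Clone, Copy)]
pub struct Threshold {
    pub min: Option<f64>,
    pub max: Option<f64>,
}

/// Data an anomaly_reports row would need: stream, value, threshold, time.
pub struct Breach {
    pub stream: String,
    pub value: f64,
    pub threshold: f64,
    pub timestamp_us: u64,
}

/// One hash lookup plus at most two comparisons, independent of how many
/// streams are configured or how many samples are in the batch.
pub fn check_sample(
    thresholds: &HashMap<String, Threshold>,
    stream: &str,
    value: f64,
    timestamp_us: u64,
) -> Option<Breach> {
    let t = thresholds.get(stream)?; // unconfigured streams never trip
    let tripped = match (t.max, t.min) {
        (Some(max), _) if value > max => Some(max),
        (_, Some(min)) if value < min => Some(min),
        _ => None,
    };
    tripped.map(|threshold| Breach {
        stream: stream.to_string(),
        value,
        threshold,
        timestamp_us,
    })
}
```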

Anomaly-to-Alert Bridge

When an anomaly report is created, the system evaluates whether it matches any configured alert rule (threshold, duration, severity). If a rule matches, the user-facing alert pipeline (/api/v1/alerts) is invoked, creating an alert record and dispatching notifications (email, SMS, push) per the facility's alert configuration. This bridges the internal anomaly subsystem with the user-facing alerting system.