Architecture
System Overview
See the rendered system diagram: architecture.svg.
At a high level:
- firmware application code runs on top of xylolabs-sdk
- xylolabs-protocol provides the shared XMBP wire format
- xylolabs-hal-* crates implement target-specific platform adapters
- xylolabs-server accepts HTTP/WebSocket ingest, enforces auth, and coordinates persistence
- PostgreSQL stores metadata/index state while MinIO stores audio/blob payloads
Data Flow
- MCU captures audio (I2S) and sensor data and feeds them into XylolabsClient.
- XylolabsClient encodes audio via XAP or ADPCM into the ring buffer and accumulates metadata samples.
- On each tick(), the client batches metadata into an XMBP packet and sends audio + metadata over HTTP POST (or WebSocket) to the server.
- Server authenticates via API key, decodes XMBP, buffers samples in IngestManager, and flushes to PostgreSQL (metadata records) and MinIO (audio payloads and XMCH-encoded metadata chunks).
Crate Dependency Graph
Dependency flow summary:
- xylolabs-server depends on xylolabs-core, xylolabs-db, xylolabs-storage, and xylolabs-transcode
- xylolabs-transcode also depends on xylolabs-core and xylolabs-protocol
- xylolabs-sdk depends on xylolabs-protocol
- xylolabs-hal-* crates sit on top of xylolabs-sdk
Crate Roles
| Crate | Role | no_std |
|---|---|---|
| xylolabs-core | Domain models, DTOs, error types, chunk format (XMCH), XMBP server-side codec, downsampling | No |
| xylolabs-db | SQLx repository layer + PostgreSQL migrations | No |
| xylolabs-storage | S3/MinIO client abstraction | No |
| xylolabs-transcode | FFmpeg worker, XAP server-side decoder, stale job reaper, transcoding pipeline | No |
| xylolabs-server | Axum web server, routes, middleware, ingest engine | No |
| xylolabs-protocol | XMBP wire protocol encoder/decoder (shared with SDK) | Yes |
| xylolabs-sdk | Embedded SDK core: client, session, codecs, transport | Yes |
| xylolabs-hal-* | Per-target Platform trait implementations (not in workspace) | Yes |
Proprietary Protocols (Summary)
XMBP -- Xylolabs Metadata Batch Protocol
Binary big-endian wire protocol for batched sensor/metadata transmission from MCU to server. Shared between the no_std SDK encoder and the server decoder.
- Magic: 0x584D4250 ("XMBP"), Version: 1
- Supports 11 value types (F64, F32, I64, I32, Bool, String, Bytes, F64Array, F32Array, I32Array, Json)
- Up to 256 streams per batch, microsecond timestamps
- Encoder: zero-copy, zero-alloc into borrowed buffer with sticky overflow
- Decoder: validates all lengths before allocation (DoS-safe)
- Batch sequence wraps at u16::MAX with forward/backward gap detection
Full specification: XMBP-SPECIFICATION.md
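The "zero-alloc into borrowed buffer with sticky overflow" encoder style can be illustrated with a minimal sketch. This is not the xylolabs-protocol API: the `Encoder` type and its method names are hypothetical, and writing the version as a single byte directly after the magic is an assumption (the authoritative field layout is in XMBP-SPECIFICATION.md). Only the magic value, the version number, and big-endian byte order come from the summary above.

```rust
/// Hypothetical writer over a caller-provided buffer (no allocation).
pub struct Encoder<'a> {
    buf: &'a mut [u8],
    pos: usize,
    /// Sticky flag: once a write does not fit, every later write is a no-op.
    overflow: bool,
}

pub const XMBP_MAGIC: u32 = 0x584D_4250; // ASCII "XMBP"

impl<'a> Encoder<'a> {
    pub fn new(buf: &'a mut [u8]) -> Self {
        Encoder { buf, pos: 0, overflow: false }
    }

    fn put(&mut self, bytes: &[u8]) {
        if self.overflow {
            return;
        }
        let end = self.pos + bytes.len();
        if end > self.buf.len() {
            self.overflow = true;
            return;
        }
        self.buf[self.pos..end].copy_from_slice(bytes);
        self.pos = end;
    }

    pub fn put_u8(&mut self, v: u8) {
        self.put(&[v]);
    }

    /// Multi-byte integers are written big-endian.
    pub fn put_u32(&mut self, v: u32) {
        self.put(&v.to_be_bytes());
    }

    /// Bytes written, or None if any write overflowed the buffer.
    pub fn finish(self) -> Option<usize> {
        if self.overflow { None } else { Some(self.pos) }
    }
}
```

The sticky flag lets a caller issue a whole sequence of writes and check for overflow once at the end, instead of handling an error after every field.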
XAP -- Xylolabs Audio Protocol
Proprietary MDCT-based spectral audio codec providing ~8:1 to 10:1 compression of 16-bit PCM.
- Sample rates: 8, 16, 24, 32, 48, 96 kHz
- Channels: 1--4 (encoded independently)
- Frame durations: 7.5 ms, 10 ms
- ~10 MIPS per channel (with DSP acceleration), ~8 KB RAM per channel
- Requires FPU; platforms without FPU use IMA-ADPCM instead
- Codec ID in XMBP: 0x03
Server-side decoder: The xylolabs-transcode crate includes an XAP decoder (xap_decode.rs) that performs inverse MDCT + dequantization. When a .xap file is uploaded, the transcode pipeline automatically decodes it to PCM WAV before passing to FFmpeg for transcoding to the target format. Sample rate is inferred from the frame header's frame_samples field.
Full specification: XAP-SPECIFICATION.md
XMCH -- Xylolabs Metadata Chunk Format
Server-side binary format for storing flushed metadata chunks in S3/MinIO.
Layout:
- 32-byte header
- magic: 0x584D4348 (XMCH)
- version: 1
- value type: wire tag
- reserved: 2 bytes
- sample count: u32
- start timestamp: u64 microseconds
- end timestamp: u64 microseconds
- data size: u32
- timestamps column: N × u64 big-endian
- values column: type-dependent payload
Chunks are optionally zstd-compressed via encode_chunk_compressed() before S3 upload.
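The header fields listed above add up to exactly 32 bytes if version and value type are one byte each (4 + 1 + 1 + 2 + 4 + 8 + 8 + 4). The sketch below encodes and decodes a header under that assumption. The `ChunkHeader` type, the field order, the u8 widths, and big-endian header integers are assumptions made for illustration; only the timestamps column is stated to be big-endian, and the real codec lives in xylolabs-core.

```rust
pub const XMCH_MAGIC: u32 = 0x584D_4348; // ASCII "XMCH"
pub const XMCH_HEADER_LEN: usize = 32;

/// Hypothetical in-memory view of the 32-byte XMCH header.
pub struct ChunkHeader {
    pub version: u8,
    pub value_type: u8, // wire tag
    pub sample_count: u32,
    pub start_us: u64,
    pub end_us: u64,
    pub data_size: u32,
}

fn be_u32(b: &[u8]) -> u32 {
    let mut a = [0u8; 4];
    a.copy_from_slice(&b[..4]);
    u32::from_be_bytes(a)
}

fn be_u64(b: &[u8]) -> u64 {
    let mut a = [0u8; 8];
    a.copy_from_slice(&b[..8]);
    u64::from_be_bytes(a)
}

impl ChunkHeader {
    pub fn encode(&self) -> [u8; XMCH_HEADER_LEN] {
        let mut out = [0u8; XMCH_HEADER_LEN];
        out[0..4].copy_from_slice(&XMCH_MAGIC.to_be_bytes());
        out[4] = self.version;
        out[5] = self.value_type;
        // out[6..8] is the reserved field and stays zero.
        out[8..12].copy_from_slice(&self.sample_count.to_be_bytes());
        out[12..20].copy_from_slice(&self.start_us.to_be_bytes());
        out[20..28].copy_from_slice(&self.end_us.to_be_bytes());
        out[28..32].copy_from_slice(&self.data_size.to_be_bytes());
        out
    }

    /// Returns None on a short buffer or a wrong magic.
    pub fn decode(bytes: &[u8]) -> Option<ChunkHeader> {
        if bytes.len() < XMCH_HEADER_LEN || be_u32(&bytes[0..4]) != XMCH_MAGIC {
            return None;
        }
        Some(ChunkHeader {
            version: bytes[4],
            value_type: bytes[5],
            sample_count: be_u32(&bytes[8..12]),
            start_us: be_u64(&bytes[12..20]),
            end_us: be_u64(&bytes[20..28]),
            data_size: be_u32(&bytes[28..32]),
        })
    }
}
```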
Server Ingestion Pipeline
IngestManager
Located at crates/xylolabs-server/src/ingest/manager.rs.
- Maintains per-session state with per-stream StreamBuffers.
- Buffers up to 500,000 samples per stream before flush.
- Flush writes encoded XMCH chunks to S3 and metadata records to PostgreSQL.
- Concurrent flush guard (flushing flag) prevents double-flush. Samples arriving during flush are accumulated separately and merged on completion.
- On flush failure, original samples are prepended back for retry.
- Supports live event broadcasting via tokio::sync::broadcast for real-time WebSocket subscribers.
- Stale session eviction based on last activity timestamp.
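The flush guard and retry behaviour described in this list can be sketched with a small single-threaded state machine. This is an illustration only: the type and method names are hypothetical, samples are reduced to plain f64 values, and the real IngestManager is asynchronous and holds this state behind its own synchronization.

```rust
/// Hypothetical per-stream buffer with a flush guard.
pub struct StreamBuffer {
    samples: Vec<f64>, // waiting for the next flush
    pending: Vec<f64>, // arrived while a flush was in flight
    flushing: bool,
}

impl StreamBuffer {
    pub fn new() -> Self {
        StreamBuffer { samples: Vec::new(), pending: Vec::new(), flushing: false }
    }

    pub fn push(&mut self, sample: f64) {
        if self.flushing {
            self.pending.push(sample);
        } else {
            self.samples.push(sample);
        }
    }

    /// Takes the batch to flush. Returns None if a flush is already running
    /// (the double-flush guard) or there is nothing to flush.
    pub fn begin_flush(&mut self) -> Option<Vec<f64>> {
        if self.flushing || self.samples.is_empty() {
            return None;
        }
        self.flushing = true;
        Some(std::mem::take(&mut self.samples))
    }

    /// Completes a flush. On failure the caller hands the batch back and it is
    /// placed in front of the samples that arrived in the meantime.
    pub fn end_flush(&mut self, result: Result<(), Vec<f64>>) {
        let mut restored = match result {
            Ok(()) => Vec::new(),
            Err(failed_batch) => failed_batch,
        };
        restored.append(&mut self.pending);
        self.samples = restored;
        self.flushing = false;
    }

    pub fn len(&self) -> usize {
        self.samples.len() + self.pending.len()
    }
}
```

Putting the failed batch first keeps samples in arrival order, so a retried flush produces the same chunk boundaries it would have produced without the failure.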
Batch Sequence Tracking
The server uses classify_batch_sequence() with forward/backward gap heuristics to distinguish reordering from genuine gaps at u16 wraparound boundaries.
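One common way to separate forward gaps from reordering on a wrapping u16 counter is to compare the wrapped forward distance against half the sequence space. The sketch below shows that technique; it is not the body of classify_batch_sequence(), whose actual heuristics and thresholds are not given here. The enum, the function name, and the 0x8000 cutoff are assumptions.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SeqClass {
    /// Exactly the next sequence number.
    InOrder,
    /// Same sequence number as the last accepted batch.
    Duplicate,
    /// Batches were skipped; the payload is how many are missing.
    ForwardGap(u16),
    /// The batch is older than the last accepted one; the payload is how far behind.
    Reordered(u16),
}

/// Hypothetical classifier: `last` is the last accepted sequence number.
pub fn classify_sequence(last: u16, incoming: u16) -> SeqClass {
    // Wrapping subtraction gives the forward distance modulo 65536,
    // so 65535 -> 0 is a distance of 1.
    let forward = incoming.wrapping_sub(last);
    match forward {
        0 => SeqClass::Duplicate,
        1 => SeqClass::InOrder,
        d if d < 0x8000 => SeqClass::ForwardGap(d - 1),
        // More than half the space "ahead" is treated as being behind.
        d => SeqClass::Reordered(0u16.wrapping_sub(d)),
    }
}
```

For example, `classify_sequence(65534, 1)` reports a forward gap of two missing batches (65535 and 0), while `classify_sequence(2, 65535)` reports a batch that is three behind.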
Downsampling
LTTB (Largest Triangle Three Buckets) algorithm at crates/xylolabs-core/src/downsample.rs for query-time visual downsampling of time series data. Preserves visual shape while reducing point count. Applied server-side in the timeline endpoint before returning points to the frontend.
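For readers unfamiliar with LTTB, here is a compact sketch of the standard algorithm over (x, y) pairs. It is not the code in downsample.rs: the function name, the tuple representation, and the choice to return the input unchanged when the target is below 3 or not smaller than the input are assumptions.

```rust
/// Downsamples `data` (sorted by x) to roughly `target` points using
/// Largest Triangle Three Buckets. The first and last points are always kept.
pub fn lttb(data: &[(f64, f64)], target: usize) -> Vec<(f64, f64)> {
    let n = data.len();
    if target >= n || target < 3 {
        return data.to_vec();
    }
    let mut out = Vec::with_capacity(target);
    // Width of each of the (target - 2) interior buckets.
    let every = (n - 2) as f64 / (target - 2) as f64;
    let mut prev = 0usize; // index of the most recently selected point
    out.push(data[0]);

    for i in 0..target - 2 {
        // Average point of the NEXT bucket acts as the third triangle corner.
        let next_start = ((i + 1) as f64 * every) as usize + 1;
        let next_end = (((i + 2) as f64 * every) as usize + 1).min(n);
        let count = (next_end - next_start) as f64;
        let (mut avg_x, mut avg_y) = (0.0_f64, 0.0_f64);
        for p in &data[next_start..next_end] {
            avg_x += p.0;
            avg_y += p.1;
        }
        avg_x /= count;
        avg_y /= count;

        // Pick the point in the CURRENT bucket forming the largest triangle
        // with the previously selected point and the next-bucket average.
        let start = (i as f64 * every) as usize + 1;
        let end = next_start;
        let (px, py) = data[prev];
        let mut best = start;
        let mut best_area = -1.0_f64;
        for j in start..end {
            let (x, y) = data[j];
            let area = ((px - avg_x) * (y - py) - (px - x) * (avg_y - py)).abs();
            if area > best_area {
                best_area = area;
                best = j;
            }
        }
        out.push(data[best]);
        prev = best;
    }

    out.push(data[n - 1]);
    out
}
```

Because each bucket keeps the point that contributes the most triangle area, isolated spikes tend to survive downsampling, which is why the algorithm preserves the visual shape of a series.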
Device Timeline Performance
The GET /api/v1/devices/{id}/timeseries endpoint uses a three-phase pipeline to minimize latency:
- Phase 1 -- single batch DB query: repo::metadata_chunk::list_by_streams(stream_ids[]) fetches all chunks for all stream names in one query, eliminating N+1 per-stream DB roundtrips.
- Phase 2 -- parallel S3 download: all numeric chunks across all stream names are downloaded in one buffered(128) concurrent pass. Cache lookups happen first: decoded chunks are cached by S3 key in AppState::timeline_chunk_cache (in-memory mini_moka, 24 h TTL, Arc<Vec<MetadataSample>> values for O(1) arc-clone hits). Decoding is offloaded to spawn_blocking.
- Phase 3 -- anchor, filter, downsample: per-session clock-anchor correction is applied (device clocks may be ahead of server time), then the requested time range filter, then LTTB downsampling to the requested downsample target.
F32Array, F64Array, and I32Array samples are mean-aggregated to a single F64 value per sample before downsampling. This prevents 2 M-point OOM conditions from multi-axis accelerometer streams.
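The mean aggregation step amounts to collapsing each array sample to one scalar. A minimal sketch follows; the function name is hypothetical, and returning None for an empty array is an assumption about a case the text does not specify.

```rust
/// Collapses one array-valued sample (f32, f64, or i32 elements) to its mean.
pub fn mean_aggregate<T: Copy + Into<f64>>(values: &[T]) -> Option<f64> {
    if values.is_empty() {
        return None; // assumed behaviour; the server may handle this differently
    }
    let mut sum = 0.0_f64;
    for v in values {
        let x: f64 = (*v).into();
        sum += x;
    }
    Some(sum / values.len() as f64)
}
```

A three-axis accelerometer sample such as `[1.0_f32, 2.0, 3.0]` becomes the single value 2.0, so the downsampler sees one point per sample instead of one per axis.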
Frontend Charts
Both dashboards (frontend/ admin and frontend-app/ operator) use uPlot (canvas-based) for all time-series rendering: DeviceTimelineChart, StreamChart (numeric and array paths), and VectorTripletChart. uPlot handles large point counts efficiently on canvas. WaveformPlayer (WaveSurfer), FFT spectrogram, boolean bar charts, and data tables are not migrated and remain as-is.
Live Audio Streaming Pipeline
A second ingest path runs alongside IngestManager for continuous low-latency audio. Located at crates/xylolabs-server/src/live/manager.rs (LiveAudioManager).
- Wire path: WebSocket ingest at /api/v1/live/streams/{stream_key}/ingest (API key + live:ingest) → per-stream tokio::sync::broadcast::Sender<Bytes> (1024-slot capacity) → WebSocket listener at /api/v1/live/streams/{id}/listen.ws (JWT or API key + live:listen). Browsers mint a 5-min listener JWT at POST /api/v1/auth/live-token.
- Archive: every 60 s, raw LC3 frames are zstd-compressed inside tokio::task::spawn_blocking, written to s3://.../live/{facility}/{stream}/lc3/{start}_{end}.bin.zst, and indexed in live_archive_segments. A 5-minute mini_moka cache for (retention_days, codec) eliminates the per-flush DB roundtrip.
- Retention: 30-minute worker prunes segments older than retention_days (default 30) and deletes S3 objects with buffer_unordered(8) parallelism.
- Cleanup: 30-minute worker removes broadcast senders that have no subscribers and no recent producer.
- Orphan reaper: server startup closes any live_stream_connections left open by a prior process (mirrors the ingest_sessions cleanup).
- Observability: atomic counters (LIVE_ARCHIVE_FLUSH_SUCCESSES/FAILURES, LIVE_ARCHIVE_FLUSH_LAST_US, LIVE_RETENTION_ROWS_PRUNED) surface at GET /api/v1/live/metrics (SuperAdmin) and as a dashboard panel.
- Audit: every mutation on live_streams (POST/PATCH/DELETE) is written to audit_log with actor (user or API key) + before/after state.
- Validation gate: channels 1–8, sample_rate_hz ∈ {8/16/24/32/48/96 kHz}, bitrate_per_channel_bps 16k–320k, display_name ≤ 200 B, channel_names length matches channels and each ≤ 32 B, control chars rejected, transcode_profile ∈ {default, low, high}, retention_days 1–3650.
- Nginx log scrubbing: a dedicated log_format live_scrubbed strips the listener JWT query param from /listen.ws access logs (path-only). Same format applied defensively to ingest.
Database tables: live_streams, live_stream_connections, live_archive_segments (migrations 20260522130000*).
Auth Architecture
Dual Auth Model
| Path | Method | Auth |
|---|---|---|
| /api/v1/ingest/* | API Key | X-Api-Key header, scoped to facility |
| /api/v1/auth/* | Public | Rate-limited (100 req/min per IP) |
| All other /api/v1/* | JWT | Bearer token, RBAC-enforced |
RBAC Hierarchy
- super_admin -- global access, manages all facilities
- facility_admin -- manages own facility's data and users (read + write)
- user -- read-only access to own facility's data
All JWT-authenticated endpoints enforce require_role() explicitly. Read endpoints
require at least Role::User; mutating endpoints (create, update, delete, acknowledge,
resolve, retranscode) require at least Role::FacilityAdmin.
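A hierarchical "at least this role" check maps naturally onto an ordered enum. The sketch below is illustrative: the real require_role() signature and error type are not shown in this document, so the two-argument form and the `Forbidden` type are assumptions. The role names follow the hierarchy above.

```rust
/// Variants are declared from least to most privileged so that the derived
/// ordering matches the hierarchy: User < FacilityAdmin < SuperAdmin.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Role {
    User,
    FacilityAdmin,
    SuperAdmin,
}

/// Hypothetical error returned when the caller's role is too low.
#[derive(Debug, PartialEq, Eq)]
pub struct Forbidden;

/// Succeeds when `actual` is at least `minimum`.
pub fn require_role(actual: Role, minimum: Role) -> Result<(), Forbidden> {
    if actual >= minimum {
        Ok(())
    } else {
        Err(Forbidden)
    }
}
```

Under this scheme a read handler would call `require_role(role, Role::User)` and a mutating handler `require_role(role, Role::FacilityAdmin)`.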
Rate Limiting
Auth endpoints use in-memory rate limiting via mini_moka::sync::Cache with 60-second TTL windows. Max 100 attempts per IP per minute. Trusted proxy IPs can be configured for correct X-Forwarded-For extraction.
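The per-IP window can be sketched as a fixed-window counter. The server uses mini_moka::sync::Cache with a TTL; the version below swaps in a plain HashMap so it depends only on the standard library, and it never evicts idle entries, which the cache TTL would handle. The type and method names are hypothetical, and taking the current time as a parameter is a choice made to keep the sketch testable.

```rust
use std::collections::HashMap;
use std::net::IpAddr;
use std::time::{Duration, Instant};

/// Fixed-window rate limiter keyed by client IP (illustrative stand-in).
pub struct RateLimiter {
    max: u32,
    window: Duration,
    hits: HashMap<IpAddr, (Instant, u32)>, // (window start, attempts in window)
}

impl RateLimiter {
    pub fn new(max: u32, window: Duration) -> Self {
        RateLimiter { max, window, hits: HashMap::new() }
    }

    /// Records one attempt at time `now` and reports whether it is allowed.
    pub fn allow(&mut self, ip: IpAddr, now: Instant) -> bool {
        let entry = self.hits.entry(ip).or_insert((now, 0));
        // Start a fresh window once the previous one has elapsed.
        if now.duration_since(entry.0) >= self.window {
            *entry = (now, 0);
        }
        if entry.1 >= self.max {
            return false;
        }
        entry.1 += 1;
        true
    }
}
```

With `RateLimiter::new(100, Duration::from_secs(60))`, the 101st attempt from one IP inside a minute is rejected, and attempts are accepted again once the window rolls over.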
Inference Pipeline Data Flow
External ML clients (GPU servers, cloud functions, or edge workers) fetch closed sessions, run local models, and post results back to the platform.
┌────────────────────────────────────────────────────────────┐
│                     Inference Pipeline                     │
│                                                            │
│  Device ──► Ingest endpoint ──► PostgreSQL + S3            │
│                                       │                    │
│                   ┌───────────────────┘                    │
│                   ▼                                        │
│           Inference Client                                 │
│  GET /internal/sessions/{id}/inference-bundle              │
│  GET /v1/metadata/sessions/{id}/streams/{sid}/data         │
│  GET /v1/metadata/sessions/{id}/streams/{sid}/audio        │
│                   │                                        │
│                   ▼                                        │
│            Local ML Model                                  │
│  (ONNX / TensorRT / PyTorch / custom)                      │
│                   │                                        │
│                   ▼                                        │
│  POST /internal/inference/results                          │
│  POST /internal/inference/results/batch                    │
│                   │                                        │
│                   ▼                                        │
│  anomaly_reports row created                               │
│  AnomalyEvent broadcast via tokio::sync::broadcast         │
│                   │                                        │
│        ┌──────────┴──────────┐                             │
│        ▼                     ▼                             │
│  GET /internal/anomaly/live    Alert bridge                │
│  (SSE, operator app)           (email/SMS/push/webhook)    │
└────────────────────────────────────────────────────────────┘
Authentication: all /api/internal/* endpoints require X-Api-Key with the internal scope. Every key is facility-scoped; cross-facility access is impossible.
Result submission (POST /api/internal/inference/results): the client submits a list of events, each carrying anomaly_type, severity, title, and optional confidence/details/time window. The handler creates one anomaly_reports row per event and broadcasts each as an AnomalyEvent on the facility's broadcast channel.
Batch submission (POST /api/internal/inference/results/batch): up to 100 result objects in a single request. Entries are processed independently; partial failures are reported per-entry without rolling back successes.
Report lifecycle:
- Created with is_resolved: false.
- POST /api/internal/anomaly/reports/{id}/resolve sets is_resolved: true and resolved_at.
- PATCH /api/internal/anomaly/reports/{id} reclassifies anomaly_type, severity, confidence, or description for post-hoc human correction.
Inference bundle (GET /api/internal/sessions/{id}/inference-bundle): aggregates all numeric streams (LTTB-downsampled) and audio stream headers for a session in a single response, minimising round trips for the common fetch-everything pattern.
Internal API & Inference Pipeline
The Internal API (/api/internal/*) provides machine-to-machine endpoints for GPU
server management, inference model/job/proxy handling, and anomaly detection.
Authentication is via API keys carrying the internal scope and a fixed
facility_id; every response is scoped to that facility. See
API Reference §24 for the full endpoint list and DTOs.
GPU Server State Model
Each gpu_servers row carries two orthogonal flags:
- status (online/offline/draining/error) gates job scheduling and proxy selection.
- health_status (healthy/degraded/unknown) is the observability signal driven by gpu_health_checker.
Direct status overrides via PATCH /gpu-servers/{id} bypass the health-checker
state machine and are recorded in the audit log.
GPU Health Checker
crates/xylolabs-server/src/services/gpu_health_checker.rs. Runs on
GPU_HEALTH_CHECK_INTERVAL_SECS (default 60, minimum 10). Selects every
gpu_servers row with status='online' and probes it via
GET http://<ip>:<port>/health with a buffer_unordered(10) concurrency
cap. Successful responses are parsed (≤64 KB) and persisted via
update_health. Persistent non-success statuses or network errors call
mark_server_error; HTTP 429/503 are treated as transient and ignored. All
error bodies are streamed under a 10 KB cap to prevent OOM from compromised
upstreams.
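The probe handling above can be summarised as a status-code classification plus an interval floor. The sketch below is an illustration, not the checker's code: the enum and function names are hypothetical, it classifies a single response and so does not model how "persistent" failures are counted, and clamping a too-small interval up to 10 (rather than rejecting it) is an assumption.

```rust
#[derive(Debug, PartialEq, Eq)]
pub enum ProbeOutcome {
    /// 2xx: parse the body and persist it.
    Healthy,
    /// 429 or 503: treated as transient and ignored.
    Transient,
    /// Any other status: candidate for marking the server as errored.
    Error,
}

pub fn classify_probe(status: u16) -> ProbeOutcome {
    match status {
        200..=299 => ProbeOutcome::Healthy,
        429 | 503 => ProbeOutcome::Transient,
        _ => ProbeOutcome::Error,
    }
}

/// Resolves the check interval from the raw GPU_HEALTH_CHECK_INTERVAL_SECS
/// value: default 60 when unset or unparsable, never below 10.
pub fn effective_interval_secs(raw: Option<&str>) -> u64 {
    raw.and_then(|s| s.trim().parse::<u64>().ok())
        .unwrap_or(60)
        .max(10)
}
```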
Inference Worker
crates/xylolabs-server/src/services/inference_worker.rs. The bootstrap spawns
INFERENCE_WORKER_CONCURRENCY (default 4) independent tokio tasks. Each task
polls inference_jobs for facilities with queued work and claims a single
job per loop iteration via repo::inference_job::claim_next_queued
(SELECT ... FOR UPDATE SKIP LOCKED). Empty-queue backoff is exponential
(2 → 30 s) and resets on a successful claim. Once claimed:
- Resolve the target GPU (the job's pinned gpu_server_id, otherwise gpu_server::find_first_available).
- Transition the job to running only if mark_running returns Some; if the job was cancelled between claim and start, the worker drops it without contacting the GPU.
- POST /v1/inference to the GPU. Read the response body through a streaming bounded reader (1 MB cap) to bound memory exposure to a compromised GPU. Persist the parsed result via mark_completed.
- On HTTP 429/503, mark the job failed but do not take the GPU offline. On any other failure, mark both the job and the GPU server error; failure messages are truncated to 256 characters before storage.
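The empty-queue backoff (2 → 30 s, reset on a successful claim) can be sketched as a small helper. The growth factor is not stated in this document, so doubling is an assumption, and the `Backoff` type and its methods are hypothetical.

```rust
use std::time::Duration;

/// Exponential backoff between empty polls, capped at `max`.
pub struct Backoff {
    current: Duration,
    min: Duration,
    max: Duration,
}

impl Backoff {
    pub fn new() -> Self {
        let min = Duration::from_secs(2);
        Backoff { current: min, min, max: Duration::from_secs(30) }
    }

    /// Delay to sleep after an empty poll; the following delay doubles, up to the cap.
    pub fn next_delay(&mut self) -> Duration {
        let delay = self.current;
        self.current = (self.current * 2).min(self.max);
        delay
    }

    /// Called after a successful claim.
    pub fn reset(&mut self) {
        self.current = self.min;
    }
}
```

With these assumptions a worker facing an empty queue sleeps 2, 4, 8, 16, 30, 30, ... seconds, and drops back to 2 seconds after the next job it claims.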
Inference Proxy
POST /api/internal/proxy/inference (routes/inference_proxy.rs) is the
synchronous, low-latency path. The handler resolves
(model_name, model_version) to an active model row, picks a GPU via
find_first_available, and forwards the request with a 30 s timeout. The
upstream response is read with the same 1 MB streaming bound; oversize
responses fail closed with 400 Bad Request. Persistent upstream errors
mark the GPU server with last_error so the health checker can rotate it
out.
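The bounded-read idea used by both the worker and the proxy is to stop reading as soon as the body is known to exceed the cap, instead of buffering whatever the upstream sends. The sketch below shows this over a synchronous std::io::Read; the server reads an asynchronous HTTP body, so this is a stand-in for the technique, and the function name and error kind are assumptions.

```rust
use std::io::{self, Read};

/// Reads at most `cap` bytes from `reader`; fails if the source has more.
pub fn read_bounded<R: Read>(reader: R, cap: usize) -> io::Result<Vec<u8>> {
    let mut body = Vec::new();
    // Allow one byte past the cap so an oversize body is detectable
    // without ever holding more than cap + 1 bytes in memory.
    reader.take(cap as u64 + 1).read_to_end(&mut body)?;
    if body.len() > cap {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "response body exceeds cap",
        ));
    }
    Ok(body)
}
```

For the 1 MB limit described above, a caller would pass `1024 * 1024` as `cap` and treat the error as a failed upstream response.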
Real-time Anomaly Detection
IngestManager::process_batch performs inline anomaly detection. For every
metadata sample, sensor values are checked against configurable thresholds.
When a threshold trips:
- An anomaly_reports row is created with source='realtime', the offending stream, value, threshold, and timestamp.
- An AnomalyEvent is broadcast on a tokio::sync::broadcast channel sized by ANOMALY_BROADCAST_CAPACITY (default 10000). The GET /api/internal/anomaly/live SSE endpoint forwards events filtered by the subscriber's facility_id and emits an event: lag notice when the channel overflows.
The hook runs synchronously inside the ingest path, so threshold checks must
remain O(1). The retrospective batch path (POST /api/internal/anomaly/batch)
creates a placeholder report and broadcasts a batch_analysis event;
automated batch analysis itself is not yet wired up.
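A constant-time threshold check of the kind the inline hook needs can be as simple as two comparisons per sample. The shape of the threshold configuration is not described here, so the `Threshold` struct with optional lower and upper bounds is an assumption, as is the method name.

```rust
/// Hypothetical per-stream threshold: either bound may be absent.
pub struct Threshold {
    pub min: Option<f64>,
    pub max: Option<f64>,
}

impl Threshold {
    /// Returns the bound that `value` violated, if any.
    /// Two comparisons at most, so the cost per sample is O(1).
    /// NaN compares false against both bounds and is never flagged here.
    pub fn violated_by(&self, value: f64) -> Option<f64> {
        if let Some(max) = self.max {
            if value > max {
                return Some(max);
            }
        }
        if let Some(min) = self.min {
            if value < min {
                return Some(min);
            }
        }
        None
    }
}
```

Returning the violated bound gives the caller the value, and the threshold it crossed, that the anomaly report records.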
Anomaly-to-Alert Bridge
When an anomaly report is created, the system evaluates whether it matches any
configured alert rule (threshold, duration, severity). If a rule matches, the
user-facing alert pipeline (/api/v1/alerts) is invoked, creating an alert
record and dispatching notifications (email, SMS, push) per the facility's
alert configuration. This bridges the internal anomaly subsystem with the
user-facing alerting system.