Live Audio Streaming — Phase 0 (Foundations) Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Land the database schema, domain models, repositories, DTOs, and a skeleton LiveAudioManager for the live audio streaming feature — enough plumbing that subsequent phases (ingest, listen, HLS, frontend, SDK) can be built independently against a stable contract.
Architecture: Three new PostgreSQL tables (live_streams, live_stream_connections, live_archive_segments) backed by SQLx repos, surfaced through Rust models + DTOs in xylolabs-core. A new crates/xylolabs-server/src/live/manager.rs module exposes a LiveAudioManager whose public methods exist as stubs returning AppError::NotImplemented so callers in later phases can wire to them without breaking the build.
Tech Stack: Rust 2024 edition, Axum 0.8, SQLx 0.8 (PostgreSQL), Tokio, tokio::sync::broadcast, uuid, chrono, serde.
Source spec: docs/superpowers/specs/2026-05-22-live-streaming-design.md
File Structure
Create
| Path | Responsibility |
|---|---|
| crates/xylolabs-db/migrations/20260522130000_create_live_streams.sql | DDL for live_streams table |
| crates/xylolabs-db/migrations/20260522130100_create_live_stream_connections.sql | DDL for live_stream_connections table |
| crates/xylolabs-db/migrations/20260522130200_create_live_archive_segments.sql | DDL for live_archive_segments table |
| crates/xylolabs-core/src/models/live_stream.rs | LiveStream domain row |
| crates/xylolabs-core/src/models/live_stream_connection.rs | LiveStreamConnection domain row |
| crates/xylolabs-core/src/models/live_archive_segment.rs | LiveArchiveSegment domain row |
| crates/xylolabs-core/src/dto/live_stream.rs | Request / response DTOs |
| crates/xylolabs-db/src/repo/live_stream.rs | CRUD repo |
| crates/xylolabs-db/src/repo/live_stream_connection.rs | CRUD repo |
| crates/xylolabs-db/src/repo/live_archive_segment.rs | CRUD repo |
| crates/xylolabs-server/src/live/mod.rs | New module index |
| crates/xylolabs-server/src/live/manager.rs | LiveAudioManager skeleton |
| crates/xylolabs-server/tests/api_live_repo.rs | Integration test (Docker fixtures) |
Modify
| Path | What changes |
|---|---|
| crates/xylolabs-core/src/models/mod.rs | export 3 new model modules |
| crates/xylolabs-core/src/dto/mod.rs | export live_stream DTO module |
| crates/xylolabs-db/src/repo/mod.rs | export 3 new repo modules |
| crates/xylolabs-server/src/state.rs | add pub live: Arc<LiveAudioManager> to AppState |
| crates/xylolabs-server/src/main.rs | construct LiveAudioManager at boot and pass to AppState |
| crates/xylolabs-server/src/lib.rs (or src/main.rs mod tree) | pub mod live; |
| crates/xylolabs-server/src/error.rs | add AppError::NotImplemented(String) variant + 501 mapping |
| crates/xylolabs-server/tests/common/mod.rs | construct the live manager in the test AppState |
Task 1: live_streams Migration
Files:
- Create: crates/xylolabs-db/migrations/20260522130000_create_live_streams.sql
- [ ] Step 1: Verify monotonicity reservation
The latest existing prefix is 20260520000002. The Phase-0 migrations are at 20260522130000, 20260522130100, 20260522130200. Confirm none collide:
ls crates/xylolabs-db/migrations | grep -E "20260522130[012]"
Expected: empty (no existing files).
- [ ] Step 2: Write the migration SQL
-- 20260522130000_create_live_streams.sql
-- Phase 0 (P672 followup, cycle 5 2026-05-22): logical channel per
-- (device, port). One row exists per audio source on a device; the
-- stream survives reboots and reconnects (the stream_id is stable,
-- not session-scoped).
CREATE TABLE live_streams (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
facility_id uuid NOT NULL REFERENCES facilities(id) ON DELETE CASCADE,
device_id uuid NOT NULL REFERENCES devices(id) ON DELETE CASCADE,
port_index smallint NOT NULL DEFAULT 0,
stream_key text NOT NULL UNIQUE,
display_name text NOT NULL,
channels smallint NOT NULL CHECK (channels BETWEEN 1 AND 8),
channel_names text[] NOT NULL DEFAULT '{}',
sample_rate_hz integer NOT NULL,
codec text NOT NULL DEFAULT 'lc3',
bitrate_per_channel_bps integer NOT NULL DEFAULT 64000,
frame_duration_us integer NOT NULL DEFAULT 10000,
transcode_profile text NOT NULL DEFAULT 'default',
state text NOT NULL DEFAULT 'idle'
CHECK (state IN ('idle','live','paused')),
last_connected_at timestamptz,
last_disconnected_at timestamptz,
total_seconds_live bigint NOT NULL DEFAULT 0,
retention_days integer,
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
deleted_at timestamptz
);
CREATE UNIQUE INDEX live_streams_facility_device_port_uq
ON live_streams (facility_id, device_id, port_index)
WHERE deleted_at IS NULL;
CREATE INDEX live_streams_facility_state_ix
ON live_streams (facility_id, state)
WHERE deleted_at IS NULL;
CREATE INDEX live_streams_stream_key_ix
ON live_streams (stream_key);
- [ ] Step 3: Verify monotonicity
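ls crates/xylolabs-db/migrations | awk -F_ '{print $1}' | sort -cu
Expected: silent (strictly increasing). The -u flag makes sort -c reject equal adjacent prefixes as well as out-of-order ones, so any output indicates a duplicate or non-monotonic prefix and must be resolved before continuing.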
- [ ] Step 4: Run the dev DB up
docker compose -f docker-compose.dev.yml up -d postgres
sqlx migrate run --source crates/xylolabs-db/migrations \
--database-url postgresql://xylolabs:xylolabs@localhost:5432/xylolabs
Expected output ends with: Applied 20260522130000/migrate create live streams. No errors.
- [ ] Step 5: Verify the table shape
docker exec xylolabs-api-postgres-1 psql -U xylolabs -d xylolabs -c '\d live_streams'
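Expected: 22 columns, 5 indexes (live_streams_pkey, live_streams_facility_device_port_uq, live_streams_facility_state_ix, live_streams_stream_key_ix, and the implicit unique index on stream_key, which PostgreSQL names live_streams_stream_key_key). The implicit unique index already covers lookups by stream_key, so live_streams_stream_key_ix is redundant with it.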
- [ ] Step 6: Commit
git add crates/xylolabs-db/migrations/20260522130000_create_live_streams.sql
git commit -S -m "feat(db): ✨ add live_streams table (Phase 0)"
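The monotonicity steps exist to catch a duplicate or out-of-order timestamp prefix before sqlx applies anything. As a rough illustration of that rule only (this is not part of the plan's tooling; the function names are hypothetical and the file names are passed in rather than read from disk), the check could be sketched in Rust as:

```rust
/// Extracts the numeric timestamp prefix from a migration file name such as
/// `20260522130000_create_live_streams.sql`. Returns None when the name has
/// no underscore or the prefix is not a number.
fn migration_prefix(file_name: &str) -> Option<u64> {
    let (prefix, _rest) = file_name.split_once('_')?;
    prefix.parse().ok()
}

/// Returns Ok(()) when every prefix is strictly greater than the one before
/// it. Input is expected in directory-listing order.
/// (Hypothetical helper for illustration.)
fn check_strictly_increasing(file_names: &[&str]) -> Result<(), String> {
    let mut previous: Option<u64> = None;
    for name in file_names {
        let current = migration_prefix(name)
            .ok_or_else(|| format!("no numeric prefix in {name}"))?;
        if let Some(prev) = previous {
            if current <= prev {
                return Err(format!(
                    "prefix {current} in {name} is not greater than {prev}"
                ));
            }
        }
        previous = Some(current);
    }
    Ok(())
}
```

A duplicate prefix fails here because the comparison is `<=` rather than `<`.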
Task 2: live_stream_connections Migration
Files:
- Create: crates/xylolabs-db/migrations/20260522130100_create_live_stream_connections.sql
- [ ] Step 1: Write the migration SQL
-- 20260522130100_create_live_stream_connections.sql
-- Per-WS connection audit log. One row per physical WebSocket
-- connection a device opens against `/api/v1/live/streams/{key}/ingest`.
-- `ended_at IS NULL` means the connection is currently open.
CREATE TABLE live_stream_connections (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
stream_id uuid NOT NULL REFERENCES live_streams(id) ON DELETE CASCADE,
api_key_id uuid NOT NULL REFERENCES api_keys(id),
started_at timestamptz NOT NULL DEFAULT now(),
ended_at timestamptz,
client_addr inet,
disconnect_reason text,
samples_received bigint NOT NULL DEFAULT 0,
bytes_received bigint NOT NULL DEFAULT 0,
last_batch_seq integer
);
CREATE INDEX live_stream_connections_stream_started_ix
ON live_stream_connections (stream_id, started_at DESC);
CREATE INDEX live_stream_connections_apikey_started_ix
ON live_stream_connections (api_key_id, started_at DESC);
- [ ] Step 2: Verify monotonicity
ls crates/xylolabs-db/migrations | awk -F_ '{print $1}' | sort -cu
Expected: silent.
- [ ] Step 3: Run migration
sqlx migrate run --source crates/xylolabs-db/migrations \
--database-url postgresql://xylolabs:xylolabs@localhost:5432/xylolabs
Expected: Applied 20260522130100/migrate create live stream connections.
- [ ] Step 4: Verify shape
docker exec xylolabs-api-postgres-1 psql -U xylolabs -d xylolabs -c '\d live_stream_connections'
Expected: 10 columns, 3 indexes (PK + the two named ones).
- [ ] Step 5: Commit
git add crates/xylolabs-db/migrations/20260522130100_create_live_stream_connections.sql
git commit -S -m "feat(db): ✨ add live_stream_connections audit table (Phase 0)"
Task 3: live_archive_segments Migration
Files:
- Create: crates/xylolabs-db/migrations/20260522130200_create_live_archive_segments.sql
- [ ] Step 1: Write the migration SQL
-- 20260522130200_create_live_archive_segments.sql
-- Unified index over LC3 zstd chunks AND HLS fMP4 segments archived to
-- S3 for a live stream. `kind` discriminates: 'lc3_zstd' (lossless,
-- transcode source); 'hls_init' (single init.mp4 per stream codec);
-- 'hls_m4s' (one per media segment). `sequence_num` is the HLS media
-- sequence (NULL for lc3_zstd).
CREATE TABLE live_archive_segments (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
stream_id uuid NOT NULL REFERENCES live_streams(id) ON DELETE CASCADE,
kind text NOT NULL CHECK (kind IN ('lc3_zstd','hls_init','hls_m4s')),
s3_key text NOT NULL,
sequence_num bigint,
start_us bigint NOT NULL,
duration_us bigint NOT NULL,
byte_size bigint NOT NULL,
discontinuity boolean NOT NULL DEFAULT false,
created_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX live_archive_segments_stream_start_ix
ON live_archive_segments (stream_id, start_us);
CREATE INDEX live_archive_segments_stream_kind_seq_ix
ON live_archive_segments (stream_id, kind, sequence_num);
CREATE INDEX live_archive_segments_created_ix
ON live_archive_segments (created_at);
- [ ] Step 2: Verify monotonicity
ls crates/xylolabs-db/migrations | awk -F_ '{print $1}' | sort -cu
Expected: silent.
- [ ] Step 3: Run migration
sqlx migrate run --source crates/xylolabs-db/migrations \
--database-url postgresql://xylolabs:xylolabs@localhost:5432/xylolabs
Expected: Applied 20260522130200/migrate create live archive segments.
- [ ] Step 4: Verify shape
docker exec xylolabs-api-postgres-1 psql -U xylolabs -d xylolabs -c '\d live_archive_segments'
Expected: 10 columns, 4 indexes.
- [ ] Step 5: Commit
git add crates/xylolabs-db/migrations/20260522130200_create_live_archive_segments.sql
git commit -S -m "feat(db): ✨ add live_archive_segments table (Phase 0)"
Task 4: LiveStream Model
Files:
- Create: crates/xylolabs-core/src/models/live_stream.rs
- Modify: crates/xylolabs-core/src/models/mod.rs
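- [ ] Step 1: Write the model + test
Create crates/xylolabs-core/src/models/live_stream.rs with the model and its unit test. Because the struct and the test land together, the test is expected to pass on its first run in Step 3: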
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct LiveStream {
pub id: Uuid,
pub facility_id: Uuid,
pub device_id: Uuid,
pub port_index: i16,
pub stream_key: String,
pub display_name: String,
pub channels: i16,
pub channel_names: Vec<String>,
pub sample_rate_hz: i32,
pub codec: String,
pub bitrate_per_channel_bps: i32,
pub frame_duration_us: i32,
pub transcode_profile: String,
pub state: String,
pub last_connected_at: Option<DateTime<Utc>>,
pub last_disconnected_at: Option<DateTime<Utc>>,
pub total_seconds_live: i64,
pub retention_days: Option<i32>,
pub metadata: serde_json::Value,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub deleted_at: Option<DateTime<Utc>>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn live_stream_round_trips_through_json() {
let row = LiveStream {
id: Uuid::nil(),
facility_id: Uuid::nil(),
device_id: Uuid::nil(),
port_index: 0,
stream_key: "fac/dev/0".into(),
display_name: "Front mic".into(),
channels: 4,
channel_names: vec!["FL".into(), "FR".into(), "RL".into(), "RR".into()],
sample_rate_hz: 48_000,
codec: "lc3".into(),
bitrate_per_channel_bps: 64_000,
frame_duration_us: 10_000,
transcode_profile: "default".into(),
state: "idle".into(),
last_connected_at: None,
last_disconnected_at: None,
total_seconds_live: 0,
retention_days: None,
metadata: serde_json::json!({}),
created_at: Utc::now(),
updated_at: Utc::now(),
deleted_at: None,
};
let json = serde_json::to_string(&row).expect("serialize");
let back: LiveStream = serde_json::from_str(&json).expect("deserialize");
assert_eq!(back.stream_key, row.stream_key);
assert_eq!(back.channels, 4);
assert_eq!(back.channel_names.len(), 4);
}
}
- [ ] Step 2: Add module export
Edit crates/xylolabs-core/src/models/mod.rs and add an alphabetically-correct pub mod live_stream; line. Verify by reading the existing file before editing so the alphabetical order is preserved.
- [ ] Step 3: Run the test
cargo test -p xylolabs-core --lib models::live_stream::tests
Expected: PASS (1 passed).
- [ ] Step 4: Commit
git add crates/xylolabs-core/src/models/live_stream.rs \
crates/xylolabs-core/src/models/mod.rs
git commit -S -m "feat(core/models): ✨ add LiveStream domain row (Phase 0)"
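The state column is limited to 'idle', 'live', or 'paused' by the CHECK constraint in Task 1, while the model above keeps it as a plain String. If later phases want the compiler to enforce the same set, one possible shape is a small enum that converts to and from the stored text. The StreamState name and its methods are assumptions for illustration and are not part of this plan:

```rust
use std::fmt;
use std::str::FromStr;

/// Mirrors the three values allowed by the `state` CHECK constraint.
/// (Hypothetical type; the plan's model stores `state` as a String.)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamState {
    Idle,
    Live,
    Paused,
}

impl StreamState {
    /// The exact text stored in the `state` column.
    pub fn as_str(self) -> &'static str {
        match self {
            StreamState::Idle => "idle",
            StreamState::Live => "live",
            StreamState::Paused => "paused",
        }
    }
}

impl fmt::Display for StreamState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}

impl FromStr for StreamState {
    type Err = String;

    /// Parsing is case-sensitive, matching the CHECK constraint.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "idle" => Ok(StreamState::Idle),
            "live" => Ok(StreamState::Live),
            "paused" => Ok(StreamState::Paused),
            other => Err(format!("unknown stream state: {other}")),
        }
    }
}
```

With this in place, a repo function such as update_state could accept a StreamState and bind state.as_str(), so an invalid value is rejected before it reaches the database.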
Task 5: LiveStreamConnection Model
Files:
- Create: crates/xylolabs-core/src/models/live_stream_connection.rs
- Modify: crates/xylolabs-core/src/models/mod.rs
- [ ] Step 1: Write the model + test
use std::net::IpAddr;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct LiveStreamConnection {
pub id: Uuid,
pub stream_id: Uuid,
pub api_key_id: Uuid,
pub started_at: DateTime<Utc>,
pub ended_at: Option<DateTime<Utc>>,
pub client_addr: Option<IpAddr>,
pub disconnect_reason: Option<String>,
pub samples_received: i64,
pub bytes_received: i64,
pub last_batch_seq: Option<i32>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn open_connection_has_no_end_time() {
let row = LiveStreamConnection {
id: Uuid::nil(),
stream_id: Uuid::nil(),
api_key_id: Uuid::nil(),
started_at: Utc::now(),
ended_at: None,
client_addr: Some("10.0.0.1".parse().unwrap()),
disconnect_reason: None,
samples_received: 0,
bytes_received: 0,
last_batch_seq: None,
};
assert!(row.ended_at.is_none());
assert!(row.disconnect_reason.is_none());
}
}
- [ ] Step 2: Export module
Add pub mod live_stream_connection; to crates/xylolabs-core/src/models/mod.rs in alphabetical order.
- [ ] Step 3: Run test
cargo test -p xylolabs-core --lib models::live_stream_connection::tests
Expected: PASS.
- [ ] Step 4: Commit
git add crates/xylolabs-core/src/models/live_stream_connection.rs \
crates/xylolabs-core/src/models/mod.rs
git commit -S -m "feat(core/models): ✨ add LiveStreamConnection audit row (Phase 0)"
Task 6: LiveArchiveSegment Model
Files:
- Create: crates/xylolabs-core/src/models/live_archive_segment.rs
- Modify: crates/xylolabs-core/src/models/mod.rs
- [ ] Step 1: Write the model + test
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct LiveArchiveSegment {
pub id: Uuid,
pub stream_id: Uuid,
pub kind: String,
pub s3_key: String,
pub sequence_num: Option<i64>,
pub start_us: i64,
pub duration_us: i64,
pub byte_size: i64,
pub discontinuity: bool,
pub created_at: DateTime<Utc>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn lc3_zstd_kind_omits_sequence() {
let row = LiveArchiveSegment {
id: Uuid::nil(),
stream_id: Uuid::nil(),
kind: "lc3_zstd".into(),
s3_key: "live/abc/lc3/0_10000000.bin.zst".into(),
sequence_num: None,
start_us: 0,
duration_us: 10_000_000,
byte_size: 1024,
discontinuity: false,
created_at: Utc::now(),
};
assert_eq!(row.kind, "lc3_zstd");
assert!(row.sequence_num.is_none());
}
#[test]
fn hls_m4s_kind_carries_sequence() {
let row = LiveArchiveSegment {
id: Uuid::nil(),
stream_id: Uuid::nil(),
kind: "hls_m4s".into(),
s3_key: "live/abc/hls/1234.m4s".into(),
sequence_num: Some(1234),
start_us: 1_000_000,
duration_us: 1_000_000,
byte_size: 2048,
discontinuity: false,
created_at: Utc::now(),
};
assert_eq!(row.kind, "hls_m4s");
assert_eq!(row.sequence_num, Some(1234));
}
}
- [ ] Step 2: Export module
Add pub mod live_archive_segment; to crates/xylolabs-core/src/models/mod.rs in alphabetical order.
- [ ] Step 3: Run test
cargo test -p xylolabs-core --lib models::live_archive_segment::tests
Expected: PASS (2 tests).
- [ ] Step 4: Commit
git add crates/xylolabs-core/src/models/live_archive_segment.rs \
crates/xylolabs-core/src/models/mod.rs
git commit -S -m "feat(core/models): ✨ add LiveArchiveSegment domain row (Phase 0)"
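The two tests above encode a convention from the Task 3 migration comment: lc3_zstd rows carry no sequence_num, while hls_m4s rows carry the HLS media sequence. A minimal sketch of that convention as a check follows. SegmentKind and check_sequence are hypothetical names, and treating hls_init as "either is fine" is an assumption, since the plan does not say which it should be:

```rust
/// Mirrors the values allowed by the `kind` CHECK constraint.
/// (Hypothetical type; the plan's model stores `kind` as a String.)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SegmentKind {
    Lc3Zstd,
    HlsInit,
    HlsM4s,
}

impl SegmentKind {
    /// Parses the text stored in the `kind` column.
    pub fn parse(s: &str) -> Option<Self> {
        match s {
            "lc3_zstd" => Some(SegmentKind::Lc3Zstd),
            "hls_init" => Some(SegmentKind::HlsInit),
            "hls_m4s" => Some(SegmentKind::HlsM4s),
            _ => None,
        }
    }
}

/// Checks the sequence_num convention for a segment kind.
pub fn check_sequence(kind: SegmentKind, sequence_num: Option<i64>) -> Result<(), String> {
    match (kind, sequence_num) {
        // LC3 chunks are not part of an HLS playlist, so no media sequence.
        (SegmentKind::Lc3Zstd, Some(n)) => {
            Err(format!("lc3_zstd must not carry sequence_num, got {n}"))
        }
        // Media segments need their HLS media sequence number.
        (SegmentKind::HlsM4s, None) => Err("hls_m4s requires sequence_num".to_string()),
        // Assumption: the plan does not constrain hls_init, so accept both.
        _ => Ok(()),
    }
}
```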
Task 7: DTOs (request / response wire types)
Files:
- Create: crates/xylolabs-core/src/dto/live_stream.rs
- Modify: crates/xylolabs-core/src/dto/mod.rs
- [ ] Step 1: Write the DTOs + test
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct CreateLiveStreamRequest {
pub facility_id: Uuid,
pub device_id: Uuid,
#[serde(default)]
pub port_index: i16,
pub display_name: String,
pub channels: i16,
#[serde(default)]
pub channel_names: Vec<String>,
pub sample_rate_hz: i32,
#[serde(default = "default_bitrate")]
pub bitrate_per_channel_bps: i32,
#[serde(default = "default_transcode_profile")]
pub transcode_profile: String,
}
fn default_bitrate() -> i32 {
64_000
}
fn default_transcode_profile() -> String {
"default".to_string()
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LiveStreamResponse {
pub id: Uuid,
pub stream_key: String,
pub display_name: String,
pub channels: i16,
pub channel_names: Vec<String>,
pub sample_rate_hz: i32,
pub codec: String,
pub state: String,
pub ingest_url: String,
pub listen_urls: ListenUrls,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListenUrls {
pub ws_lc3: String,
pub ws_aac: String,
pub hls: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LiveStreamListQuery {
#[serde(default)]
pub facility_id: Option<Uuid>,
#[serde(default)]
pub device_id: Option<Uuid>,
#[serde(default)]
pub state: Option<String>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn create_request_applies_defaults() {
let json = serde_json::json!({
"facility_id": Uuid::nil(),
"device_id": Uuid::nil(),
"display_name": "x",
"channels": 4,
"sample_rate_hz": 48000,
});
let req: CreateLiveStreamRequest = serde_json::from_value(json).unwrap();
assert_eq!(req.port_index, 0);
assert_eq!(req.bitrate_per_channel_bps, 64_000);
assert_eq!(req.transcode_profile, "default");
}
#[test]
fn create_request_rejects_unknown_fields() {
let json = serde_json::json!({
"facility_id": Uuid::nil(),
"device_id": Uuid::nil(),
"display_name": "x",
"channels": 4,
"sample_rate_hz": 48000,
"rogue_field": true,
});
let err = serde_json::from_value::<CreateLiveStreamRequest>(json).unwrap_err();
assert!(err.to_string().contains("unknown field"));
}
}
- [ ] Step 2: Export module
Add pub mod live_stream; to crates/xylolabs-core/src/dto/mod.rs in alphabetical order.
- [ ] Step 3: Run test
cargo test -p xylolabs-core --lib dto::live_stream::tests
Expected: PASS (2 tests).
- [ ] Step 4: Commit
git add crates/xylolabs-core/src/dto/live_stream.rs \
crates/xylolabs-core/src/dto/mod.rs
git commit -S -m "feat(core/dto): ✨ add LiveStream DTOs (Phase 0)"
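LiveStreamResponse carries an ingest_url, and the Task 2 migration comment gives the ingest route as /api/v1/live/streams/{key}/ingest. The stream keys used in this plan's tests contain slashes (for example fac/dev/0), so a later phase will probably need to escape the key when placing it in a single path segment. The sketch below shows one way to do that with the standard library only. Both function names are hypothetical, and whether the server expects the key percent-encoded is an assumption the plan does not settle:

```rust
/// Percent-encodes every byte except the RFC 3986 unreserved characters
/// (letters, digits, '-', '.', '_', '~').
fn encode_path_segment(raw: &str) -> String {
    let mut out = String::with_capacity(raw.len());
    for byte in raw.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(byte as char)
            }
            other => out.push_str(&format!("%{other:02X}")),
        }
    }
    out
}

/// Builds the ingest path for a stream key, keeping the key in one segment.
/// (Hypothetical helper; host and scheme are left to the caller.)
fn ingest_path(stream_key: &str) -> String {
    format!(
        "/api/v1/live/streams/{}/ingest",
        encode_path_segment(stream_key)
    )
}
```

For the key fac/dev/0 this yields /api/v1/live/streams/fac%2Fdev%2F0/ingest.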
Task 8: live_stream Repository
Files:
- Create: crates/xylolabs-db/src/repo/live_stream.rs
- Modify: crates/xylolabs-db/src/repo/mod.rs
- [ ] Step 1: Write the repo
use sqlx::PgPool;
use uuid::Uuid;
use xylolabs_core::models::live_stream::LiveStream;
#[allow(clippy::too_many_arguments)]
pub async fn create(
pool: &PgPool,
facility_id: Uuid,
device_id: Uuid,
port_index: i16,
stream_key: &str,
display_name: &str,
channels: i16,
channel_names: &[String],
sample_rate_hz: i32,
bitrate_per_channel_bps: i32,
transcode_profile: &str,
) -> Result<LiveStream, sqlx::Error> {
sqlx::query_as::<_, LiveStream>(
r#"
INSERT INTO live_streams (
facility_id, device_id, port_index, stream_key, display_name,
channels, channel_names, sample_rate_hz, bitrate_per_channel_bps,
transcode_profile
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
RETURNING *
"#,
)
.bind(facility_id)
.bind(device_id)
.bind(port_index)
.bind(stream_key)
.bind(display_name)
.bind(channels)
.bind(channel_names)
.bind(sample_rate_hz)
.bind(bitrate_per_channel_bps)
.bind(transcode_profile)
.fetch_one(pool)
.await
}
pub async fn find_by_id(pool: &PgPool, id: Uuid) -> Result<Option<LiveStream>, sqlx::Error> {
sqlx::query_as::<_, LiveStream>(
r#"SELECT * FROM live_streams WHERE id = $1 AND deleted_at IS NULL"#,
)
.bind(id)
.fetch_optional(pool)
.await
}
pub async fn find_by_stream_key(
pool: &PgPool,
stream_key: &str,
) -> Result<Option<LiveStream>, sqlx::Error> {
sqlx::query_as::<_, LiveStream>(
r#"SELECT * FROM live_streams WHERE stream_key = $1 AND deleted_at IS NULL"#,
)
.bind(stream_key)
.fetch_optional(pool)
.await
}
pub async fn list_by_facility(
pool: &PgPool,
facility_id: Uuid,
state_filter: Option<&str>,
) -> Result<Vec<LiveStream>, sqlx::Error> {
if let Some(state) = state_filter {
sqlx::query_as::<_, LiveStream>(
r#"
SELECT * FROM live_streams
WHERE facility_id = $1 AND state = $2 AND deleted_at IS NULL
ORDER BY display_name ASC
LIMIT 1000
"#,
)
.bind(facility_id)
.bind(state)
.fetch_all(pool)
.await
} else {
sqlx::query_as::<_, LiveStream>(
r#"
SELECT * FROM live_streams
WHERE facility_id = $1 AND deleted_at IS NULL
ORDER BY display_name ASC
LIMIT 1000
"#,
)
.bind(facility_id)
.fetch_all(pool)
.await
}
}
pub async fn list_by_device(
pool: &PgPool,
device_id: Uuid,
) -> Result<Vec<LiveStream>, sqlx::Error> {
sqlx::query_as::<_, LiveStream>(
r#"
SELECT * FROM live_streams
WHERE device_id = $1 AND deleted_at IS NULL
ORDER BY port_index ASC
"#,
)
.bind(device_id)
.fetch_all(pool)
.await
}
pub async fn update_state(
pool: &PgPool,
id: Uuid,
new_state: &str,
) -> Result<(), sqlx::Error> {
sqlx::query(
r#"
UPDATE live_streams
SET state = $2, updated_at = now()
WHERE id = $1
"#,
)
.bind(id)
.bind(new_state)
.execute(pool)
.await?;
Ok(())
}
pub async fn touch_connected(pool: &PgPool, id: Uuid) -> Result<(), sqlx::Error> {
sqlx::query(
r#"
UPDATE live_streams
SET last_connected_at = now(), state = 'live', updated_at = now()
WHERE id = $1
"#,
)
.bind(id)
.execute(pool)
.await?;
Ok(())
}
pub async fn touch_disconnected(pool: &PgPool, id: Uuid) -> Result<(), sqlx::Error> {
sqlx::query(
r#"
UPDATE live_streams
SET last_disconnected_at = now(), state = 'idle', updated_at = now()
WHERE id = $1
"#,
)
.bind(id)
.execute(pool)
.await?;
Ok(())
}
pub async fn soft_delete(pool: &PgPool, id: Uuid) -> Result<(), sqlx::Error> {
sqlx::query(
r#"UPDATE live_streams SET deleted_at = now(), updated_at = now() WHERE id = $1"#,
)
.bind(id)
.execute(pool)
.await?;
Ok(())
}
- [ ] Step 2: Export module
Add pub mod live_stream; to crates/xylolabs-db/src/repo/mod.rs in alphabetical order.
- [ ] Step 3: cargo check
cargo check -p xylolabs-db
Expected: clean compile, zero warnings on the new file.
- [ ] Step 4: Commit
git add crates/xylolabs-db/src/repo/live_stream.rs \
crates/xylolabs-db/src/repo/mod.rs
git commit -S -m "feat(db/repo): ✨ live_stream CRUD repo (Phase 0)"
Task 9: live_stream_connection Repository
Files:
- Create: crates/xylolabs-db/src/repo/live_stream_connection.rs
- Modify: crates/xylolabs-db/src/repo/mod.rs
- [ ] Step 1: Write the repo
use std::net::IpAddr;
use sqlx::PgPool;
use uuid::Uuid;
use xylolabs_core::models::live_stream_connection::LiveStreamConnection;
pub async fn open(
pool: &PgPool,
stream_id: Uuid,
api_key_id: Uuid,
client_addr: Option<IpAddr>,
) -> Result<LiveStreamConnection, sqlx::Error> {
sqlx::query_as::<_, LiveStreamConnection>(
r#"
INSERT INTO live_stream_connections (stream_id, api_key_id, client_addr)
VALUES ($1, $2, $3)
RETURNING *
"#,
)
.bind(stream_id)
.bind(api_key_id)
.bind(client_addr)
.fetch_one(pool)
.await
}
pub async fn close(
pool: &PgPool,
id: Uuid,
reason: &str,
samples_received: i64,
bytes_received: i64,
last_batch_seq: Option<i32>,
) -> Result<(), sqlx::Error> {
sqlx::query(
r#"
UPDATE live_stream_connections
SET ended_at = now(),
disconnect_reason = $2,
samples_received = $3,
bytes_received = $4,
last_batch_seq = $5
WHERE id = $1 AND ended_at IS NULL
"#,
)
.bind(id)
.bind(reason)
.bind(samples_received)
.bind(bytes_received)
.bind(last_batch_seq)
.execute(pool)
.await?;
Ok(())
}
pub async fn close_all_open_for_stream(
pool: &PgPool,
stream_id: Uuid,
reason: &str,
) -> Result<u64, sqlx::Error> {
let result = sqlx::query(
r#"
UPDATE live_stream_connections
SET ended_at = now(), disconnect_reason = $2
WHERE stream_id = $1 AND ended_at IS NULL
"#,
)
.bind(stream_id)
.bind(reason)
.execute(pool)
.await?;
Ok(result.rows_affected())
}
- [ ] Step 2: Export module
Add pub mod live_stream_connection; to crates/xylolabs-db/src/repo/mod.rs in alphabetical order.
- [ ] Step 3: cargo check
cargo check -p xylolabs-db
Expected: clean compile.
- [ ] Step 4: Commit
git add crates/xylolabs-db/src/repo/live_stream_connection.rs \
crates/xylolabs-db/src/repo/mod.rs
git commit -S -m "feat(db/repo): ✨ live_stream_connection audit repo (Phase 0)"
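The close query above only matches rows where ended_at IS NULL, so closing an already-closed connection changes nothing and the first recorded reason and counters are preserved. The in-memory sketch below illustrates that guard without a database. ConnectionRow and this close function are simplified, hypothetical stand-ins, and the returned bool plays the role of "rows affected > 0" (the repo function in this plan returns () instead):

```rust
/// Simplified stand-in for a live_stream_connections row.
#[derive(Debug, Default)]
struct ConnectionRow {
    ended: bool, // stands in for `ended_at IS NOT NULL`
    disconnect_reason: Option<String>,
    bytes_received: i64,
}

/// Applies the same guard as `WHERE id = $1 AND ended_at IS NULL`:
/// only an open connection is updated. Returns whether the row changed.
fn close(row: &mut ConnectionRow, reason: &str, bytes_received: i64) -> bool {
    if row.ended {
        // Already closed: leave the first reason and counters untouched.
        return false;
    }
    row.ended = true;
    row.disconnect_reason = Some(reason.to_string());
    row.bytes_received = bytes_received;
    true
}
```

A caller that needs to know whether its close actually took effect could have the real repo function return rows_affected(), as close_all_open_for_stream already does.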
Task 10: live_archive_segment Repository
Files:
- Create: crates/xylolabs-db/src/repo/live_archive_segment.rs
- Modify: crates/xylolabs-db/src/repo/mod.rs
- [ ] Step 1: Write the repo
use sqlx::PgPool;
use uuid::Uuid;
use xylolabs_core::models::live_archive_segment::LiveArchiveSegment;
#[allow(clippy::too_many_arguments)]
pub async fn record(
pool: &PgPool,
stream_id: Uuid,
kind: &str,
s3_key: &str,
sequence_num: Option<i64>,
start_us: i64,
duration_us: i64,
byte_size: i64,
discontinuity: bool,
) -> Result<LiveArchiveSegment, sqlx::Error> {
sqlx::query_as::<_, LiveArchiveSegment>(
r#"
INSERT INTO live_archive_segments (
stream_id, kind, s3_key, sequence_num,
start_us, duration_us, byte_size, discontinuity
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING *
"#,
)
.bind(stream_id)
.bind(kind)
.bind(s3_key)
.bind(sequence_num)
.bind(start_us)
.bind(duration_us)
.bind(byte_size)
.bind(discontinuity)
.fetch_one(pool)
.await
}
pub async fn list_by_stream_in_range(
pool: &PgPool,
stream_id: Uuid,
start_us: i64,
end_us: i64,
kind: Option<&str>,
) -> Result<Vec<LiveArchiveSegment>, sqlx::Error> {
if let Some(k) = kind {
sqlx::query_as::<_, LiveArchiveSegment>(
r#"
SELECT * FROM live_archive_segments
WHERE stream_id = $1 AND kind = $2 AND start_us < $4 AND start_us + duration_us > $3
ORDER BY start_us ASC
LIMIT 10000
"#,
)
.bind(stream_id)
.bind(k)
.bind(start_us)
.bind(end_us)
.fetch_all(pool)
.await
} else {
sqlx::query_as::<_, LiveArchiveSegment>(
r#"
SELECT * FROM live_archive_segments
WHERE stream_id = $1 AND start_us < $3 AND start_us + duration_us > $2
ORDER BY start_us ASC
LIMIT 10000
"#,
)
.bind(stream_id)
.bind(start_us)
.bind(end_us)
.fetch_all(pool)
.await
}
}
pub async fn delete_older_than(
pool: &PgPool,
cutoff: chrono::DateTime<chrono::Utc>,
batch_limit: i64,
) -> Result<Vec<LiveArchiveSegment>, sqlx::Error> {
sqlx::query_as::<_, LiveArchiveSegment>(
r#"
DELETE FROM live_archive_segments
WHERE id IN (
SELECT id FROM live_archive_segments
WHERE created_at < $1
ORDER BY created_at ASC
LIMIT $2
)
RETURNING *
"#,
)
.bind(cutoff)
.bind(batch_limit)
.fetch_all(pool)
.await
}
- [ ] Step 2: Export module
Add pub mod live_archive_segment; to crates/xylolabs-db/src/repo/mod.rs in alphabetical order.
- [ ] Step 3: cargo check
cargo check -p xylolabs-db
Expected: clean compile.
- [ ] Step 4: Commit
git add crates/xylolabs-db/src/repo/live_archive_segment.rs \
crates/xylolabs-db/src/repo/mod.rs
git commit -S -m "feat(db/repo): ✨ live_archive_segment index repo (Phase 0)"
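list_by_stream_in_range selects segments with start_us < end AND start_us + duration_us > start, which is the standard overlap test for half-open intervals: a segment that merely touches the range boundary is excluded. The same predicate in plain Rust, as a sketch for reasoning about edge cases (the function name is hypothetical, and saturating_add is used here only so the sketch cannot overflow):

```rust
/// True when the segment [seg_start, seg_start + seg_duration) overlaps the
/// query range [range_start, range_end). All values are in microseconds.
pub fn overlaps(
    seg_start_us: i64,
    seg_duration_us: i64,
    range_start_us: i64,
    range_end_us: i64,
) -> bool {
    let seg_end_us = seg_start_us.saturating_add(seg_duration_us);
    // Segment starts before the range ends, and ends after the range starts.
    seg_start_us < range_end_us && seg_end_us > range_start_us
}
```

For example, a 10-second segment starting at 0 overlaps a query for 5 s to 15 s, but not a query for 10 s to 20 s, because the segment ends exactly where that range begins.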
Task 11: AppError::NotImplemented Variant
Files:
- Modify: crates/xylolabs-server/src/error.rs
- [ ] Step 1: Read the existing error enum
Read crates/xylolabs-server/src/error.rs and note the current variants. The skeleton manager needs to return AppError::NotImplemented(reason) from stubbed methods so callers fail loudly with HTTP 501 instead of crashing.
- [ ] Step 2: Add variant + status mapping
In the pub enum AppError { … } block, insert (alphabetically) a new variant:
NotImplemented(String),
In the IntoResponse for AppError (or equivalent status_code() mapping), add:
AppError::NotImplemented(msg) => (StatusCode::NOT_IMPLEMENTED, msg.clone()),
- [ ] Step 3: Build
cargo check -p xylolabs-server
Expected: clean compile.
- [ ] Step 4: Commit
git add crates/xylolabs-server/src/error.rs
git commit -S -m "feat(server/error): ✨ add NotImplemented variant for Phase 0 stubs"
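The variant-to-status mapping described in Step 2 can be pictured with a standalone sketch. The real AppError lives in error.rs and maps through Axum's IntoResponse; here the status is a bare u16 so the example needs no dependencies, and the NotFound variant is a hypothetical stand-in for whatever variants already exist:

```rust
/// Standalone stand-in for the server's error enum.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AppError {
    /// Hypothetical pre-existing variant, included only for contrast.
    NotFound(String),
    /// The variant added in this task.
    NotImplemented(String),
}

impl AppError {
    /// HTTP status code for each variant.
    pub fn status_code(&self) -> u16 {
        match self {
            AppError::NotFound(_) => 404,
            AppError::NotImplemented(_) => 501,
        }
    }

    /// Human-readable message carried by the variant.
    pub fn message(&self) -> &str {
        match self {
            AppError::NotFound(msg) | AppError::NotImplemented(msg) => msg,
        }
    }
}
```

Keeping the match exhaustive (no wildcard arm) means the compiler flags the status mapping whenever a new variant is added.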
Task 12: LiveAudioManager Skeleton
Files:
- Create: crates/xylolabs-server/src/live/mod.rs
- Create: crates/xylolabs-server/src/live/manager.rs
- Modify: crates/xylolabs-server/src/lib.rs (or src/main.rs if no lib.rs — check before editing)
- [ ] Step 1: Create live/mod.rs
pub mod manager;
pub use manager::LiveAudioManager;
- [ ] Step 2: Create live/manager.rs skeleton
use std::sync::Arc;
use dashmap::DashMap;
use sqlx::PgPool;
use uuid::Uuid;
use xylolabs_storage::S3Client;
use crate::error::AppError;
/// Phase 0 skeleton. Every public method returns
/// `AppError::NotImplemented` so callers can wire up routes and tests
/// against a stable signature, but actually exercising them at runtime
/// fails loudly with HTTP 501. Phase 1+ replaces each `NotImplemented`
/// with a real implementation.
#[derive(Clone)]
pub struct LiveAudioManager {
inner: Arc<LiveAudioManagerInner>,
}
struct LiveAudioManagerInner {
db: PgPool,
s3: S3Client,
streams: DashMap<Uuid, ()>,
}
#[allow(dead_code)]
impl LiveAudioManager {
pub fn new(db: PgPool, s3: S3Client) -> Self {
Self {
inner: Arc::new(LiveAudioManagerInner {
db,
s3,
streams: DashMap::new(),
}),
}
}
pub async fn open_ingest(
&self,
_stream_id: Uuid,
_api_key_id: Uuid,
) -> Result<(), AppError> {
Err(AppError::NotImplemented(
"live.open_ingest is a Phase 0 skeleton".into(),
))
}
pub async fn close_ingest(
&self,
_stream_id: Uuid,
_reason: &str,
) -> Result<(), AppError> {
Err(AppError::NotImplemented(
"live.close_ingest is a Phase 0 skeleton".into(),
))
}
pub async fn hls_playlist(&self, _stream_id: Uuid) -> Result<String, AppError> {
Err(AppError::NotImplemented(
"live.hls_playlist is a Phase 0 skeleton".into(),
))
}
pub async fn hls_segment(
&self,
_stream_id: Uuid,
_seq: u64,
) -> Result<bytes::Bytes, AppError> {
Err(AppError::NotImplemented(
"live.hls_segment is a Phase 0 skeleton".into(),
))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn open_ingest_stubbed_returns_not_implemented() {
let pool = PgPool::connect_lazy("postgres://invalid/").unwrap();
let s3 = xylolabs_storage::S3Client::new_for_test();
let mgr = LiveAudioManager::new(pool, s3);
let err = mgr.open_ingest(Uuid::nil(), Uuid::nil()).await.unwrap_err();
assert!(matches!(err, AppError::NotImplemented(_)));
}
}
NOTE: the test above assumes a S3Client::new_for_test() constructor exists. If it does not, replace with whatever no-op / stub constructor the existing tests use (search tests/common/mod.rs for examples). If no test constructor exists, gate the test on #[ignore] and skip Step 4.
- [ ] Step 3: Register the module
Open crates/xylolabs-server/src/lib.rs. If it exists, add pub mod live; in alphabetical order. If it does not exist (i.e., the crate is binary-only), add pub mod live; to crates/xylolabs-server/src/main.rs instead.
- [ ] Step 4: Run the unit test
cargo test -p xylolabs-server --lib live::manager::tests
Expected: PASS (or 0 passed; 1 ignored if the test was gated).
- [ ] Step 5: Commit
git add crates/xylolabs-server/src/live/mod.rs \
crates/xylolabs-server/src/live/manager.rs \
crates/xylolabs-server/src/lib.rs # (or main.rs)
git commit -S -m "feat(server/live): ✨ LiveAudioManager skeleton (Phase 0)"
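The skeleton relies on a common pattern: a Clone handle wrapping an Arc of shared inner state, with stubbed methods that return an error instead of panicking. The dependency-free sketch below shows why clones of such a handle observe the same state. All names are hypothetical, std::sync::Mutex with a HashMap stands in for DashMap, u128 stands in for Uuid, and the methods are synchronous for brevity:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Stand-in for AppError::NotImplemented.
#[derive(Debug, PartialEq, Eq)]
pub enum StubError {
    NotImplemented(String),
}

/// Cheap-to-clone handle; every clone points at the same Inner.
#[derive(Clone, Default)]
pub struct Manager {
    inner: Arc<Inner>,
}

#[derive(Default)]
struct Inner {
    // Mutex<HashMap> substitutes for DashMap in this sketch.
    streams: Mutex<HashMap<u128, ()>>,
}

impl Manager {
    pub fn new() -> Self {
        Self::default()
    }

    /// Records a stream id in the shared map.
    pub fn register(&self, stream_id: u128) {
        self.inner
            .streams
            .lock()
            .expect("streams mutex poisoned")
            .insert(stream_id, ());
    }

    /// Number of registered streams, as seen by any clone.
    pub fn stream_count(&self) -> usize {
        self.inner
            .streams
            .lock()
            .expect("streams mutex poisoned")
            .len()
    }

    /// True when both handles share the same inner allocation.
    pub fn shares_state_with(&self, other: &Manager) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }

    /// Stubbed method: the signature is stable, the behavior is deferred.
    pub fn open_ingest(&self, _stream_id: u128) -> Result<(), StubError> {
        Err(StubError::NotImplemented("open_ingest is a stub".into()))
    }
}
```

Because the handle is already reference-counted internally, wrapping it in a further Arc (as Task 13 does for AppState) is harmless but may not be strictly necessary.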
Task 13: Wire LiveAudioManager into AppState
Files:
- Modify: crates/xylolabs-server/src/state.rs
- Modify: crates/xylolabs-server/src/main.rs
- Modify: crates/xylolabs-server/tests/common/mod.rs
- [ ] Step 1: Add field to AppState
Open crates/xylolabs-server/src/state.rs. Add the field (alphabetical order within the struct):
pub live: std::sync::Arc<crate::live::LiveAudioManager>,
- [ ] Step 2: Construct at boot
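Open crates/xylolabs-server/src/main.rs. After the let storage = ... block and before the AppState { ... } construction, add the following. If the live module was registered in lib.rs in Task 12 and main.rs does not declare it itself, reference it as xylolabs_server::live::LiveAudioManager rather than crate::live::LiveAudioManager: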
let live = std::sync::Arc::new(crate::live::LiveAudioManager::new(
pool.clone(),
storage.clone(),
));
Then add live, (alphabetical) inside the AppState { ... } literal.
- [ ] Step 3: Mirror in test fixtures
Open crates/xylolabs-server/tests/common/mod.rs. Locate the AppState { ... } literal inside TestApp::setup(). Add live, (alphabetical) and the construction let live = std::sync::Arc::new(xylolabs_server::live::LiveAudioManager::new(pool.clone(), storage.clone())); just before the literal.
- [ ] Step 4: Build
cargo check -p xylolabs-server
cargo check --tests -p xylolabs-server
Expected: clean compile on both.
- [ ] Step 5: Commit
git add crates/xylolabs-server/src/state.rs \
crates/xylolabs-server/src/main.rs \
crates/xylolabs-server/tests/common/mod.rs
git commit -S -m "feat(server/state): ✨ wire LiveAudioManager into AppState (Phase 0)"
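The wiring in Steps 1 to 3 can be sketched without the project's crates. The block below is a minimal, std-only illustration, assuming stand-in types: Pool and Storage are hypothetical placeholders for the real pool and storage handles, and the AppState shown here has only the fields needed for the example. It shows why the field is an Arc: cloning the state copies a pointer, so every clone refers to the same manager.

```rust
use std::sync::Arc;

// Stand-ins for the real pool and storage handles. Both are assumptions
// made only so this sketch compiles without the project's crates.
#[derive(Clone)]
pub struct Pool;
#[derive(Clone)]
pub struct Storage;

// Hypothetical skeleton: holds its dependencies and does nothing yet.
pub struct LiveAudioManager {
    _pool: Pool,
    _storage: Storage,
}

impl LiveAudioManager {
    pub fn new(pool: Pool, storage: Storage) -> Self {
        Self { _pool: pool, _storage: storage }
    }
}

// Cloning the state clones the Arc (a pointer copy), not the manager.
#[derive(Clone)]
pub struct AppState {
    pub live: Arc<LiveAudioManager>,
    pub pool: Pool,
    pub storage: Storage,
}

// Mirrors the boot sequence: build dependencies, wrap the manager in an
// Arc, then place it in the state literal.
pub fn build_state() -> AppState {
    let pool = Pool;
    let storage = Storage;
    let live = Arc::new(LiveAudioManager::new(pool.clone(), storage.clone()));
    AppState { live, pool, storage }
}
```

Because the test fixture builds its own AppState literal, it needs the same two lines; otherwise the struct literal fails to compile once the new field exists.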
Task 14: Integration test — round-trip the three new tables
Files:
- Create: crates/xylolabs-server/tests/api_live_repo.rs
- [ ] Step 1: Write the integration test
#![allow(clippy::manual_div_ceil)]
//! Phase 0 integration: round-trip `live_streams`, `live_stream_connections`,
//! and `live_archive_segments` through their repo APIs. Requires the
//! Docker test infrastructure (postgres). `#[ignore]`'d by default.
mod common;

use xylolabs_db::repo;

#[tokio::test]
#[ignore]
async fn create_then_find_live_stream() {
    let app = common::TestApp::setup().await;

    let device = xylolabs_db::repo::device::find_or_create(
        &app.pool,
        app.facility_id,
        "test-device-live-0",
        Some("Test device"),
    )
    .await
    .expect("seed device");

    let stream = repo::live_stream::create(
        &app.pool,
        app.facility_id,
        device.id,
        0,
        "test-fac/test-device-live-0/0",
        "Front",
        4,
        &["FL".into(), "FR".into(), "RL".into(), "RR".into()],
        48_000,
        64_000,
        "default",
    )
    .await
    .expect("create live_stream");
    assert_eq!(stream.channels, 4);
    assert_eq!(stream.state, "idle");

    let found = repo::live_stream::find_by_id(&app.pool, stream.id)
        .await
        .expect("find_by_id")
        .expect("present");
    assert_eq!(found.id, stream.id);

    let by_key = repo::live_stream::find_by_stream_key(
        &app.pool,
        "test-fac/test-device-live-0/0",
    )
    .await
    .expect("find_by_stream_key")
    .expect("present");
    assert_eq!(by_key.id, stream.id);
}

#[tokio::test]
#[ignore]
async fn open_then_close_connection_audit() {
    let app = common::TestApp::setup().await;

    let device = xylolabs_db::repo::device::find_or_create(
        &app.pool,
        app.facility_id,
        "test-device-live-1",
        None,
    )
    .await
    .expect("seed device");

    let stream = repo::live_stream::create(
        &app.pool,
        app.facility_id,
        device.id,
        0,
        "test-fac/test-device-live-1/0",
        "X",
        2,
        &[],
        48_000,
        64_000,
        "default",
    )
    .await
    .expect("create");

    let api_key_id = xylolabs_db::repo::api_key::find_by_hash(
        &app.pool,
        &sha256_of(&app.api_key_raw),
    )
    .await
    .expect("api key lookup")
    .expect("seeded key present")
    .id;

    let conn = repo::live_stream_connection::open(
        &app.pool,
        stream.id,
        api_key_id,
        Some("127.0.0.1".parse().unwrap()),
    )
    .await
    .expect("open");
    assert!(conn.ended_at.is_none());

    repo::live_stream_connection::close(
        &app.pool,
        conn.id,
        "client_close",
        12345,
        67890,
        Some(42),
    )
    .await
    .expect("close");
}

#[tokio::test]
#[ignore]
async fn archive_segment_records_and_queries() {
    let app = common::TestApp::setup().await;

    let device = xylolabs_db::repo::device::find_or_create(
        &app.pool,
        app.facility_id,
        "test-device-live-2",
        None,
    )
    .await
    .expect("seed device");

    let stream = repo::live_stream::create(
        &app.pool,
        app.facility_id,
        device.id,
        0,
        "test-fac/test-device-live-2/0",
        "X",
        2,
        &[],
        48_000,
        64_000,
        "default",
    )
    .await
    .expect("create");

    // One LC3 archive segment with no sequence number.
    repo::live_archive_segment::record(
        &app.pool,
        stream.id,
        "lc3_zstd",
        "live/x/lc3/0_10000000.bin.zst",
        None,
        0,
        10_000_000,
        1024,
        false,
    )
    .await
    .expect("record lc3");

    // One HLS segment with sequence number 1.
    repo::live_archive_segment::record(
        &app.pool,
        stream.id,
        "hls_m4s",
        "live/x/hls/1.m4s",
        Some(1),
        5_000_000,
        1_000_000,
        2048,
        false,
    )
    .await
    .expect("record hls");

    // No kind filter: both recorded segments are expected back.
    let in_range = repo::live_archive_segment::list_by_stream_in_range(
        &app.pool,
        stream.id,
        0,
        10_000_000,
        None,
    )
    .await
    .expect("range query");
    assert_eq!(in_range.len(), 2);

    // Same window, filtered to the HLS kind: only the HLS segment remains.
    let only_hls = repo::live_archive_segment::list_by_stream_in_range(
        &app.pool,
        stream.id,
        0,
        10_000_000,
        Some("hls_m4s"),
    )
    .await
    .expect("hls only");
    assert_eq!(only_hls.len(), 1);
    assert_eq!(only_hls[0].kind, "hls_m4s");
    assert_eq!(only_hls[0].sequence_num, Some(1));
}

/// Lowercase hex SHA-256 digest of `s`, used to look up the seeded API key by hash.
fn sha256_of(s: &str) -> String {
    use sha2::{Digest, Sha256};
    let mut h = Sha256::new();
    h.update(s.as_bytes());
    h.finalize().iter().map(|b| format!("{b:02x}")).collect()
}
NOTE: if xylolabs_db::repo::device::find_or_create does not exist, replace it with whatever device-creation helper the existing tests use (search common/mod.rs and tests/api_devices.rs). The exact API is secondary; the contract here is "seed a device under the test facility".
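The sha256_of helper above hex-encodes the digest before the lookup. The hashing itself depends on the sha2 crate, so the sketch below covers only the encoding step, using std alone; to_hex is a hypothetical name introduced for illustration. The point it makes is that the `02` width in the format string zero-pads each byte, which keeps the output at exactly two characters per byte.

```rust
/// Lowercase hex encoding of arbitrary bytes, two characters per byte.
/// `to_hex` is an illustrative name, not a helper from the codebase.
pub fn to_hex(bytes: &[u8]) -> String {
    bytes.iter().map(|b| format!("{b:02x}")).collect()
}
```

Without the zero padding, a byte such as 0x0f would encode as a single character and the resulting string would no longer match a stored fixed-width hash.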
- [ ] Step 2: Build the test crate
cargo check --tests -p xylolabs-server
Expected: clean compile.
- [ ] Step 3: Run the test (Docker required)
docker compose -f docker-compose.dev.yml up -d postgres minio
cargo test -p xylolabs-server --test api_live_repo -- --ignored
Expected: 3 tests pass.
- [ ] Step 4: Commit
git add crates/xylolabs-server/tests/api_live_repo.rs
git commit -S -m "test(server/live): 🧪 round-trip Phase 0 repos through Docker fixtures"
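The range assertions in archive_segment_records_and_queries rely on how list_by_stream_in_range decides that a segment is "in range". That function is defined in an earlier task and is not reproduced here, so the following is only one plausible reading, assuming half-open windows and an overlap test; Segment, overlaps, and in_range are hypothetical names. It is consistent with the test expecting two rows for the window 0 to 10_000_000.

```rust
/// Half-open time window [start, start + duration). The unit is whatever
/// the archive table uses; it is not assumed here.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Segment {
    pub start: i64,
    pub duration: i64,
}

/// True when the segment shares at least one instant with [from, to).
pub fn overlaps(seg: Segment, from: i64, to: i64) -> bool {
    seg.start < to && seg.start + seg.duration > from
}

/// Keeps only the segments that overlap the query window, in input order.
pub fn in_range(segs: &[Segment], from: i64, to: i64) -> Vec<Segment> {
    segs.iter().copied().filter(|s| overlaps(*s, from, to)).collect()
}
```

Under this reading a segment that starts exactly at the upper bound is excluded. If the real query uses inclusive bounds or containment instead of overlap, the boundary cases differ, and a boundary-value test would be worth adding in a later phase.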
Task 15: Top-level workspace build + lint
Files: none (workspace-wide validation)
- [ ] Step 1: Full build
cargo build
Expected: clean build.
- [ ] Step 2: Format check on touched files
rustfmt --check --edition 2024 \
crates/xylolabs-core/src/models/live_stream.rs \
crates/xylolabs-core/src/models/live_stream_connection.rs \
crates/xylolabs-core/src/models/live_archive_segment.rs \
crates/xylolabs-core/src/dto/live_stream.rs \
crates/xylolabs-db/src/repo/live_stream.rs \
crates/xylolabs-db/src/repo/live_stream_connection.rs \
crates/xylolabs-db/src/repo/live_archive_segment.rs \
crates/xylolabs-server/src/live/mod.rs \
crates/xylolabs-server/src/live/manager.rs \
crates/xylolabs-server/tests/api_live_repo.rs
Expected: EXIT=0. If any file shows a diff, run rustfmt --edition 2024 <file> on it and re-commit.
- [ ] Step 3: Clippy on the server crate
cargo clippy -p xylolabs-server --tests 2>&1 | grep -E "warning|error" \
| grep -v "lc3_legacy\|incremental\|generated"
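Expected: empty (no new warnings in code we authored). Pre-existing warnings in lc3_legacy.rs and device_timeline.rs are out of scope for this plan. The filter above does not match device_timeline, so warnings from that file may still be printed and can be ignored.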
- [ ] Step 4: Push and deploy
git pull --rebase
git push
bash scripts/deploy.sh
Wait for the === Deployment complete === line and the green health check that the deploy script prints.
- [ ] Step 5: Smoke-verify production
# Server should be running. No live routes exist yet; this just
# confirms the new migrations applied cleanly.
curl -s -o /dev/null -w "%{http_code}\n" https://api.xylolabs.com/api/v1/health
ssh -i ~/.ssh/xylolabs-api.pem ubuntu@api.xylolabs.com \
'docker exec xylolabs-api-postgres-1 psql -U xylolabs -d xylolabs -c "\dt live_*"'
Expected: 200 on health, \dt live_* lists live_streams, live_stream_connections, live_archive_segments (3 tables).
Self-review notes (already applied inline)
- Spec coverage: Phase 0 of the spec calls for DB migrations, scope additions, MediaAccessActor extension, and transcode skeleton (1-week scope). This plan covers migrations, models, repos, DTOs, and a LiveAudioManager skeleton wired into AppState. Scope additions (live:ingest, live:listen) are convention strings: they live in the API-key scope text array with no schema change required, and MediaAccessActor (which already exists, added in commit 000000037a cycle 5 2026-05-22) already supports any scope string via require_api_key_scope. The transcode skeleton (FFmpeg wrappers) is deferred to Phase 1's first task because nothing in Phase 0 calls it yet; adding it now would be dead code that fails Self-review §3 (scope check).
- Placeholder scan: Every step has either runnable shell, complete code, or a concrete validation. No "TBD" / "fill in later".
- Type consistency: LiveStream::channels is i16 everywhere (model, DTO, repo); LiveStream::sample_rate_hz is i32 everywhere; LiveArchiveSegment::sequence_num is Option<i64> everywhere; LiveStreamConnection::client_addr is Option<IpAddr> everywhere.
- Ambiguity: The notes in Task 12 about S3Client::new_for_test and in Task 14 about device::find_or_create are intentional. Both are codebase-dependent, and the implementer must check existing test patterns before committing. Both are isolated to test code; production paths use whatever the runtime helpers actually provide.
Next phases (deferred)
After Phase 0 deploys cleanly:
| Phase | Title | Plan path |
|---|---|---|
| 1 | Ingest WS + LC3 zstd archive | docs/superpowers/plans/2026-05-2X-live-streaming-phase-1.md (to be written) |
| 2 | Listener WS + per-format encoders + token endpoint | (TBW) |
| 3 | HLS segmenter + memory ring + LL-HLS playlist | (TBW) |
| 4 | Frontend LiveAudioPlayer + DeviceTimelinePage integration | (TBW) |
| 5 | SDK LiveClient + ESP32-S3 4ch PDM | (TBW) |
| 6 | E2E + load + docs | (TBW) |
Each subsequent phase plan will be written against the actual code shape that lands from the prior phase, not pre-speculatively, because Rust API decisions tend to ripple and pre-written plans rot quickly when the type system pushes back on them.