mirror of
https://github.com/linsa-io/linsa.git
synced 2026-04-27 10:47:14 +02:00
.
This commit is contained in:
411
packages/worker/stream/src/main.rs
Normal file
411
packages/worker/stream/src/main.rs
Normal file
@@ -0,0 +1,411 @@
|
||||
mod receiver;
|
||||
mod s3;
|
||||
mod watcher;
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use axum::extract::State;
|
||||
use axum::http::StatusCode;
|
||||
use axum::response::Json;
|
||||
use axum::routing::get;
|
||||
use axum::Router;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::RwLock;
|
||||
use tower_http::cors::CorsLayer;
|
||||
use tower_http::services::ServeDir;
|
||||
use tower_http::trace::TraceLayer;
|
||||
use tracing::info;
|
||||
|
||||
use receiver::ReceiverHandle;
|
||||
use s3::S3Uploader;
|
||||
use watcher::SegmentWatcher;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct AppState {
|
||||
receiver: Arc<RwLock<Option<ReceiverHandle>>>,
|
||||
uploader: Option<Arc<S3Uploader>>,
|
||||
config: Arc<Config>,
|
||||
stats: Arc<RwLock<Stats>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
struct Config {
|
||||
/// Port to listen for SRT stream
|
||||
srt_port: u16,
|
||||
/// Directory to store segments
|
||||
segment_dir: PathBuf,
|
||||
/// Segment duration in seconds
|
||||
segment_duration: u32,
|
||||
/// S3 bucket name
|
||||
s3_bucket: String,
|
||||
/// S3 prefix for uploads
|
||||
s3_prefix: String,
|
||||
/// HTTP API port
|
||||
api_port: u16,
|
||||
/// Delete local files after S3 upload
|
||||
delete_after_upload: bool,
|
||||
/// ffmpeg path
|
||||
ffmpeg_path: PathBuf,
|
||||
/// YouTube RTMP URL (rtmp://a.rtmp.youtube.com/live2)
|
||||
youtube_rtmp_url: String,
|
||||
/// YouTube stream key
|
||||
youtube_stream_key: String,
|
||||
/// Enable YouTube streaming
|
||||
youtube_enabled: bool,
|
||||
/// HLS output directory (for web playback)
|
||||
hls_dir: PathBuf,
|
||||
/// Enable HLS output for web playback
|
||||
hls_enabled: bool,
|
||||
/// HLS segment duration in seconds
|
||||
hls_segment_duration: u32,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
srt_port: 6000,
|
||||
segment_dir: PathBuf::from("/root/stream/segments"),
|
||||
segment_duration: 60,
|
||||
s3_bucket: String::new(),
|
||||
s3_prefix: "stream".into(),
|
||||
api_port: 8080,
|
||||
delete_after_upload: true,
|
||||
ffmpeg_path: PathBuf::from("ffmpeg"),
|
||||
youtube_rtmp_url: "rtmp://a.rtmp.youtube.com/live2".into(),
|
||||
youtube_stream_key: String::new(),
|
||||
youtube_enabled: false,
|
||||
hls_dir: PathBuf::from("/root/stream/hls"),
|
||||
hls_enabled: true, // Enable HLS by default for web playback
|
||||
hls_segment_duration: 4,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Serialize)]
|
||||
struct Stats {
|
||||
receiving: bool,
|
||||
segments_uploaded: u64,
|
||||
bytes_uploaded: u64,
|
||||
last_segment: Option<String>,
|
||||
errors: Vec<String>,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(
|
||||
tracing_subscriber::EnvFilter::from_default_env()
|
||||
.add_directive("stream_server=info".parse().unwrap()),
|
||||
)
|
||||
.init();
|
||||
|
||||
let config = load_config()?;
|
||||
info!("Starting stream server with config: {:?}", config);
|
||||
|
||||
// Ensure segment directory exists
|
||||
tokio::fs::create_dir_all(&config.segment_dir)
|
||||
.await
|
||||
.with_context(|| format!("create segment dir {}", config.segment_dir.display()))?;
|
||||
|
||||
// Initialize S3 uploader only when configured
|
||||
let uploader = if config.s3_bucket.trim().is_empty() {
|
||||
info!("S3 upload disabled (S3_BUCKET not set)");
|
||||
None
|
||||
} else {
|
||||
let uploader = S3Uploader::new(&config.s3_bucket, &config.s3_prefix).await?;
|
||||
Some(Arc::new(uploader))
|
||||
};
|
||||
|
||||
let state = AppState {
|
||||
receiver: Arc::new(RwLock::new(None)),
|
||||
uploader: uploader.clone(),
|
||||
config: Arc::new(config.clone()),
|
||||
stats: Arc::new(RwLock::new(Stats::default())),
|
||||
};
|
||||
|
||||
// Start segment watcher only when S3 upload is enabled
|
||||
if state.uploader.is_some() {
|
||||
let watcher_state = state.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = run_watcher(watcher_state).await {
|
||||
tracing::error!("Watcher error: {e}");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Auto-start receiver
|
||||
let receiver_state = state.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = start_receiver_internal(&receiver_state).await {
|
||||
tracing::error!("Failed to start receiver: {e}");
|
||||
}
|
||||
});
|
||||
|
||||
// Ensure HLS directory exists
|
||||
if config.hls_enabled {
|
||||
tokio::fs::create_dir_all(&config.hls_dir)
|
||||
.await
|
||||
.with_context(|| format!("create HLS dir {}", config.hls_dir.display()))?;
|
||||
}
|
||||
|
||||
// Build HTTP API
|
||||
let mut app = Router::new()
|
||||
.route("/", get(index))
|
||||
.route("/status", get(status))
|
||||
.route("/start", get(start_receiver))
|
||||
.route("/stop", get(stop_receiver))
|
||||
.route("/health", get(health));
|
||||
|
||||
// Serve HLS files if enabled
|
||||
if config.hls_enabled {
|
||||
info!("Serving HLS files from {} at /hls/", config.hls_dir.display());
|
||||
app = app.nest_service("/hls", ServeDir::new(&config.hls_dir));
|
||||
}
|
||||
|
||||
let app = app
|
||||
.layer(TraceLayer::new_for_http())
|
||||
.layer(CorsLayer::permissive())
|
||||
.with_state(state);
|
||||
|
||||
let addr = SocketAddr::from(([0, 0, 0, 0], config.api_port));
|
||||
info!("HTTP API listening on {}", addr);
|
||||
|
||||
let listener = tokio::net::TcpListener::bind(addr).await?;
|
||||
axum::serve(listener, app).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn load_config() -> Result<Config> {
|
||||
let mut config = Config::default();
|
||||
|
||||
// Load from environment
|
||||
if let Ok(port) = std::env::var("SRT_PORT") {
|
||||
config.srt_port = port.parse().context("parse SRT_PORT")?;
|
||||
}
|
||||
if let Ok(dir) = std::env::var("SEGMENT_DIR") {
|
||||
config.segment_dir = PathBuf::from(dir);
|
||||
}
|
||||
if let Ok(duration) = std::env::var("SEGMENT_DURATION") {
|
||||
config.segment_duration = duration.parse().context("parse SEGMENT_DURATION")?;
|
||||
}
|
||||
if let Ok(bucket) = std::env::var("S3_BUCKET") {
|
||||
config.s3_bucket = bucket;
|
||||
}
|
||||
if let Ok(prefix) = std::env::var("S3_PREFIX") {
|
||||
config.s3_prefix = prefix;
|
||||
}
|
||||
if let Ok(port) = std::env::var("API_PORT") {
|
||||
config.api_port = port.parse().context("parse API_PORT")?;
|
||||
}
|
||||
if let Ok(delete) = std::env::var("DELETE_AFTER_UPLOAD") {
|
||||
config.delete_after_upload = delete == "true" || delete == "1";
|
||||
}
|
||||
if let Ok(path) = std::env::var("FFMPEG_PATH") {
|
||||
config.ffmpeg_path = PathBuf::from(path);
|
||||
}
|
||||
if let Ok(url) = std::env::var("YOUTUBE_RTMP_URL") {
|
||||
config.youtube_rtmp_url = url;
|
||||
}
|
||||
if let Ok(key) = std::env::var("YOUTUBE_STREAM_KEY") {
|
||||
config.youtube_stream_key = key;
|
||||
config.youtube_enabled = true;
|
||||
}
|
||||
if let Ok(enabled) = std::env::var("YOUTUBE_ENABLED") {
|
||||
config.youtube_enabled = enabled == "true" || enabled == "1";
|
||||
}
|
||||
if let Ok(dir) = std::env::var("HLS_DIR") {
|
||||
config.hls_dir = PathBuf::from(dir);
|
||||
}
|
||||
if let Ok(enabled) = std::env::var("HLS_ENABLED") {
|
||||
config.hls_enabled = enabled == "true" || enabled == "1";
|
||||
}
|
||||
if let Ok(duration) = std::env::var("HLS_SEGMENT_DURATION") {
|
||||
config.hls_segment_duration = duration.parse().context("parse HLS_SEGMENT_DURATION")?;
|
||||
}
|
||||
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
async fn run_watcher(state: AppState) -> Result<()> {
|
||||
let uploader = match state.uploader.clone() {
|
||||
Some(uploader) => uploader,
|
||||
None => {
|
||||
info!("S3 upload disabled; segment watcher not started");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
let mut watcher = SegmentWatcher::new(&state.config.segment_dir)?;
|
||||
|
||||
loop {
|
||||
if let Some(segment_path) = watcher.next_segment().await {
|
||||
let filename = segment_path
|
||||
.file_name()
|
||||
.and_then(|n| n.to_str())
|
||||
.unwrap_or("unknown")
|
||||
.to_string();
|
||||
|
||||
info!("Uploading segment: {}", filename);
|
||||
|
||||
match uploader.upload_file(&segment_path).await {
|
||||
Ok(bytes) => {
|
||||
let mut stats = state.stats.write().await;
|
||||
stats.segments_uploaded += 1;
|
||||
stats.bytes_uploaded += bytes;
|
||||
stats.last_segment = Some(filename.clone());
|
||||
|
||||
if state.config.delete_after_upload {
|
||||
if let Err(e) = tokio::fs::remove_file(&segment_path).await {
|
||||
tracing::warn!("Failed to delete {}: {e}", segment_path.display());
|
||||
} else {
|
||||
info!("Deleted local segment: {}", filename);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Upload failed for {}: {e}", filename);
|
||||
let mut stats = state.stats.write().await;
|
||||
stats.errors.push(format!("Upload {}: {e}", filename));
|
||||
if stats.errors.len() > 100 {
|
||||
stats.errors.remove(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn start_receiver_internal(state: &AppState) -> Result<()> {
|
||||
let mut receiver_guard = state.receiver.write().await;
|
||||
if receiver_guard.is_some() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let handle = if state.config.youtube_enabled && !state.config.youtube_stream_key.is_empty() {
|
||||
info!("Starting YouTube streaming mode");
|
||||
receiver::start_youtube(
|
||||
&state.config.ffmpeg_path,
|
||||
state.config.srt_port,
|
||||
&state.config.youtube_rtmp_url,
|
||||
&state.config.youtube_stream_key,
|
||||
)
|
||||
.await?
|
||||
} else if state.config.hls_enabled {
|
||||
info!("Starting HLS streaming mode");
|
||||
receiver::start_hls(
|
||||
&state.config.ffmpeg_path,
|
||||
state.config.srt_port,
|
||||
&state.config.hls_dir,
|
||||
state.config.hls_segment_duration,
|
||||
)
|
||||
.await?
|
||||
} else {
|
||||
receiver::start(
|
||||
&state.config.ffmpeg_path,
|
||||
state.config.srt_port,
|
||||
&state.config.segment_dir,
|
||||
state.config.segment_duration,
|
||||
)
|
||||
.await?
|
||||
};
|
||||
|
||||
*receiver_guard = Some(handle);
|
||||
state.stats.write().await.receiving = true;
|
||||
|
||||
info!(
|
||||
"SRT receiver started on port {}",
|
||||
state.config.srt_port
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn index() -> &'static str {
|
||||
"Stream Server - POST /start, /stop, GET /status, /health"
|
||||
}
|
||||
|
||||
async fn health() -> StatusCode {
|
||||
StatusCode::OK
|
||||
}
|
||||
|
||||
async fn status(State(state): State<AppState>) -> Json<StatusResponse> {
|
||||
let stats = state.stats.read().await;
|
||||
let receiver = state.receiver.read().await;
|
||||
|
||||
Json(StatusResponse {
|
||||
receiving: receiver.is_some(),
|
||||
srt_port: state.config.srt_port,
|
||||
s3_bucket: state.config.s3_bucket.clone(),
|
||||
youtube_enabled: state.config.youtube_enabled,
|
||||
youtube_rtmp_url: if state.config.youtube_enabled {
|
||||
Some(state.config.youtube_rtmp_url.clone())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
hls_enabled: state.config.hls_enabled,
|
||||
hls_url: if state.config.hls_enabled {
|
||||
Some(format!("/hls/stream.m3u8"))
|
||||
} else {
|
||||
None
|
||||
},
|
||||
segments_uploaded: stats.segments_uploaded,
|
||||
bytes_uploaded: stats.bytes_uploaded,
|
||||
bytes_uploaded_human: human_bytes(stats.bytes_uploaded),
|
||||
last_segment: stats.last_segment.clone(),
|
||||
recent_errors: stats.errors.iter().rev().take(5).cloned().collect(),
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct StatusResponse {
|
||||
receiving: bool,
|
||||
srt_port: u16,
|
||||
s3_bucket: String,
|
||||
youtube_enabled: bool,
|
||||
youtube_rtmp_url: Option<String>,
|
||||
hls_enabled: bool,
|
||||
hls_url: Option<String>,
|
||||
segments_uploaded: u64,
|
||||
bytes_uploaded: u64,
|
||||
bytes_uploaded_human: String,
|
||||
last_segment: Option<String>,
|
||||
recent_errors: Vec<String>,
|
||||
}
|
||||
|
||||
async fn start_receiver(State(state): State<AppState>) -> Result<Json<serde_json::Value>, StatusCode> {
|
||||
match start_receiver_internal(&state).await {
|
||||
Ok(()) => Ok(Json(serde_json::json!({"status": "started"}))),
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to start receiver: {e}");
|
||||
Err(StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn stop_receiver(State(state): State<AppState>) -> Json<serde_json::Value> {
|
||||
let mut receiver_guard = state.receiver.write().await;
|
||||
if let Some(handle) = receiver_guard.take() {
|
||||
handle.stop().await;
|
||||
state.stats.write().await.receiving = false;
|
||||
info!("SRT receiver stopped");
|
||||
}
|
||||
Json(serde_json::json!({"status": "stopped"}))
|
||||
}
|
||||
|
||||
fn human_bytes(bytes: u64) -> String {
|
||||
const KB: u64 = 1024;
|
||||
const MB: u64 = KB * 1024;
|
||||
const GB: u64 = MB * 1024;
|
||||
|
||||
if bytes >= GB {
|
||||
format!("{:.2} GB", bytes as f64 / GB as f64)
|
||||
} else if bytes >= MB {
|
||||
format!("{:.2} MB", bytes as f64 / MB as f64)
|
||||
} else if bytes >= KB {
|
||||
format!("{:.2} KB", bytes as f64 / KB as f64)
|
||||
} else {
|
||||
format!("{} B", bytes)
|
||||
}
|
||||
}
|
||||
297
packages/worker/stream/src/receiver.rs
Normal file
297
packages/worker/stream/src/receiver.rs
Normal file
@@ -0,0 +1,297 @@
|
||||
use std::path::Path;
|
||||
use std::process::Stdio;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use tokio::process::{Child, Command};
|
||||
use tracing::info;
|
||||
|
||||
pub struct ReceiverHandle {
|
||||
child: Child,
|
||||
}
|
||||
|
||||
impl ReceiverHandle {
|
||||
pub async fn stop(mut self) {
|
||||
// Send SIGTERM
|
||||
if let Err(e) = self.child.kill().await {
|
||||
tracing::warn!("Failed to kill ffmpeg: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Start ffmpeg to receive SRT stream and segment into files.
|
||||
///
|
||||
/// Output files: segment_dir/stream-YYYYMMDD-HHMMSS-NNN.ts
|
||||
pub async fn start(
|
||||
ffmpeg_path: &Path,
|
||||
srt_port: u16,
|
||||
segment_dir: &Path,
|
||||
segment_duration: u32,
|
||||
) -> Result<ReceiverHandle> {
|
||||
let srt_url = format!("srt://0.0.0.0:{srt_port}?mode=listener");
|
||||
let output_pattern = segment_dir.join("stream-%Y%m%d-%H%M%S-%%03d.ts");
|
||||
|
||||
// ffmpeg command:
|
||||
// - Listen for SRT input
|
||||
// - Copy video and audio (no re-encoding)
|
||||
// - Segment into files with timestamps
|
||||
let mut cmd = Command::new(ffmpeg_path);
|
||||
cmd.args([
|
||||
"-hide_banner",
|
||||
"-loglevel",
|
||||
"warning",
|
||||
// Input
|
||||
"-i",
|
||||
&srt_url,
|
||||
// Copy codecs (no re-encoding)
|
||||
"-c",
|
||||
"copy",
|
||||
// Segment muxer
|
||||
"-f",
|
||||
"segment",
|
||||
"-segment_time",
|
||||
&segment_duration.to_string(),
|
||||
"-segment_format",
|
||||
"mpegts",
|
||||
// Use strftime for timestamp in filename
|
||||
"-strftime",
|
||||
"1",
|
||||
// Reset timestamps for each segment
|
||||
"-reset_timestamps",
|
||||
"1",
|
||||
// Output pattern
|
||||
output_pattern.to_str().unwrap(),
|
||||
]);
|
||||
|
||||
cmd.stdin(Stdio::null())
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped());
|
||||
|
||||
info!("Starting ffmpeg receiver: {:?}", cmd);
|
||||
|
||||
let child = cmd
|
||||
.spawn()
|
||||
.with_context(|| format!("spawn ffmpeg at {}", ffmpeg_path.display()))?;
|
||||
|
||||
Ok(ReceiverHandle { child })
|
||||
}
|
||||
|
||||
/// Start ffmpeg to receive SRT stream and output HLS for web playback.
|
||||
///
|
||||
/// Generates a live HLS playlist (.m3u8) and segment files (.ts)
|
||||
/// that can be served via HTTP for web players.
|
||||
pub async fn start_hls(
|
||||
ffmpeg_path: &Path,
|
||||
srt_port: u16,
|
||||
hls_dir: &Path,
|
||||
segment_duration: u32,
|
||||
) -> Result<ReceiverHandle> {
|
||||
let srt_url = format!("srt://0.0.0.0:{srt_port}?mode=listener");
|
||||
let playlist_path = hls_dir.join("stream.m3u8");
|
||||
let segment_pattern = hls_dir.join("stream%03d.ts");
|
||||
|
||||
// Ensure HLS directory exists
|
||||
std::fs::create_dir_all(hls_dir)?;
|
||||
|
||||
// ffmpeg command for HLS output:
|
||||
// - Receive SRT input (already H.264 from Mac hardware encoder)
|
||||
// - Copy video codec (no re-encoding)
|
||||
// - Ensure audio is AAC (required for HLS)
|
||||
// - Output HLS playlist and segments
|
||||
let mut cmd = Command::new(ffmpeg_path);
|
||||
cmd.args([
|
||||
"-hide_banner",
|
||||
"-loglevel",
|
||||
"warning",
|
||||
// Input from SRT
|
||||
"-i",
|
||||
&srt_url,
|
||||
// Video: copy (no re-encoding)
|
||||
"-c:v",
|
||||
"copy",
|
||||
// Audio: AAC for HLS compatibility
|
||||
"-c:a",
|
||||
"aac",
|
||||
"-b:a",
|
||||
"128k",
|
||||
"-ar",
|
||||
"44100",
|
||||
// HLS output settings
|
||||
"-f",
|
||||
"hls",
|
||||
"-hls_time",
|
||||
&segment_duration.to_string(),
|
||||
"-hls_list_size",
|
||||
"10", // Keep 10 segments in playlist
|
||||
"-hls_flags",
|
||||
"delete_segments+append_list",
|
||||
"-hls_segment_filename",
|
||||
segment_pattern.to_str().unwrap(),
|
||||
// Output playlist
|
||||
playlist_path.to_str().unwrap(),
|
||||
]);
|
||||
|
||||
cmd.stdin(Stdio::null())
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped());
|
||||
|
||||
info!("Starting HLS output to {}", hls_dir.display());
|
||||
|
||||
let child = cmd
|
||||
.spawn()
|
||||
.with_context(|| format!("spawn ffmpeg at {}", ffmpeg_path.display()))?;
|
||||
|
||||
Ok(ReceiverHandle { child })
|
||||
}
|
||||
|
||||
/// Start ffmpeg to receive SRT stream and forward to YouTube RTMP.
|
||||
///
|
||||
/// Receives hardware-encoded H.264 from Mac, applies optional filters,
|
||||
/// and streams directly to YouTube with minimal re-encoding.
|
||||
pub async fn start_youtube(
|
||||
ffmpeg_path: &Path,
|
||||
srt_port: u16,
|
||||
rtmp_url: &str,
|
||||
stream_key: &str,
|
||||
) -> Result<ReceiverHandle> {
|
||||
let srt_url = format!("srt://0.0.0.0:{srt_port}?mode=listener");
|
||||
let youtube_url = format!("{}/{}", rtmp_url, stream_key);
|
||||
|
||||
// ffmpeg command for YouTube streaming:
|
||||
// - Receive SRT input (already H.264 from Mac hardware encoder)
|
||||
// - Copy video if already H.264, or re-encode if filtering needed
|
||||
// - Ensure audio is AAC (YouTube requirement)
|
||||
// - Output to RTMP
|
||||
let mut cmd = Command::new(ffmpeg_path);
|
||||
cmd.args([
|
||||
"-hide_banner",
|
||||
"-loglevel",
|
||||
"warning",
|
||||
// Reconnect on errors
|
||||
"-reconnect",
|
||||
"1",
|
||||
"-reconnect_streamed",
|
||||
"1",
|
||||
"-reconnect_delay_max",
|
||||
"5",
|
||||
// Input from SRT
|
||||
"-i",
|
||||
&srt_url,
|
||||
// Video: copy if already H.264 (from Mac VideoToolbox)
|
||||
"-c:v",
|
||||
"copy",
|
||||
// Audio: ensure AAC for YouTube
|
||||
"-c:a",
|
||||
"aac",
|
||||
"-b:a",
|
||||
"128k",
|
||||
"-ar",
|
||||
"44100",
|
||||
// FLV container for RTMP
|
||||
"-f",
|
||||
"flv",
|
||||
// Buffer settings for stable streaming
|
||||
"-flvflags",
|
||||
"no_duration_filesize",
|
||||
"-max_muxing_queue_size",
|
||||
"1024",
|
||||
// Output to YouTube
|
||||
&youtube_url,
|
||||
]);
|
||||
|
||||
cmd.stdin(Stdio::null())
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped());
|
||||
|
||||
info!("Starting YouTube stream to {}", rtmp_url);
|
||||
|
||||
let child = cmd
|
||||
.spawn()
|
||||
.with_context(|| format!("spawn ffmpeg at {}", ffmpeg_path.display()))?;
|
||||
|
||||
Ok(ReceiverHandle { child })
|
||||
}
|
||||
|
||||
/// Start ffmpeg to receive SRT, apply filters, and stream to YouTube.
|
||||
///
|
||||
/// This variant decodes, applies filters, and re-encodes.
|
||||
/// Use when you need video processing on the Linux side.
|
||||
pub async fn start_youtube_with_filter(
|
||||
ffmpeg_path: &Path,
|
||||
srt_port: u16,
|
||||
rtmp_url: &str,
|
||||
stream_key: &str,
|
||||
video_filter: Option<&str>,
|
||||
) -> Result<ReceiverHandle> {
|
||||
let srt_url = format!("srt://0.0.0.0:{srt_port}?mode=listener");
|
||||
let youtube_url = format!("{}/{}", rtmp_url, stream_key);
|
||||
|
||||
let mut cmd = Command::new(ffmpeg_path);
|
||||
let mut args = vec![
|
||||
"-hide_banner".to_string(),
|
||||
"-loglevel".to_string(),
|
||||
"warning".to_string(),
|
||||
"-reconnect".to_string(),
|
||||
"1".to_string(),
|
||||
"-reconnect_streamed".to_string(),
|
||||
"1".to_string(),
|
||||
"-reconnect_delay_max".to_string(),
|
||||
"5".to_string(),
|
||||
"-i".to_string(),
|
||||
srt_url,
|
||||
];
|
||||
|
||||
// Add video filter if specified
|
||||
if let Some(filter) = video_filter {
|
||||
args.extend(["-vf".to_string(), filter.to_string()]);
|
||||
}
|
||||
|
||||
// Video encoding (x264 with fast preset for low latency)
|
||||
args.extend([
|
||||
"-c:v".to_string(),
|
||||
"libx264".to_string(),
|
||||
"-preset".to_string(),
|
||||
"veryfast".to_string(),
|
||||
"-tune".to_string(),
|
||||
"zerolatency".to_string(),
|
||||
"-b:v".to_string(),
|
||||
"4500k".to_string(),
|
||||
"-maxrate".to_string(),
|
||||
"4500k".to_string(),
|
||||
"-bufsize".to_string(),
|
||||
"9000k".to_string(),
|
||||
"-g".to_string(),
|
||||
"60".to_string(), // Keyframe every 2s at 30fps
|
||||
]);
|
||||
|
||||
// Audio
|
||||
args.extend([
|
||||
"-c:a".to_string(),
|
||||
"aac".to_string(),
|
||||
"-b:a".to_string(),
|
||||
"128k".to_string(),
|
||||
"-ar".to_string(),
|
||||
"44100".to_string(),
|
||||
]);
|
||||
|
||||
// Output
|
||||
args.extend([
|
||||
"-f".to_string(),
|
||||
"flv".to_string(),
|
||||
"-flvflags".to_string(),
|
||||
"no_duration_filesize".to_string(),
|
||||
youtube_url,
|
||||
]);
|
||||
|
||||
cmd.args(&args);
|
||||
cmd.stdin(Stdio::null())
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped());
|
||||
|
||||
info!("Starting YouTube stream with filter");
|
||||
|
||||
let child = cmd
|
||||
.spawn()
|
||||
.with_context(|| format!("spawn ffmpeg at {}", ffmpeg_path.display()))?;
|
||||
|
||||
Ok(ReceiverHandle { child })
|
||||
}
|
||||
88
packages/worker/stream/src/s3.rs
Normal file
88
packages/worker/stream/src/s3.rs
Normal file
@@ -0,0 +1,88 @@
|
||||
use std::path::Path;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use aws_sdk_s3::primitives::ByteStream;
|
||||
use aws_sdk_s3::Client;
|
||||
use chrono::{Datelike, Utc};
|
||||
use tracing::info;
|
||||
|
||||
pub struct S3Uploader {
|
||||
client: Client,
|
||||
bucket: String,
|
||||
prefix: String,
|
||||
}
|
||||
|
||||
impl S3Uploader {
|
||||
pub async fn new(bucket: &str, prefix: &str) -> Result<Self> {
|
||||
let config = aws_config::load_from_env().await;
|
||||
let client = Client::new(&config);
|
||||
|
||||
// Verify bucket access if bucket is configured
|
||||
if !bucket.is_empty() {
|
||||
client
|
||||
.head_bucket()
|
||||
.bucket(bucket)
|
||||
.send()
|
||||
.await
|
||||
.with_context(|| format!("verify access to S3 bucket: {bucket}"))?;
|
||||
info!("S3 bucket verified: {bucket}");
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
client,
|
||||
bucket: bucket.to_string(),
|
||||
prefix: prefix.trim_matches('/').to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Upload a file to S3. Returns the number of bytes uploaded.
|
||||
pub async fn upload_file(&self, path: &Path) -> Result<u64> {
|
||||
if self.bucket.is_empty() {
|
||||
// No bucket configured, skip upload
|
||||
info!("S3 bucket not configured, skipping upload");
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
let filename = path
|
||||
.file_name()
|
||||
.and_then(|n| n.to_str())
|
||||
.context("get filename")?;
|
||||
|
||||
// Organize by date: prefix/YYYY/MM/DD/filename
|
||||
let now = Utc::now();
|
||||
let key = format!(
|
||||
"{}/{}/{:02}/{:02}/{}",
|
||||
self.prefix,
|
||||
now.format("%Y"),
|
||||
now.month(),
|
||||
now.day(),
|
||||
filename
|
||||
);
|
||||
|
||||
let metadata = tokio::fs::metadata(path)
|
||||
.await
|
||||
.with_context(|| format!("stat {}", path.display()))?;
|
||||
let size = metadata.len();
|
||||
|
||||
let body = ByteStream::from_path(path)
|
||||
.await
|
||||
.with_context(|| format!("read {}", path.display()))?;
|
||||
|
||||
self.client
|
||||
.put_object()
|
||||
.bucket(&self.bucket)
|
||||
.key(&key)
|
||||
.body(body)
|
||||
.content_type("video/mp2t")
|
||||
.send()
|
||||
.await
|
||||
.with_context(|| format!("upload to s3://{}/{}", self.bucket, key))?;
|
||||
|
||||
info!(
|
||||
"Uploaded {} ({} bytes) to s3://{}/{}",
|
||||
filename, size, self.bucket, key
|
||||
);
|
||||
|
||||
Ok(size)
|
||||
}
|
||||
}
|
||||
100
packages/worker/stream/src/watcher.rs
Normal file
100
packages/worker/stream/src/watcher.rs
Normal file
@@ -0,0 +1,100 @@
|
||||
use std::collections::HashSet;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::debug;
|
||||
|
||||
pub struct SegmentWatcher {
|
||||
rx: mpsc::Receiver<PathBuf>,
|
||||
#[allow(dead_code)]
|
||||
watcher: RecommendedWatcher,
|
||||
}
|
||||
|
||||
impl SegmentWatcher {
|
||||
pub fn new(dir: &Path) -> Result<Self> {
|
||||
let (tx, rx) = mpsc::channel(100);
|
||||
let seen = std::sync::Arc::new(std::sync::Mutex::new(HashSet::<PathBuf>::new()));
|
||||
|
||||
let tx_clone = tx.clone();
|
||||
let seen_clone = seen.clone();
|
||||
|
||||
let mut watcher = notify::recommended_watcher(move |res: Result<Event, _>| {
|
||||
if let Ok(event) = res {
|
||||
// We're interested in write completions and file creations
|
||||
match event.kind {
|
||||
EventKind::Create(_) | EventKind::Modify(_) => {
|
||||
for path in event.paths {
|
||||
// Only handle .ts files
|
||||
if path.extension().is_some_and(|e| e == "ts") {
|
||||
// Debounce: only send once per file
|
||||
let mut seen_guard = seen_clone.lock().unwrap();
|
||||
if !seen_guard.contains(&path) {
|
||||
// Wait a bit to ensure file is complete
|
||||
let tx = tx_clone.clone();
|
||||
let path_clone = path.clone();
|
||||
seen_guard.insert(path.clone());
|
||||
drop(seen_guard);
|
||||
|
||||
std::thread::spawn(move || {
|
||||
// Wait for file to be fully written
|
||||
std::thread::sleep(Duration::from_secs(2));
|
||||
let _ = tx.blocking_send(path_clone);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
})
|
||||
.context("create file watcher")?;
|
||||
|
||||
watcher
|
||||
.watch(dir, RecursiveMode::NonRecursive)
|
||||
.with_context(|| format!("watch directory {}", dir.display()))?;
|
||||
|
||||
// Scan for existing files
|
||||
let tx_clone = tx.clone();
|
||||
let dir_clone = dir.to_path_buf();
|
||||
tokio::spawn(async move {
|
||||
if let Ok(mut entries) = tokio::fs::read_dir(&dir_clone).await {
|
||||
while let Ok(Some(entry)) = entries.next_entry().await {
|
||||
let path = entry.path();
|
||||
if path.extension().is_some_and(|e| e == "ts") {
|
||||
// Check if file is old enough (not currently being written)
|
||||
if let Ok(meta) = tokio::fs::metadata(&path).await {
|
||||
if let Ok(modified) = meta.modified() {
|
||||
if modified.elapsed().unwrap_or_default() > Duration::from_secs(5) {
|
||||
let should_send = {
|
||||
let mut seen_guard = seen.lock().unwrap();
|
||||
if !seen_guard.contains(&path) {
|
||||
seen_guard.insert(path.clone());
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
};
|
||||
if should_send {
|
||||
let _ = tx_clone.send(path).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
debug!("Watching for segments in {}", dir.display());
|
||||
|
||||
Ok(Self { rx, watcher })
|
||||
}
|
||||
|
||||
pub async fn next_segment(&mut self) -> Option<PathBuf> {
|
||||
self.rx.recv().await
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user