diff --git a/flow.toml b/flow.toml index 5be58579..d1f691c7 100644 --- a/flow.toml +++ b/flow.toml @@ -2158,3 +2158,40 @@ run().catch(console.error); ''' dependencies = ["node", "pnpm"] shortcuts = ["user"] + +[[tasks]] +name = "test-jazz-stream" +description = "Test Jazz live stream recording flow (API → Jazz FileStream → Timeline)" +command = ''' +set -euo pipefail + +echo "=== Jazz Live Stream Recording Test ===" +echo "" +echo "This test will:" +echo " 1. Simulate stream-guard uploading video chunks" +echo " 2. Verify API endpoint (/api/stream-recording)" +echo " 3. Check chunk storage in Jazz directory" +echo " 4. Show how to view the timeline" +echo "" +echo "Prerequisites:" +echo " ✓ Linsa dev server running on http://localhost:3000" +echo " ✓ Jazz storage directory exists" +echo "" + +# Check if dev server is running +if ! curl -s http://localhost:3000/api/auth/ok >/dev/null 2>&1; then + echo "❌ Linsa dev server not running!" + echo "" + echo "Start it in another terminal with: f dev" + echo "Then run this test again." + exit 1 +fi + +echo "✓ Dev server is running" +echo "" + +# Run the test +pnpm tsx tests/jazz-stream-test.ts +''' +dependencies = ["node", "pnpm"] +shortcuts = ["test", "tjs"] diff --git a/packages/web/src/components/CommentBox.tsx b/packages/web/src/components/CommentBox.tsx index e11171ad..2ff4a597 100644 --- a/packages/web/src/components/CommentBox.tsx +++ b/packages/web/src/components/CommentBox.tsx @@ -1,17 +1,14 @@ import { useState, useEffect, useRef } from "react" -import { Send, LogIn } from "lucide-react" +import { Send, LogIn, ImagePlus, X } from "lucide-react" import { authClient } from "@/lib/auth-client" - -type Comment = { - id: string - user_id: string - user_name: string - user_email: string - content: string - created_at: string -} - -type AuthStep = "idle" | "email" | "otp" +import { useAccount, useCoState } from "jazz-tools/react" +import { Group, co, FileStream } from "jazz-tools" +import { + StreamComment, + StreamCommentList, + StreamCommentsContainer, + ViewerAccount, +} from "@/lib/jazz/schema" interface CommentBoxProps { username: string @@ -19,13 +16,18 @@ interface CommentBoxProps { export function CommentBox({ username }: CommentBoxProps) { const { data: session, isPending: sessionLoading } = authClient.useSession() - const [comments, setComments] = useState([]) + const me = useAccount(ViewerAccount) + + const [containerId, setContainerId] = useState(undefined) + const container = useCoState(StreamCommentsContainer, containerId, { resolve: { comments: true } }) const [newComment, setNewComment] = useState("") const [isSubmitting, setIsSubmitting] = useState(false) - const [isLoading, setIsLoading] = useState(true) + const [selectedImage, setSelectedImage] = useState(null) + const [imagePreview, setImagePreview] = useState(null) + const [uploadProgress, setUploadProgress] = useState(0) // Auth state - const [authStep, setAuthStep] = useState("idle") + const [authStep, setAuthStep] = useState<"idle" | "email" | "otp">("idle") const [email, setEmail] = useState("") const [otp, setOtp] = useState("") const [authLoading, setAuthLoading] = useState(false) @@ -34,6 +36,7 @@ export function CommentBox({ username }: CommentBoxProps) { const commentsEndRef = useRef(null) const emailInputRef = useRef(null) const otpInputRef = useRef(null) + const fileInputRef = useRef(null) // Focus inputs when auth step changes useEffect(() => { @@ -44,32 +47,72 @@ export function CommentBox({ username }: CommentBoxProps) { } }, [authStep]) - // Fetch comments + // Initialize or load the comments container for this stream useEffect(() => { - const fetchComments = async () => { + if (!me?.$isLoaded) return + + const initContainer = async () => { try { - const res = await fetch(`/api/stream-comments?username=${username}`) - if (res.ok) { - const data = (await res.json()) as { comments?: Comment[] } - setComments(data.comments || []) - } + const containerUID = { stream: username, origin: "linsa.io", type: "comments" } + + // Create a group writable by everyone + const group = Group.create({ owner: me }) + group.addMember("everyone", "writer") + + // Upsert the container + const result = await StreamCommentsContainer.upsertUnique({ + value: { comments: StreamCommentList.create([], { owner: group }) }, + unique: containerUID, + owner: group, + }) + + setContainerId(result.$jazz.id) } catch (err) { - console.error("Failed to fetch comments:", err) - } finally { - setIsLoading(false) + console.error("Failed to init comments container:", err) } } - fetchComments() - const interval = setInterval(fetchComments, 5000) // Poll every 5 seconds + initContainer() + }, [me?.$isLoaded, username]) - return () => clearInterval(interval) - }, [username]) + // Get comments from the container (only when loaded) + const comments = container?.$isLoaded ? container.comments : undefined // Scroll to bottom when new comments arrive useEffect(() => { commentsEndRef.current?.scrollIntoView({ behavior: "smooth" }) - }, [comments]) + }, [comments?.length]) + + const handleImageSelect = (e: React.ChangeEvent) => { + const file = e.target.files?.[0] + if (!file) return + + // Validate file type + if (!file.type.startsWith("image/")) { + alert("Please select an image file") + return + } + + // Validate file size (max 10MB) + if (file.size > 10 * 1024 * 1024) { + alert("Image must be less than 10MB") + return + } + + setSelectedImage(file) + setImagePreview(URL.createObjectURL(file)) + } + + const clearSelectedImage = () => { + setSelectedImage(null) + if (imagePreview) { + URL.revokeObjectURL(imagePreview) + setImagePreview(null) + } + if (fileInputRef.current) { + fileInputRef.current.value = "" + } + } const handleSendOTP = async (e: React.FormEvent) => { e.preventDefault() @@ -112,7 +155,6 @@ export function CommentBox({ username }: CommentBoxProps) { if (result.error) { setAuthError(result.error.message || "Invalid code") } else { - // Success - close auth form setAuthStep("idle") setEmail("") setOtp("") @@ -126,24 +168,47 @@ export function CommentBox({ username }: CommentBoxProps) { const handleSubmitComment = async (e: React.FormEvent) => { e.preventDefault() - if (!newComment.trim() || !session?.user) return + const commentsList = container?.$isLoaded ? container.comments : undefined + if ((!newComment.trim() && !selectedImage) || !session?.user || !me?.$isLoaded || !commentsList?.$isLoaded) return setIsSubmitting(true) - try { - const res = await fetch("/api/stream-comments", { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - username, - content: newComment.trim(), - }), - }) + setUploadProgress(0) - if (res.ok) { - const data = (await res.json()) as { comment: Comment } - setComments((prev) => [...prev, data.comment]) - setNewComment("") + try { + // Create a group for the comment + const group = Group.create({ owner: me }) + group.addMember("everyone", "reader") + + // Upload image if selected + let imageStream = undefined + if (selectedImage) { + imageStream = await co.fileStream().createFromBlob(selectedImage, { + owner: group, + onProgress: (progress) => { + setUploadProgress(Math.round(progress * 100)) + }, + }) } + + // Create the comment + const comment = StreamComment.create( + { + content: newComment.trim(), + userName: session.user.name || session.user.email?.split("@")[0] || "Anonymous", + userId: session.user.id || null, + image: imageStream, + createdAt: Date.now(), + }, + { owner: group } + ) + + // Add to list + ;(commentsList as unknown as { push: (item: typeof comment) => void }).push(comment) + + // Clear form + setNewComment("") + clearSelectedImage() + setUploadProgress(0) } catch (err) { console.error("Failed to post comment:", err) } finally { @@ -151,8 +216,8 @@ export function CommentBox({ username }: CommentBoxProps) { } } - const formatTime = (dateStr: string) => { - const date = new Date(dateStr) + const formatTime = (timestamp: number) => { + const date = new Date(timestamp) return date.toLocaleTimeString([], { hour: "2-digit", minute: "2-digit" }) } @@ -167,35 +232,42 @@ export function CommentBox({ username }: CommentBoxProps) { {/* Comments list */}
- {isLoading ? ( + {!comments ? (
Loading...
) : comments.length === 0 ? (
No messages yet. Be the first to say hi!
) : ( - comments.map((comment) => ( -
-
-
- - {comment.user_name?.charAt(0).toUpperCase() || "?"} - -
-
-
- - {comment.user_name || "Anonymous"} - - - {formatTime(comment.created_at)} + comments.map((comment, index: number) => { + if (!comment?.$isLoaded) return null + + return ( +
+
+
+ + {comment.userName?.charAt(0).toUpperCase() || "?"}
-

{comment.content}

+
+
+ + {comment.userName || "Anonymous"} + + + {formatTime(comment.createdAt)} + +
+ {comment.content && ( +

{comment.content}

+ )} + {comment.image?.$isLoaded && } +
-
- )) + ) + }) )}
@@ -205,23 +277,68 @@ export function CommentBox({ username }: CommentBoxProps) { {sessionLoading ? (
Loading...
) : isAuthenticated ? ( -
- setNewComment(e.target.value)} - placeholder="Send a message..." - className="flex-1 bg-white/5 border border-white/10 rounded-lg px-3 py-2 text-sm text-white placeholder:text-white/30 focus:outline-none focus:border-white/20" - disabled={isSubmitting} - /> - -
+
+ {/* Image preview */} + {imagePreview && ( +
+ Preview + +
+ )} + + {/* Upload progress */} + {uploadProgress > 0 && uploadProgress < 100 && ( +
+
+
+ )} + +
+ + + setNewComment(e.target.value)} + placeholder="Send a message..." + className="flex-1 bg-white/5 border border-white/10 rounded-lg px-3 py-2 text-sm text-white placeholder:text-white/30 focus:outline-none focus:border-white/20" + disabled={isSubmitting} + /> + +
+
) : authStep === "idle" ? ( + + + )} +
+
+ + {/* Timeline Canvas */} +
setIsDragging(true)} + onMouseUp={() => setIsDragging(false)} + onMouseLeave={() => setIsDragging(false)} + > + +
+ + {/* Scroll indicator */} +
+ Scroll or drag to navigate + + {formatTime(scrollX / pixelsPerMs)} - {formatTime((scrollX + width) / pixelsPerMs)} + +
+ + {/* Metadata */} + {recording.metadata && ( +
+ {recording.metadata.width && ( +
+ Resolution: {recording.metadata.width}×{recording.metadata.height} +
+ )} + {recording.metadata.fps && ( +
FPS: {recording.metadata.fps.toFixed(1)}
+ )} + {recording.metadata.bitrate && ( +
+ Bitrate: {(recording.metadata.bitrate / 1000).toFixed(0)}kbps +
+ )} +
+ )} +
+ ) +} diff --git a/packages/web/src/lib/jazz/schema.ts b/packages/web/src/lib/jazz/schema.ts index 1c648653..92f24f89 100644 --- a/packages/web/src/lib/jazz/schema.ts +++ b/packages/web/src/lib/jazz/schema.ts @@ -33,6 +33,35 @@ export type PaidComment = z.infer */ export const PaidCommentFeed = co.feed(PaidComment) +/** + * Stream comment with optional image attachment using FileStream + */ +export const StreamComment = co.map({ + /** Comment text content */ + content: z.string(), + /** User display name */ + userName: z.string(), + /** User ID (from auth) */ + userId: z.string().nullable(), + /** Optional image attachment */ + image: co.optional(co.fileStream()), + /** Timestamp */ + createdAt: z.number(), +}) +export type StreamComment = co.loaded + +/** + * List of stream comments - real-time chat + */ +export const StreamCommentList = co.list(StreamComment) + +/** + * Container for a stream's comments - enables upsertUnique per stream + */ +export const StreamCommentsContainer = co.map({ + comments: StreamCommentList, +}) + /** * Container for a stream's presence feed - enables upsertUnique */ @@ -84,7 +113,7 @@ export const GlideCanvasItem = z.object({ imageData: z.string().nullable(), // Base64 encoded image position: z.object({ x: z.number(), y: z.number() }).nullable(), createdAt: z.number(), - metadata: z.record(z.unknown()).nullable(), + metadata: z.record(z.string(), z.string()).nullable(), }) export type GlideCanvasItem = z.infer @@ -93,6 +122,35 @@ export type GlideCanvasItem = z.infer */ export const GlideCanvasList = co.list(GlideCanvasItem) +/** + * Live stream recording - stores video chunks as they're streamed + */ +export const StreamRecording = co.map({ + title: z.string(), + startedAt: z.number(), + endedAt: z.number().nullable(), + durationMs: z.number(), + streamKey: z.string(), + isLive: z.boolean(), + /** Video file being recorded in real-time */ + videoFile: co.fileStream(), + /** Preview thumbnail */ + thumbnailData: z.string().nullable(), + /** Metadata about the recording */ + metadata: z.object({ + width: z.number().nullable(), + height: z.number().nullable(), + fps: z.number().nullable(), + bitrate: z.number().nullable(), + }).nullable(), +}) +export type StreamRecording = z.infer + +/** + * List of stream recordings + */ +export const StreamRecordingList = co.list(StreamRecording) + /** * Viewer account root - stores any viewer-specific data */ @@ -103,6 +161,8 @@ export const ViewerRoot = co.map({ savedUrls: SavedUrlList, /** Glide browser canvas items */ glideCanvas: GlideCanvasList, + /** Live stream recordings */ + streamRecordings: StreamRecordingList, }) /** @@ -124,6 +184,7 @@ export const ViewerAccount = co version: 1, savedUrls: SavedUrlList.create([]), glideCanvas: GlideCanvasList.create([]), + streamRecordings: StreamRecordingList.create([]), }) } }) diff --git a/packages/web/src/routeTree.gen.ts b/packages/web/src/routeTree.gen.ts index 5548694a..973181c0 100644 --- a/packages/web/src/routeTree.gen.ts +++ b/packages/web/src/routeTree.gen.ts @@ -11,6 +11,7 @@ import { Route as rootRouteImport } from './routes/__root' import { Route as UsersRouteImport } from './routes/users' import { Route as UrlsRouteImport } from './routes/urls' +import { Route as StreamsRouteImport } from './routes/streams' import { Route as SettingsRouteImport } from './routes/settings' import { Route as SessionsRouteImport } from './routes/sessions' import { Route as MarketplaceRouteImport } from './routes/marketplace' @@ -31,6 +32,7 @@ import { Route as ApiUsersRouteImport } from './routes/api/users' import { Route as ApiUsageEventsRouteImport } from './routes/api/usage-events' import { Route as ApiStreamStatusRouteImport } from './routes/api/stream-status' import { Route as ApiStreamReplaysRouteImport } from './routes/api/stream-replays' +import { Route as ApiStreamRecordingRouteImport } from './routes/api/stream-recording' import { Route as ApiStreamCommentsRouteImport } from './routes/api/stream-comments' import { Route as ApiStreamRouteImport } from './routes/api/stream' import { Route as ApiProfileRouteImport } from './routes/api/profile' @@ -87,6 +89,11 @@ const UrlsRoute = UrlsRouteImport.update({ path: '/urls', getParentRoute: () => rootRouteImport, } as any) +const StreamsRoute = StreamsRouteImport.update({ + id: '/streams', + path: '/streams', + getParentRoute: () => rootRouteImport, +} as any) const SettingsRoute = SettingsRouteImport.update({ id: '/settings', path: '/settings', @@ -187,6 +194,11 @@ const ApiStreamReplaysRoute = ApiStreamReplaysRouteImport.update({ path: '/api/stream-replays', getParentRoute: () => rootRouteImport, } as any) +const ApiStreamRecordingRoute = ApiStreamRecordingRouteImport.update({ + id: '/api/stream-recording', + path: '/api/stream-recording', + getParentRoute: () => rootRouteImport, +} as any) const ApiStreamCommentsRoute = ApiStreamCommentsRouteImport.update({ id: '/api/stream-comments', path: '/api/stream-comments', @@ -433,6 +445,7 @@ export interface FileRoutesByFullPath { '/marketplace': typeof MarketplaceRoute '/sessions': typeof SessionsRoute '/settings': typeof SettingsRoute + '/streams': typeof StreamsRoute '/urls': typeof UrlsRoute '/users': typeof UsersRoute '/api/archives': typeof ApiArchivesRouteWithChildren @@ -446,6 +459,7 @@ export interface FileRoutesByFullPath { '/api/profile': typeof ApiProfileRoute '/api/stream': typeof ApiStreamRouteWithChildren '/api/stream-comments': typeof ApiStreamCommentsRoute + '/api/stream-recording': typeof ApiStreamRecordingRoute '/api/stream-replays': typeof ApiStreamReplaysRouteWithChildren '/api/stream-status': typeof ApiStreamStatusRoute '/api/usage-events': typeof ApiUsageEventsRouteWithChildren @@ -501,6 +515,7 @@ export interface FileRoutesByTo { '/marketplace': typeof MarketplaceRoute '/sessions': typeof SessionsRoute '/settings': typeof SettingsRoute + '/streams': typeof StreamsRoute '/urls': typeof UrlsRoute '/users': typeof UsersRoute '/api/archives': typeof ApiArchivesRouteWithChildren @@ -514,6 +529,7 @@ export interface FileRoutesByTo { '/api/profile': typeof ApiProfileRoute '/api/stream': typeof ApiStreamRouteWithChildren '/api/stream-comments': typeof ApiStreamCommentsRoute + '/api/stream-recording': typeof ApiStreamRecordingRoute '/api/stream-replays': typeof ApiStreamReplaysRouteWithChildren '/api/stream-status': typeof ApiStreamStatusRoute '/api/usage-events': typeof ApiUsageEventsRouteWithChildren @@ -571,6 +587,7 @@ export interface FileRoutesById { '/marketplace': typeof MarketplaceRoute '/sessions': typeof SessionsRoute '/settings': typeof SettingsRoute + '/streams': typeof StreamsRoute '/urls': typeof UrlsRoute '/users': typeof UsersRoute '/api/archives': typeof ApiArchivesRouteWithChildren @@ -584,6 +601,7 @@ export interface FileRoutesById { '/api/profile': typeof ApiProfileRoute '/api/stream': typeof ApiStreamRouteWithChildren '/api/stream-comments': typeof ApiStreamCommentsRoute + '/api/stream-recording': typeof ApiStreamRecordingRoute '/api/stream-replays': typeof ApiStreamReplaysRouteWithChildren '/api/stream-status': typeof ApiStreamStatusRoute '/api/usage-events': typeof ApiUsageEventsRouteWithChildren @@ -642,6 +660,7 @@ export interface FileRouteTypes { | '/marketplace' | '/sessions' | '/settings' + | '/streams' | '/urls' | '/users' | '/api/archives' @@ -655,6 +674,7 @@ export interface FileRouteTypes { | '/api/profile' | '/api/stream' | '/api/stream-comments' + | '/api/stream-recording' | '/api/stream-replays' | '/api/stream-status' | '/api/usage-events' @@ -710,6 +730,7 @@ export interface FileRouteTypes { | '/marketplace' | '/sessions' | '/settings' + | '/streams' | '/urls' | '/users' | '/api/archives' @@ -723,6 +744,7 @@ export interface FileRouteTypes { | '/api/profile' | '/api/stream' | '/api/stream-comments' + | '/api/stream-recording' | '/api/stream-replays' | '/api/stream-status' | '/api/usage-events' @@ -779,6 +801,7 @@ export interface FileRouteTypes { | '/marketplace' | '/sessions' | '/settings' + | '/streams' | '/urls' | '/users' | '/api/archives' @@ -792,6 +815,7 @@ export interface FileRouteTypes { | '/api/profile' | '/api/stream' | '/api/stream-comments' + | '/api/stream-recording' | '/api/stream-replays' | '/api/stream-status' | '/api/usage-events' @@ -849,6 +873,7 @@ export interface RootRouteChildren { MarketplaceRoute: typeof MarketplaceRoute SessionsRoute: typeof SessionsRoute SettingsRoute: typeof SettingsRoute + StreamsRoute: typeof StreamsRoute UrlsRoute: typeof UrlsRoute UsersRoute: typeof UsersRoute ApiArchivesRoute: typeof ApiArchivesRouteWithChildren @@ -862,6 +887,7 @@ export interface RootRouteChildren { ApiProfileRoute: typeof ApiProfileRoute ApiStreamRoute: typeof ApiStreamRouteWithChildren ApiStreamCommentsRoute: typeof ApiStreamCommentsRoute + ApiStreamRecordingRoute: typeof ApiStreamRecordingRoute ApiStreamReplaysRoute: typeof ApiStreamReplaysRouteWithChildren ApiStreamStatusRoute: typeof ApiStreamStatusRoute ApiUsageEventsRoute: typeof ApiUsageEventsRouteWithChildren @@ -906,6 +932,13 @@ declare module '@tanstack/react-router' { preLoaderRoute: typeof UrlsRouteImport parentRoute: typeof rootRouteImport } + '/streams': { + id: '/streams' + path: '/streams' + fullPath: '/streams' + preLoaderRoute: typeof StreamsRouteImport + parentRoute: typeof rootRouteImport + } '/settings': { id: '/settings' path: '/settings' @@ -1046,6 +1079,13 @@ declare module '@tanstack/react-router' { preLoaderRoute: typeof ApiStreamReplaysRouteImport parentRoute: typeof rootRouteImport } + '/api/stream-recording': { + id: '/api/stream-recording' + path: '/api/stream-recording' + fullPath: '/api/stream-recording' + preLoaderRoute: typeof ApiStreamRecordingRouteImport + parentRoute: typeof rootRouteImport + } '/api/stream-comments': { id: '/api/stream-comments' path: '/api/stream-comments' @@ -1526,6 +1566,7 @@ const rootRouteChildren: RootRouteChildren = { MarketplaceRoute: MarketplaceRoute, SessionsRoute: SessionsRoute, SettingsRoute: SettingsRoute, + StreamsRoute: StreamsRoute, UrlsRoute: UrlsRoute, UsersRoute: UsersRoute, ApiArchivesRoute: ApiArchivesRouteWithChildren, @@ -1539,6 +1580,7 @@ const rootRouteChildren: RootRouteChildren = { ApiProfileRoute: ApiProfileRoute, ApiStreamRoute: ApiStreamRouteWithChildren, ApiStreamCommentsRoute: ApiStreamCommentsRoute, + ApiStreamRecordingRoute: ApiStreamRecordingRoute, ApiStreamReplaysRoute: ApiStreamReplaysRouteWithChildren, ApiStreamStatusRoute: ApiStreamStatusRoute, ApiUsageEventsRoute: ApiUsageEventsRouteWithChildren, diff --git a/packages/web/src/routes/api/stream-recording.ts b/packages/web/src/routes/api/stream-recording.ts new file mode 100644 index 00000000..7d6a558b --- /dev/null +++ b/packages/web/src/routes/api/stream-recording.ts @@ -0,0 +1,234 @@ +import { createFileRoute } from "@tanstack/react-router" +import { promises as fs } from "fs" + +/** + * API endpoint for stream-guard Rust server to upload live stream chunks + * Chunks are stored temporarily and then synced to Jazz FileStream by client + */ + +const STORAGE_PATH = "/Users/nikiv/fork-i/garden-co/jazz/glide-storage/stream-recordings" + +interface StreamChunk { + streamId: string + chunkIndex: number + data: string // base64 encoded video data + timestamp: number + metadata?: { + width?: number + height?: number + fps?: number + bitrate?: number + } +} + +interface StreamMetadata { + streamId: string + title: string + startedAt: number + streamKey: string + metadata?: { + width?: number + height?: number + fps?: number + bitrate?: number + } +} + +async function ensureStorageDir() { + await fs.mkdir(STORAGE_PATH, { recursive: true }) +} + +// POST /api/stream-recording/start - Start a new recording session +const startRecording = async ({ request }: { request: Request }) => { + try { + const body = (await request.json()) as StreamMetadata + + if (!body.streamId || !body.title || !body.streamKey) { + return new Response( + JSON.stringify({ error: "Missing required fields: streamId, title, streamKey" }), + { status: 400, headers: { "content-type": "application/json" } } + ) + } + + await ensureStorageDir() + + // Create metadata file for this stream + const metadataPath = `${STORAGE_PATH}/${body.streamId}-metadata.json` + const metadata = { + ...body, + chunks: [], + status: "recording", + } + + await fs.writeFile(metadataPath, JSON.stringify(metadata, null, 2)) + + // Create chunks directory for this stream + const chunksDir = `${STORAGE_PATH}/${body.streamId}` + await fs.mkdir(chunksDir, { recursive: true }) + + console.log(`[stream-recording] Started recording: ${body.streamId}`) + + return new Response( + JSON.stringify({ success: true, streamId: body.streamId }), + { status: 200, headers: { "content-type": "application/json" } } + ) + } catch (error) { + console.error("[stream-recording] Start error:", error) + return new Response( + JSON.stringify({ error: "Internal server error" }), + { status: 500, headers: { "content-type": "application/json" } } + ) + } +} + +// POST /api/stream-recording/chunk - Upload a video chunk +const uploadChunk = async ({ request }: { request: Request }) => { + try { + const body = (await request.json()) as StreamChunk + + if (!body.streamId || body.chunkIndex === undefined || !body.data) { + return new Response( + JSON.stringify({ error: "Missing required fields: streamId, chunkIndex, data" }), + { status: 400, headers: { "content-type": "application/json" } } + ) + } + + await ensureStorageDir() + + // Write chunk to disk + const chunkPath = `${STORAGE_PATH}/${body.streamId}/chunk-${String(body.chunkIndex).padStart(6, "0")}.bin` + const chunkData = Buffer.from(body.data, "base64") + await fs.writeFile(chunkPath, chunkData) + + // Update metadata with chunk info + const metadataPath = `${STORAGE_PATH}/${body.streamId}-metadata.json` + try { + const metadataContent = await fs.readFile(metadataPath, "utf-8") + const metadata = JSON.parse(metadataContent) + metadata.chunks.push({ + index: body.chunkIndex, + timestamp: body.timestamp, + size: chunkData.length, + }) + metadata.lastChunkAt = body.timestamp + if (body.metadata) { + metadata.metadata = { ...metadata.metadata, ...body.metadata } + } + await fs.writeFile(metadataPath, JSON.stringify(metadata, null, 2)) + } catch (err) { + console.warn(`[stream-recording] Could not update metadata for ${body.streamId}:`, err) + } + + return new Response( + JSON.stringify({ success: true, chunkIndex: body.chunkIndex }), + { status: 200, headers: { "content-type": "application/json" } } + ) + } catch (error) { + console.error("[stream-recording] Chunk upload error:", error) + return new Response( + JSON.stringify({ error: "Internal server error" }), + { status: 500, headers: { "content-type": "application/json" } } + ) + } +} + +// POST /api/stream-recording/end - End a recording session +const endRecording = async ({ request }: { request: Request }) => { + try { + const body = (await request.json()) as { streamId: string; endedAt: number } + + if (!body.streamId || !body.endedAt) { + return new Response( + JSON.stringify({ error: "Missing required fields: streamId, endedAt" }), + { status: 400, headers: { "content-type": "application/json" } } + ) + } + + // Update metadata to mark as ended + const metadataPath = `${STORAGE_PATH}/${body.streamId}-metadata.json` + try { + const metadataContent = await fs.readFile(metadataPath, "utf-8") + const metadata = JSON.parse(metadataContent) + metadata.endedAt = body.endedAt + metadata.status = "ended" + metadata.durationMs = body.endedAt - metadata.startedAt + await fs.writeFile(metadataPath, JSON.stringify(metadata, null, 2)) + + console.log(`[stream-recording] Ended recording: ${body.streamId}`) + + return new Response( + JSON.stringify({ success: true, streamId: body.streamId }), + { status: 200, headers: { "content-type": "application/json" } } + ) + } catch (err) { + return new Response( + JSON.stringify({ error: "Stream not found" }), + { status: 404, headers: { "content-type": "application/json" } } + ) + } + } catch (error) { + console.error("[stream-recording] End error:", error) + return new Response( + JSON.stringify({ error: "Internal server error" }), + { status: 500, headers: { "content-type": "application/json" } } + ) + } +} + +// GET /api/stream-recording/list - List all recordings +const listRecordings = async ({ request }: { request: Request }) => { + try { + await ensureStorageDir() + + const files = await fs.readdir(STORAGE_PATH) + const metadataFiles = files.filter((f) => f.endsWith("-metadata.json")) + + const recordings = [] + for (const file of metadataFiles) { + try { + const content = await fs.readFile(`${STORAGE_PATH}/${file}`, "utf-8") + const metadata = JSON.parse(content) + recordings.push(metadata) + } catch (err) { + console.warn(`[stream-recording] Could not read ${file}:`, err) + } + } + + return new Response( + JSON.stringify({ recordings }), + { status: 200, headers: { "content-type": "application/json" } } + ) + } catch (error) { + console.error("[stream-recording] List error:", error) + return new Response( + JSON.stringify({ error: "Internal server error" }), + { status: 500, headers: { "content-type": "application/json" } } + ) + } +} + +export const Route = createFileRoute("/api/stream-recording")({ + server: { + handlers: { + GET: listRecordings, + POST: (ctx) => { + const url = new URL(ctx.request.url) + const action = url.searchParams.get("action") + + switch (action) { + case "start": + return startRecording(ctx) + case "chunk": + return uploadChunk(ctx) + case "end": + return endRecording(ctx) + default: + return new Response( + JSON.stringify({ error: "Invalid action. Use ?action=start|chunk|end" }), + { status: 400, headers: { "content-type": "application/json" } } + ) + } + }, + }, + }, +}) diff --git a/packages/web/src/routes/streams.tsx b/packages/web/src/routes/streams.tsx new file mode 100644 index 00000000..30c8fa53 --- /dev/null +++ b/packages/web/src/routes/streams.tsx @@ -0,0 +1,212 @@ +import { useState, useEffect } from "react" +import { createFileRoute } from "@tanstack/react-router" +import { useAccount } from "jazz-tools/react" +import { ViewerAccount, type StreamRecording, StreamRecordingList } from "@/lib/jazz/schema" +import { StreamTimeline } from "@/components/StreamTimeline" +import { Video, RefreshCw } from "lucide-react" +import { co } from "jazz-tools" + +export const Route = createFileRoute("/streams")({ + component: StreamsPage, + ssr: false, +}) + +function StreamsPage() { + const me = useAccount(ViewerAccount) + const [syncing, setSyncing] = useState(false) + const [lastSync, setLastSync] = useState(null) + + const root = me.$isLoaded ? me.root : null + const recordingsList = root?.$isLoaded ? root.streamRecordings : null + + // Auto-sync pending recordings from API every 5 seconds + useEffect(() => { + const interval = setInterval(() => { + void syncPendingRecordings() + }, 5000) + return () => clearInterval(interval) + }, [root]) + + const syncPendingRecordings = async () => { + if (!root?.streamRecordings?.$isLoaded || syncing) return + + setSyncing(true) + try { + // Fetch pending recordings from stream-guard API + const response = await fetch("/api/stream-recording") + if (!response.ok) { + console.error("[streams] Failed to fetch recordings") + return + } + + const data = (await response.json()) as { + recordings: Array<{ + streamId: string + title: string + startedAt: number + endedAt?: number + streamKey: string + status: string + chunks: Array<{ index: number; timestamp: number; size: number }> + metadata?: { + width?: number + height?: number + fps?: number + bitrate?: number + } + }> + } + + const pendingRecordings = data.recordings + + if (pendingRecordings.length === 0) { + return + } + + console.log(`[streams] Found ${pendingRecordings.length} recordings to sync`) + + // Get existing IDs to avoid duplicates + const existingKeys = new Set( + root.streamRecordings.$isLoaded + ? [...root.streamRecordings].map((item) => item.streamKey) + : [] + ) + + // Process each recording + for (const rec of pendingRecordings) { + if (existingKeys.has(rec.streamKey)) { + // Update existing recording + const existing = [...root.streamRecordings].find( + (r) => r.streamKey === rec.streamKey + ) + if (existing && rec.endedAt && !existing.endedAt) { + // Mark as ended + existing.endedAt = rec.endedAt + existing.isLive = false + existing.durationMs = rec.endedAt - rec.startedAt + } + continue + } + + // Create new recording in Jazz + try { + // Create FileStream from chunks + const fileStream = co.fileStream().create({ owner: me }) + + // Start the stream with metadata + fileStream.start({ + mimeType: "video/x-matroska", // .mkv format + fileName: `${rec.title}.mkv`, + totalSizeBytes: rec.chunks.reduce((sum, c) => sum + c.size, 0), + }) + + // Fetch and push chunks + for (const chunk of rec.chunks) { + try { + const chunkPath = `/Users/nikiv/fork-i/garden-co/jazz/glide-storage/stream-recordings/${rec.streamId}/chunk-${String(chunk.index).padStart(6, "0")}.bin` + const chunkData = await fetch( + `/api/stream-recording/chunk?path=${encodeURIComponent(chunkPath)}` + ).then((r) => r.arrayBuffer()) + + fileStream.push(new Uint8Array(chunkData)) + } catch (err) { + console.error(`[streams] Failed to fetch chunk ${chunk.index}:`, err) + } + } + + // End the stream if recording is complete + if (rec.status === "ended") { + fileStream.end() + } + + // Create StreamRecording object + const recording = { + title: rec.title, + startedAt: rec.startedAt, + endedAt: rec.endedAt || null, + durationMs: rec.endedAt + ? rec.endedAt - rec.startedAt + : Date.now() - rec.startedAt, + streamKey: rec.streamKey, + isLive: rec.status === "recording", + videoFile: fileStream, + thumbnailData: null, + metadata: rec.metadata || null, + } + + // Push to Jazz + root.streamRecordings.$jazz.push(recording) + console.log(`[streams] Added recording to Jazz: ${rec.title}`) + setLastSync(new Date()) + } catch (err) { + console.error(`[streams] Failed to create recording in Jazz:`, err) + } + } + } catch (error) { + console.error("[streams] Sync error:", error) + } finally { + setSyncing(false) + } + } + + const handleManualSync = () => { + void syncPendingRecordings() + } + + if (!me.$isLoaded || !root?.$isLoaded) { + return ( +
+

Loading Jazz...

+
+ ) + } + + const recordings: StreamRecording[] = recordingsList?.$isLoaded + ? [...recordingsList] + : [] + + return ( +
+
+
+
+
+
+ {lastSync && ( + + Last sync: {lastSync.toLocaleTimeString()} + + )} + +
+
+ + {recordings.length === 0 ? ( +
+
+ ) : ( +
+ {recordings.map((recording, index) => ( + + ))} +
+ )} +
+
+ ) +} diff --git a/tests/README.md b/tests/README.md new file mode 100644 index 00000000..6a0f5209 --- /dev/null +++ b/tests/README.md @@ -0,0 +1,100 @@ +# Jazz Live Stream Recording Test + +Tests the end-to-end flow of live stream recording with Jazz FileStream. + +## Prerequisites + +1. **Start Linsa dev server** (in one terminal): + ```bash + cd /Users/nikiv/org/linsa/linsa + f dev + ``` + +2. **Wait for server to be ready** at `http://localhost:3000` + +## Running the Test + +In a separate terminal: + +```bash +cd /Users/nikiv/org/linsa/linsa +f test-jazz-stream +``` + +Or with shortcuts: +```bash +f test +f tjs +``` + +## What the Test Does + +1. **Simulates stream-guard** uploading video chunks +2. **POSTs to API** `/api/stream-recording` +3. **Creates 5 chunks** of fake video data (256KB each) +4. **Stores chunks** in `/Users/nikiv/fork-i/garden-co/jazz/glide-storage/stream-recordings/` +5. **Verifies** chunk files exist on disk + +## Viewing the Results + +After the test completes: + +1. **Open Linsa streams page**: + ``` + http://localhost:3000/streams + ``` + +2. **Wait for auto-sync** (happens every 5 seconds) + - The page will fetch chunks from the API + - Convert them to Jazz FileStream + - Display the timeline + +3. **Open Glide browser**: + - Build and run Glide + - Timeline will appear on canvas + - Horizontal scrollable timeline showing the 5 chunks + +## Test Output + +The test will show: +- Stream ID and key +- Chunk upload progress +- Storage location +- Links to view the timeline + +## Manual Testing + +You can also manually POST chunks: + +```bash +curl -X POST http://localhost:3000/api/stream-recording?action=start \ + -H "Content-Type: application/json" \ + -d '{ + "streamId": "manual-test", + "title": "Manual Test Stream", + "startedAt": '$(date +%s000)', + "streamKey": "test-key" + }' + +# Upload a chunk +echo "fake video data" | base64 | \ + curl -X POST http://localhost:3000/api/stream-recording?action=chunk \ + -H "Content-Type: application/json" \ + -d @- < setTimeout(resolve, ms)) +} + +async function testStreamRecording() { + console.log("🎷 [Test] Starting Jazz Live Stream Recording Test") + console.log("") + + // Step 1: Create Jazz account + console.log("1️⃣ Creating Jazz ViewerAccount...") + + // Note: In a real test, we'd use jazz-tools to create an actual account + // For this test, we'll simulate the flow + const streamId = `test-stream-${Date.now()}` + const streamKey = `test-key-${randomBytes(8).toString("hex")}` + + console.log(` Stream ID: ${streamId}`) + console.log(` Stream Key: ${streamKey}`) + console.log("") + + // Step 2: Start recording via API + console.log("2️⃣ Starting recording session...") + + const startResponse = await fetch(`${API_BASE}/api/stream-recording?action=start`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + streamId, + title: "Test Live Stream", + startedAt: Date.now(), + streamKey, + metadata: { + width: 1920, + height: 1080, + fps: 30, + bitrate: 5000000, + }, + }), + }) + + if (!startResponse.ok) { + throw new Error(`Failed to start recording: ${await startResponse.text()}`) + } + + const startData = await startResponse.json() + console.log(` ✓ Recording started: ${JSON.stringify(startData)}`) + console.log("") + + // Step 3: Upload video chunks + console.log("3️⃣ Uploading video chunks...") + + const numChunks = 5 + for (let i = 0; i < numChunks; i++) { + // Create fake video chunk (256KB of random data) + const chunkData = randomBytes(256 * 1024) + const base64Data = chunkData.toString("base64") + + const chunkResponse = await fetch(`${API_BASE}/api/stream-recording?action=chunk`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + streamId, + chunkIndex: i, + data: base64Data, + timestamp: Date.now() + (i * 1000), // 1 second apart + metadata: { + width: 1920, + height: 1080, + fps: 30, + bitrate: 5000000, + }, + }), + }) + + if (!chunkResponse.ok) { + throw new Error(`Failed to upload chunk ${i}: ${await chunkResponse.text()}`) + } + + const chunkData2 = await chunkResponse.json() + console.log(` ✓ Chunk ${i} uploaded (${Math.round(chunkData.length / 1024)}KB)`) + + // Wait a bit between chunks to simulate real streaming + await sleep(100) + } + console.log("") + + // Step 4: List recordings via API + console.log("4️⃣ Fetching recordings from API...") + + const listResponse = await fetch(`${API_BASE}/api/stream-recording`) + if (!listResponse.ok) { + throw new Error(`Failed to list recordings: ${await listResponse.text()}`) + } + + const listData = await listResponse.json() as { recordings: any[] } + console.log(` ✓ Found ${listData.recordings.length} recording(s)`) + + const ourRecording = listData.recordings.find(r => r.streamId === streamId) + if (!ourRecording) { + throw new Error("Our recording not found in list!") + } + + console.log(` ✓ Recording found with ${ourRecording.chunks?.length || 0} chunks`) + console.log("") + + // Step 5: Verify chunk files exist + console.log("5️⃣ Verifying chunk files...") + const fs = await import("fs/promises") + const chunksDir = `/Users/nikiv/fork-i/garden-co/jazz/glide-storage/stream-recordings/${streamId}` + + try { + const files = await fs.readdir(chunksDir) + const chunkFiles = files.filter(f => f.startsWith("chunk-")) + console.log(` ✓ ${chunkFiles.length} chunk files found in ${chunksDir}`) + + for (const file of chunkFiles.slice(0, 3)) { + const stats = await fs.stat(`${chunksDir}/${file}`) + console.log(` - ${file}: ${Math.round(stats.size / 1024)}KB`) + } + } catch (err) { + console.error(` ✗ Failed to read chunks directory: ${err}`) + } + console.log("") + + // Step 6: End recording + console.log("6️⃣ Ending recording session...") + + const endResponse = await fetch(`${API_BASE}/api/stream-recording?action=end`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + streamId, + endedAt: Date.now(), + }), + }) + + if (!endResponse.ok) { + throw new Error(`Failed to end recording: ${await endResponse.text()}`) + } + + const endData = await endResponse.json() + console.log(` ✓ Recording ended: ${JSON.stringify(endData)}`) + console.log("") + + // Step 7: Summary + console.log("7️⃣ Test Summary") + console.log(` Stream ID: ${streamId}`) + console.log(` Chunks uploaded: ${numChunks}`) + console.log(` Total data: ${Math.round((numChunks * 256))}KB`) + console.log(` Storage: /Users/nikiv/fork-i/garden-co/jazz/glide-storage/stream-recordings/${streamId}`) + console.log("") + + // Step 8: Next steps + console.log("📝 Next Steps:") + console.log(" 1. Open Linsa at http://localhost:3000/streams") + console.log(" 2. The page will auto-sync this recording to Jazz FileStream") + console.log(" 3. Timeline will appear showing the 5 chunks") + console.log(" 4. Open Glide browser to see timeline on canvas") + console.log("") + + console.log("✅ Test completed successfully!") + console.log("") + console.log("🎯 To see the timeline:") + console.log(" - Visit http://localhost:3000/streams") + console.log(" - Wait for auto-sync (5 seconds)") + console.log(" - Timeline will show the test stream") + console.log("") +} + +// Run the test +testStreamRecording().catch((error) => { + console.error("") + console.error("❌ Test failed:", error) + console.error("") + process.exit(1) +})