diff --git a/packages/web/drizzle/0004_add_stream_replays.sql b/packages/web/drizzle/0004_add_stream_replays.sql new file mode 100644 index 00000000..adef8763 --- /dev/null +++ b/packages/web/drizzle/0004_add_stream_replays.sql @@ -0,0 +1,17 @@ +CREATE TABLE "stream_replays" ( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid(), + "stream_id" uuid NOT NULL REFERENCES "streams"(id) ON DELETE CASCADE, + "user_id" text NOT NULL REFERENCES "users"(id) ON DELETE CASCADE, + "title" text NOT NULL DEFAULT 'Stream Replay', + "description" text, + "status" varchar(32) NOT NULL DEFAULT 'processing', + "jazz_replay_id" text, + "playback_url" text, + "thumbnail_url" text, + "duration_seconds" integer, + "started_at" timestamp with time zone, + "ended_at" timestamp with time zone, + "is_public" boolean NOT NULL DEFAULT false, + "created_at" timestamp with time zone DEFAULT now() NOT NULL, + "updated_at" timestamp with time zone DEFAULT now() NOT NULL +); diff --git a/packages/web/drizzle/meta/_journal.json b/packages/web/drizzle/meta/_journal.json index b875be4a..672f766a 100644 --- a/packages/web/drizzle/meta/_journal.json +++ b/packages/web/drizzle/meta/_journal.json @@ -29,6 +29,13 @@ "when": 1766456542436, "tag": "0003_add_webrtc_url_to_streams", "breakpoints": true + }, + { + "idx": 4, + "version": "7", + "when": 1769000000000, + "tag": "0004_add_stream_replays", + "breakpoints": true } ] } diff --git a/packages/web/src/db/schema.ts b/packages/web/src/db/schema.ts index 1e59a2e3..0dafd09b 100644 --- a/packages/web/src/db/schema.ts +++ b/packages/web/src/db/schema.ts @@ -264,6 +264,39 @@ export const streams = pgTable("streams", { export const selectStreamsSchema = createSelectSchema(streams) export type Stream = z.infer +// ============================================================================= +// Stream Replays (saved live streams, stored in Jazz) +// ============================================================================= + +export const stream_replays = pgTable("stream_replays", { + id: uuid("id").primaryKey().defaultRandom(), + stream_id: uuid("stream_id") + .notNull() + .references(() => streams.id, { onDelete: "cascade" }), + user_id: text("user_id") + .notNull() + .references(() => users.id, { onDelete: "cascade" }), + title: text("title").notNull().default("Stream Replay"), + description: text("description"), + status: varchar("status", { length: 32 }).notNull().default("processing"), + jazz_replay_id: text("jazz_replay_id"), + playback_url: text("playback_url"), + thumbnail_url: text("thumbnail_url"), + duration_seconds: integer("duration_seconds"), + started_at: timestamp("started_at", { withTimezone: true }), + ended_at: timestamp("ended_at", { withTimezone: true }), + is_public: boolean("is_public").notNull().default(false), + created_at: timestamp("created_at", { withTimezone: true }) + .defaultNow() + .notNull(), + updated_at: timestamp("updated_at", { withTimezone: true }) + .defaultNow() + .notNull(), +}) + +export const selectStreamReplaySchema = createSelectSchema(stream_replays) +export type StreamReplay = z.infer + // ============================================================================= // Stripe Billing // ============================================================================= diff --git a/packages/web/src/lib/stream/replays.ts b/packages/web/src/lib/stream/replays.ts new file mode 100644 index 00000000..cdb59eed --- /dev/null +++ b/packages/web/src/lib/stream/replays.ts @@ -0,0 +1,51 @@ +export type StreamReplayRecord = { + id: string + stream_id: string + user_id: string + title: string + description: string | null + status: string + jazz_replay_id: string | null + playback_url: string | null + thumbnail_url: string | null + duration_seconds: number | null + started_at: string | null + ended_at: string | null + is_public: boolean + created_at: string + updated_at: string +} + +export async function getStreamReplaysByUsername( + username: string, +): Promise { + const response = await fetch(`/api/streams/${username}/replays`, { + credentials: "include", + }) + + if (!response.ok) { + throw new Error("Failed to load stream replays") + } + + const data = (await response.json()) as { replays?: StreamReplayRecord[] } + return data.replays ?? [] +} + +export async function getStreamReplay( + replayId: string, +): Promise { + const response = await fetch(`/api/stream-replays/${replayId}`, { + credentials: "include", + }) + + if (!response.ok) { + throw new Error("Failed to load stream replay") + } + + const data = (await response.json()) as { replay?: StreamReplayRecord } + if (!data.replay) { + throw new Error("Replay not found") + } + + return data.replay +} diff --git a/packages/web/src/lib/stream/status.ts b/packages/web/src/lib/stream/status.ts new file mode 100644 index 00000000..eab9473e --- /dev/null +++ b/packages/web/src/lib/stream/status.ts @@ -0,0 +1,27 @@ +export interface StreamStatus { + isLive: boolean + updatedAt?: string +} + +/** + * Fetches stream status from nikiv.dev/api/stream-status + * This is set by Lin when streaming starts/stops + */ +export async function getStreamStatus(): Promise { + try { + const response = await fetch("https://nikiv.dev/api/stream-status", { + cache: "no-store", + }) + if (!response.ok) { + return { isLive: false } + } + const data = await response.json() + return { + isLive: Boolean(data.isLive), + updatedAt: data.updatedAt, + } + } catch (error) { + console.error("Failed to fetch stream status:", error) + return { isLive: false } + } +} diff --git a/packages/web/src/routes/$username.tsx b/packages/web/src/routes/$username.tsx index 03360833..4eee4247 100644 --- a/packages/web/src/routes/$username.tsx +++ b/packages/web/src/routes/$username.tsx @@ -11,6 +11,7 @@ import { getSpotifyNowPlaying, type SpotifyNowPlayingResponse, } from "@/lib/spotify/now-playing" +import { getStreamStatus } from "@/lib/stream/status" export const Route = createFileRoute("/$username")({ ssr: false, @@ -56,6 +57,7 @@ function StreamPage() { ) const [nowPlayingLoading, setNowPlayingLoading] = useState(false) const [nowPlayingError, setNowPlayingError] = useState(false) + const [streamLive, setStreamLive] = useState(false) useEffect(() => { let isActive = true @@ -118,6 +120,33 @@ function StreamPage() { } }, [username]) + // Poll stream status for nikiv from nikiv.dev/api/stream-status + useEffect(() => { + if (username !== "nikiv") { + return + } + + let isActive = true + + const fetchStatus = async () => { + const status = await getStreamStatus() + if (isActive) { + setStreamLive(status.isLive) + } + } + + // Fetch immediately + fetchStatus() + + // Poll every 10 seconds + const interval = setInterval(fetchStatus, 10000) + + return () => { + isActive = false + clearInterval(interval) + } + }, [username]) + const stream = data?.stream ?? null const playback = stream?.playback ?? null const fallbackPlayback = stream?.hls_url @@ -208,9 +237,11 @@ function StreamPage() { } }, [activePlayback?.type, stream?.hls_url]) + // For nikiv, use streamLive from the polled API status + // For other users, use stream?.is_live from the database + const isLiveStatus = username === "nikiv" ? streamLive : Boolean(stream?.is_live) const isActuallyLive = - Boolean(stream?.is_live) && - (activePlayback?.type !== "hls" || hlsLive !== false) + isLiveStatus && (activePlayback?.type !== "hls" || hlsLive !== false) const shouldFetchSpotify = username === "nikiv" && !isActuallyLive useEffect(() => { diff --git a/packages/web/src/routes/api/stream-replays.$replayId.ts b/packages/web/src/routes/api/stream-replays.$replayId.ts new file mode 100644 index 00000000..13c1a57e --- /dev/null +++ b/packages/web/src/routes/api/stream-replays.$replayId.ts @@ -0,0 +1,231 @@ +import { createFileRoute } from "@tanstack/react-router" +import { and, eq } from "drizzle-orm" +import { db } from "@/db/connection" +import { getAuth } from "@/lib/auth" +import { stream_replays, streams } from "@/db/schema" + +const json = (data: unknown, status = 200) => + new Response(JSON.stringify(data), { + status, + headers: { "content-type": "application/json" }, + }) + +const REPLAY_STATUSES = ["recording", "processing", "ready", "failed"] as const +type ReplayStatus = (typeof REPLAY_STATUSES)[number] + +const parseStatus = (value: unknown): ReplayStatus | null => { + if (typeof value !== "string") return null + if ((REPLAY_STATUSES as readonly string[]).includes(value)) { + return value as ReplayStatus + } + return null +} + +const parseDate = (value: unknown): Date | null => { + if (value === null || value === undefined) return null + if (value instanceof Date) return value + if (typeof value === "string" || typeof value === "number") { + const parsed = new Date(value) + if (!Number.isNaN(parsed.valueOf())) return parsed + } + return null +} + +const resolveStreamKey = (request: Request, body?: Record) => { + const headerKey = request.headers.get("x-stream-key")?.trim() + if (headerKey) return headerKey + if (body && typeof body.stream_key === "string") { + const key = body.stream_key.trim() + if (key) return key + } + return null +} + +const canAccessReplay = async (request: Request, replayUserId: string) => { + const auth = getAuth() + const session = await auth.api.getSession({ headers: request.headers }) + if (session?.user?.id && session.user.id === replayUserId) { + return true + } + return false +} + +const canEditReplay = async ( + request: Request, + replayStreamId: string, + replayUserId: string, + body?: Record, +) => { + if (await canAccessReplay(request, replayUserId)) { + return true + } + + const streamKey = resolveStreamKey(request, body) + if (!streamKey) return false + + const database = db() + const stream = await database.query.streams.findFirst({ + where: and(eq(streams.id, replayStreamId), eq(streams.stream_key, streamKey)), + }) + + return Boolean(stream) +} + +// GET /api/stream-replays/:replayId +const handleGet = async ({ + request, + params, +}: { + request: Request + params: { replayId: string } +}) => { + const database = db() + + const replay = await database.query.stream_replays.findFirst({ + where: eq(stream_replays.id, params.replayId), + }) + + if (!replay) { + return json({ error: "Replay not found" }, 404) + } + + const isOwner = await canAccessReplay(request, replay.user_id) + if (!isOwner && (!replay.is_public || replay.status !== "ready")) { + return json({ error: "Forbidden" }, 403) + } + + return json({ replay }) +} + +// PATCH /api/stream-replays/:replayId +const handlePatch = async ({ + request, + params, +}: { + request: Request + params: { replayId: string } +}) => { + let body: Record + try { + body = (await request.json()) as Record + } catch { + return json({ error: "Invalid JSON body" }, 400) + } + + const database = db() + const replay = await database.query.stream_replays.findFirst({ + where: eq(stream_replays.id, params.replayId), + }) + + if (!replay) { + return json({ error: "Replay not found" }, 404) + } + + const canEdit = await canEditReplay( + request, + replay.stream_id, + replay.user_id, + body, + ) + + if (!canEdit) { + return json({ error: "Unauthorized" }, 401) + } + + const updateData: Partial = { + updated_at: new Date(), + } + + if (typeof body.title === "string") { + const title = body.title.trim() + if (!title) { + return json({ error: "Title cannot be empty" }, 400) + } + updateData.title = title + } + + if (typeof body.description === "string") { + updateData.description = body.description.trim() + } + + if (body.status !== undefined) { + const status = parseStatus(body.status) + if (!status) { + return json({ error: "Invalid status" }, 400) + } + updateData.status = status + } + + if (typeof body.jazz_replay_id === "string") { + updateData.jazz_replay_id = body.jazz_replay_id.trim() + } + + if (typeof body.playback_url === "string") { + updateData.playback_url = body.playback_url.trim() + } + + if (typeof body.thumbnail_url === "string") { + updateData.thumbnail_url = body.thumbnail_url.trim() + } + + if (body.started_at !== undefined) { + const startedAt = parseDate(body.started_at) + if (!startedAt) { + return json({ error: "Invalid started_at" }, 400) + } + updateData.started_at = startedAt + } + + if (body.ended_at !== undefined) { + const endedAt = parseDate(body.ended_at) + if (!endedAt) { + return json({ error: "Invalid ended_at" }, 400) + } + updateData.ended_at = endedAt + } + + if (typeof body.duration_seconds === "number") { + updateData.duration_seconds = Math.max(0, Math.floor(body.duration_seconds)) + } else if ( + updateData.duration_seconds === undefined && + updateData.started_at && + updateData.ended_at + ) { + updateData.duration_seconds = Math.max( + 0, + Math.floor( + (updateData.ended_at.getTime() - updateData.started_at.getTime()) / 1000, + ), + ) + } + + if (typeof body.is_public === "boolean") { + updateData.is_public = body.is_public + } + + if (Object.keys(updateData).length === 1) { + return json({ error: "No fields to update" }, 400) + } + + try { + const [updated] = await database + .update(stream_replays) + .set(updateData) + .where(eq(stream_replays.id, params.replayId)) + .returning() + + return json({ replay: updated }) + } catch (error) { + console.error("[stream-replays] Error updating replay:", error) + return json({ error: "Failed to update replay" }, 500) + } +} + +export const Route = createFileRoute("/api/stream-replays/$replayId")({ + server: { + handlers: { + GET: handleGet, + PATCH: handlePatch, + }, + }, +}) diff --git a/packages/web/src/routes/api/stream-replays.ts b/packages/web/src/routes/api/stream-replays.ts new file mode 100644 index 00000000..1207d081 --- /dev/null +++ b/packages/web/src/routes/api/stream-replays.ts @@ -0,0 +1,178 @@ +import { createFileRoute } from "@tanstack/react-router" +import { desc, eq } from "drizzle-orm" +import { db } from "@/db/connection" +import { getAuth } from "@/lib/auth" +import { stream_replays, streams } from "@/db/schema" + +const json = (data: unknown, status = 200) => + new Response(JSON.stringify(data), { + status, + headers: { "content-type": "application/json" }, + }) + +const REPLAY_STATUSES = ["recording", "processing", "ready", "failed"] as const +type ReplayStatus = (typeof REPLAY_STATUSES)[number] + +const parseStatus = (value: unknown): ReplayStatus | null => { + if (typeof value !== "string") return null + if ((REPLAY_STATUSES as readonly string[]).includes(value)) { + return value as ReplayStatus + } + return null +} + +const parseDate = (value: unknown): Date | null => { + if (value === null || value === undefined) return null + if (value instanceof Date) return value + if (typeof value === "string" || typeof value === "number") { + const parsed = new Date(value) + if (!Number.isNaN(parsed.valueOf())) return parsed + } + return null +} + +const resolveStream = async ( + request: Request, + body: Record, +) => { + const database = db() + const headerKey = request.headers.get("x-stream-key")?.trim() + const bodyKey = + typeof body.stream_key === "string" ? body.stream_key.trim() : null + const streamKey = headerKey || bodyKey + + if (streamKey) { + return database.query.streams.findFirst({ + where: eq(streams.stream_key, streamKey), + }) + } + + const auth = getAuth() + const session = await auth.api.getSession({ headers: request.headers }) + if (!session?.user?.id) { + return null + } + + return database.query.streams.findFirst({ + where: eq(streams.user_id, session.user.id), + }) +} + +// GET /api/stream-replays - list current user's replays +const handleGet = async ({ request }: { request: Request }) => { + const auth = getAuth() + const session = await auth.api.getSession({ headers: request.headers }) + + if (!session?.user?.id) { + return json({ error: "Unauthorized" }, 401) + } + + const database = db() + + try { + const replays = await database + .select() + .from(stream_replays) + .where(eq(stream_replays.user_id, session.user.id)) + .orderBy(desc(stream_replays.started_at), desc(stream_replays.created_at)) + + return json({ replays }) + } catch (error) { + console.error("[stream-replays] Error fetching replays:", error) + return json({ error: "Failed to fetch replays" }, 500) + } +} + +// POST /api/stream-replays - create replay (stream-key or owner session) +const handlePost = async ({ request }: { request: Request }) => { + let body: Record + try { + body = (await request.json()) as Record + } catch { + return json({ error: "Invalid JSON body" }, 400) + } + + const stream = await resolveStream(request, body) + if (!stream) { + return json({ error: "Unauthorized" }, 401) + } + + const title = + typeof body.title === "string" && body.title.trim() + ? body.title.trim() + : "Stream Replay" + const description = + typeof body.description === "string" ? body.description.trim() : undefined + const statusValue = + body.status !== undefined ? parseStatus(body.status) : null + if (body.status !== undefined && !statusValue) { + return json({ error: "Invalid status" }, 400) + } + const status = statusValue ?? "processing" + const jazzReplayId = + typeof body.jazz_replay_id === "string" && body.jazz_replay_id.trim() + ? body.jazz_replay_id.trim() + : undefined + const playbackUrl = + typeof body.playback_url === "string" && body.playback_url.trim() + ? body.playback_url.trim() + : undefined + const thumbnailUrl = + typeof body.thumbnail_url === "string" && body.thumbnail_url.trim() + ? body.thumbnail_url.trim() + : undefined + const startedAtRaw = body.started_at + const endedAtRaw = body.ended_at + const startedAt = parseDate(startedAtRaw) + const endedAt = parseDate(endedAtRaw) + if (startedAtRaw !== undefined && !startedAt) { + return json({ error: "Invalid started_at" }, 400) + } + if (endedAtRaw !== undefined && !endedAt) { + return json({ error: "Invalid ended_at" }, 400) + } + const durationSeconds = + typeof body.duration_seconds === "number" + ? Math.max(0, Math.floor(body.duration_seconds)) + : startedAt && endedAt + ? Math.max(0, Math.floor((endedAt.getTime() - startedAt.getTime()) / 1000)) + : undefined + const isPublic = + typeof body.is_public === "boolean" ? body.is_public : undefined + + const database = db() + + try { + const [replay] = await database + .insert(stream_replays) + .values({ + stream_id: stream.id, + user_id: stream.user_id, + title, + description, + status, + jazz_replay_id: jazzReplayId, + playback_url: playbackUrl, + thumbnail_url: thumbnailUrl, + started_at: startedAt ?? undefined, + ended_at: endedAt ?? undefined, + duration_seconds: durationSeconds, + is_public: isPublic ?? false, + }) + .returning() + + return json({ replay }, 201) + } catch (error) { + console.error("[stream-replays] Error creating replay:", error) + return json({ error: "Failed to create replay" }, 500) + } +} + +export const Route = createFileRoute("/api/stream-replays")({ + server: { + handlers: { + GET: handleGet, + POST: handlePost, + }, + }, +}) diff --git a/packages/web/src/routes/api/streams.$username.replays.ts b/packages/web/src/routes/api/streams.$username.replays.ts new file mode 100644 index 00000000..ef199c89 --- /dev/null +++ b/packages/web/src/routes/api/streams.$username.replays.ts @@ -0,0 +1,66 @@ +import { createFileRoute } from "@tanstack/react-router" +import { and, desc, eq } from "drizzle-orm" +import { db } from "@/db/connection" +import { getAuth } from "@/lib/auth" +import { stream_replays, users } from "@/db/schema" + +const json = (data: unknown, status = 200) => + new Response(JSON.stringify(data), { + status, + headers: { "content-type": "application/json" }, + }) + +const handleGet = async ({ + request, + params, +}: { + request: Request + params: { username: string } +}) => { + const { username } = params + + if (!username) { + return json({ error: "Username required" }, 400) + } + + const database = db() + + const user = await database.query.users.findFirst({ + where: eq(users.username, username), + }) + + if (!user) { + return json({ error: "User not found" }, 404) + } + + const auth = getAuth() + const session = await auth.api.getSession({ headers: request.headers }) + const isOwner = session?.user?.id === user.id + + const conditions = [eq(stream_replays.user_id, user.id)] + if (!isOwner) { + conditions.push(eq(stream_replays.is_public, true)) + conditions.push(eq(stream_replays.status, "ready")) + } + + try { + const replays = await database + .select() + .from(stream_replays) + .where(and(...conditions)) + .orderBy(desc(stream_replays.started_at), desc(stream_replays.created_at)) + + return json({ replays }) + } catch (error) { + console.error("[stream-replays] Error fetching replays:", error) + return json({ error: "Failed to fetch replays" }, 500) + } +} + +export const Route = createFileRoute("/api/streams/$username/replays")({ + server: { + handlers: { + GET: handleGet, + }, + }, +})