Add stream_replays table, related schema, and API endpoints for managing stream replays

This commit is contained in:
Nikita
2025-12-23 23:20:22 -08:00
parent 1bb0450d02
commit 7f6f7d2f37
9 changed files with 643 additions and 2 deletions

View File

@@ -0,0 +1,17 @@
CREATE TABLE "stream_replays" (
"id" uuid PRIMARY KEY DEFAULT gen_random_uuid(),
"stream_id" uuid NOT NULL REFERENCES "streams"(id) ON DELETE CASCADE,
"user_id" text NOT NULL REFERENCES "users"(id) ON DELETE CASCADE,
"title" text NOT NULL DEFAULT 'Stream Replay',
"description" text,
"status" varchar(32) NOT NULL DEFAULT 'processing',
"jazz_replay_id" text,
"playback_url" text,
"thumbnail_url" text,
"duration_seconds" integer,
"started_at" timestamp with time zone,
"ended_at" timestamp with time zone,
"is_public" boolean NOT NULL DEFAULT false,
"created_at" timestamp with time zone DEFAULT now() NOT NULL,
"updated_at" timestamp with time zone DEFAULT now() NOT NULL
);

View File

@@ -29,6 +29,13 @@
"when": 1766456542436,
"tag": "0003_add_webrtc_url_to_streams",
"breakpoints": true
},
{
"idx": 4,
"version": "7",
"when": 1769000000000,
"tag": "0004_add_stream_replays",
"breakpoints": true
}
]
}

View File

@@ -264,6 +264,39 @@ export const streams = pgTable("streams", {
export const selectStreamsSchema = createSelectSchema(streams)
export type Stream = z.infer<typeof selectStreamsSchema>
// =============================================================================
// Stream Replays (saved live streams, stored in Jazz)
// =============================================================================
export const stream_replays = pgTable("stream_replays", {
id: uuid("id").primaryKey().defaultRandom(),
stream_id: uuid("stream_id")
.notNull()
.references(() => streams.id, { onDelete: "cascade" }),
user_id: text("user_id")
.notNull()
.references(() => users.id, { onDelete: "cascade" }),
title: text("title").notNull().default("Stream Replay"),
description: text("description"),
status: varchar("status", { length: 32 }).notNull().default("processing"),
jazz_replay_id: text("jazz_replay_id"),
playback_url: text("playback_url"),
thumbnail_url: text("thumbnail_url"),
duration_seconds: integer("duration_seconds"),
started_at: timestamp("started_at", { withTimezone: true }),
ended_at: timestamp("ended_at", { withTimezone: true }),
is_public: boolean("is_public").notNull().default(false),
created_at: timestamp("created_at", { withTimezone: true })
.defaultNow()
.notNull(),
updated_at: timestamp("updated_at", { withTimezone: true })
.defaultNow()
.notNull(),
})
export const selectStreamReplaySchema = createSelectSchema(stream_replays)
export type StreamReplay = z.infer<typeof selectStreamReplaySchema>
// =============================================================================
// Stripe Billing
// =============================================================================

View File

@@ -0,0 +1,51 @@
export type StreamReplayRecord = {
id: string
stream_id: string
user_id: string
title: string
description: string | null
status: string
jazz_replay_id: string | null
playback_url: string | null
thumbnail_url: string | null
duration_seconds: number | null
started_at: string | null
ended_at: string | null
is_public: boolean
created_at: string
updated_at: string
}
export async function getStreamReplaysByUsername(
username: string,
): Promise<StreamReplayRecord[]> {
const response = await fetch(`/api/streams/${username}/replays`, {
credentials: "include",
})
if (!response.ok) {
throw new Error("Failed to load stream replays")
}
const data = (await response.json()) as { replays?: StreamReplayRecord[] }
return data.replays ?? []
}
export async function getStreamReplay(
replayId: string,
): Promise<StreamReplayRecord> {
const response = await fetch(`/api/stream-replays/${replayId}`, {
credentials: "include",
})
if (!response.ok) {
throw new Error("Failed to load stream replay")
}
const data = (await response.json()) as { replay?: StreamReplayRecord }
if (!data.replay) {
throw new Error("Replay not found")
}
return data.replay
}

View File

@@ -0,0 +1,27 @@
export interface StreamStatus {
isLive: boolean
updatedAt?: string
}
/**
* Fetches stream status from nikiv.dev/api/stream-status
* This is set by Lin when streaming starts/stops
*/
export async function getStreamStatus(): Promise<StreamStatus> {
try {
const response = await fetch("https://nikiv.dev/api/stream-status", {
cache: "no-store",
})
if (!response.ok) {
return { isLive: false }
}
const data = await response.json()
return {
isLive: Boolean(data.isLive),
updatedAt: data.updatedAt,
}
} catch (error) {
console.error("Failed to fetch stream status:", error)
return { isLive: false }
}
}

View File

@@ -11,6 +11,7 @@ import {
getSpotifyNowPlaying,
type SpotifyNowPlayingResponse,
} from "@/lib/spotify/now-playing"
import { getStreamStatus } from "@/lib/stream/status"
export const Route = createFileRoute("/$username")({
ssr: false,
@@ -56,6 +57,7 @@ function StreamPage() {
)
const [nowPlayingLoading, setNowPlayingLoading] = useState(false)
const [nowPlayingError, setNowPlayingError] = useState(false)
const [streamLive, setStreamLive] = useState(false)
useEffect(() => {
let isActive = true
@@ -118,6 +120,33 @@ function StreamPage() {
}
}, [username])
// Poll stream status for nikiv from nikiv.dev/api/stream-status
useEffect(() => {
if (username !== "nikiv") {
return
}
let isActive = true
const fetchStatus = async () => {
const status = await getStreamStatus()
if (isActive) {
setStreamLive(status.isLive)
}
}
// Fetch immediately
fetchStatus()
// Poll every 10 seconds
const interval = setInterval(fetchStatus, 10000)
return () => {
isActive = false
clearInterval(interval)
}
}, [username])
const stream = data?.stream ?? null
const playback = stream?.playback ?? null
const fallbackPlayback = stream?.hls_url
@@ -208,9 +237,11 @@ function StreamPage() {
}
}, [activePlayback?.type, stream?.hls_url])
// For nikiv, use streamLive from the polled API status
// For other users, use stream?.is_live from the database
const isLiveStatus = username === "nikiv" ? streamLive : Boolean(stream?.is_live)
const isActuallyLive =
Boolean(stream?.is_live) &&
(activePlayback?.type !== "hls" || hlsLive !== false)
isLiveStatus && (activePlayback?.type !== "hls" || hlsLive !== false)
const shouldFetchSpotify = username === "nikiv" && !isActuallyLive
useEffect(() => {

View File

@@ -0,0 +1,231 @@
import { createFileRoute } from "@tanstack/react-router"
import { and, eq } from "drizzle-orm"
import { db } from "@/db/connection"
import { getAuth } from "@/lib/auth"
import { stream_replays, streams } from "@/db/schema"
const json = (data: unknown, status = 200) =>
new Response(JSON.stringify(data), {
status,
headers: { "content-type": "application/json" },
})
const REPLAY_STATUSES = ["recording", "processing", "ready", "failed"] as const
type ReplayStatus = (typeof REPLAY_STATUSES)[number]
const parseStatus = (value: unknown): ReplayStatus | null => {
if (typeof value !== "string") return null
if ((REPLAY_STATUSES as readonly string[]).includes(value)) {
return value as ReplayStatus
}
return null
}
const parseDate = (value: unknown): Date | null => {
if (value === null || value === undefined) return null
if (value instanceof Date) return value
if (typeof value === "string" || typeof value === "number") {
const parsed = new Date(value)
if (!Number.isNaN(parsed.valueOf())) return parsed
}
return null
}
const resolveStreamKey = (request: Request, body?: Record<string, unknown>) => {
const headerKey = request.headers.get("x-stream-key")?.trim()
if (headerKey) return headerKey
if (body && typeof body.stream_key === "string") {
const key = body.stream_key.trim()
if (key) return key
}
return null
}
const canAccessReplay = async (request: Request, replayUserId: string) => {
const auth = getAuth()
const session = await auth.api.getSession({ headers: request.headers })
if (session?.user?.id && session.user.id === replayUserId) {
return true
}
return false
}
const canEditReplay = async (
request: Request,
replayStreamId: string,
replayUserId: string,
body?: Record<string, unknown>,
) => {
if (await canAccessReplay(request, replayUserId)) {
return true
}
const streamKey = resolveStreamKey(request, body)
if (!streamKey) return false
const database = db()
const stream = await database.query.streams.findFirst({
where: and(eq(streams.id, replayStreamId), eq(streams.stream_key, streamKey)),
})
return Boolean(stream)
}
// GET /api/stream-replays/:replayId
const handleGet = async ({
request,
params,
}: {
request: Request
params: { replayId: string }
}) => {
const database = db()
const replay = await database.query.stream_replays.findFirst({
where: eq(stream_replays.id, params.replayId),
})
if (!replay) {
return json({ error: "Replay not found" }, 404)
}
const isOwner = await canAccessReplay(request, replay.user_id)
if (!isOwner && (!replay.is_public || replay.status !== "ready")) {
return json({ error: "Forbidden" }, 403)
}
return json({ replay })
}
// PATCH /api/stream-replays/:replayId
const handlePatch = async ({
request,
params,
}: {
request: Request
params: { replayId: string }
}) => {
let body: Record<string, unknown>
try {
body = (await request.json()) as Record<string, unknown>
} catch {
return json({ error: "Invalid JSON body" }, 400)
}
const database = db()
const replay = await database.query.stream_replays.findFirst({
where: eq(stream_replays.id, params.replayId),
})
if (!replay) {
return json({ error: "Replay not found" }, 404)
}
const canEdit = await canEditReplay(
request,
replay.stream_id,
replay.user_id,
body,
)
if (!canEdit) {
return json({ error: "Unauthorized" }, 401)
}
const updateData: Partial<typeof stream_replays.$inferInsert> = {
updated_at: new Date(),
}
if (typeof body.title === "string") {
const title = body.title.trim()
if (!title) {
return json({ error: "Title cannot be empty" }, 400)
}
updateData.title = title
}
if (typeof body.description === "string") {
updateData.description = body.description.trim()
}
if (body.status !== undefined) {
const status = parseStatus(body.status)
if (!status) {
return json({ error: "Invalid status" }, 400)
}
updateData.status = status
}
if (typeof body.jazz_replay_id === "string") {
updateData.jazz_replay_id = body.jazz_replay_id.trim()
}
if (typeof body.playback_url === "string") {
updateData.playback_url = body.playback_url.trim()
}
if (typeof body.thumbnail_url === "string") {
updateData.thumbnail_url = body.thumbnail_url.trim()
}
if (body.started_at !== undefined) {
const startedAt = parseDate(body.started_at)
if (!startedAt) {
return json({ error: "Invalid started_at" }, 400)
}
updateData.started_at = startedAt
}
if (body.ended_at !== undefined) {
const endedAt = parseDate(body.ended_at)
if (!endedAt) {
return json({ error: "Invalid ended_at" }, 400)
}
updateData.ended_at = endedAt
}
if (typeof body.duration_seconds === "number") {
updateData.duration_seconds = Math.max(0, Math.floor(body.duration_seconds))
} else if (
updateData.duration_seconds === undefined &&
updateData.started_at &&
updateData.ended_at
) {
updateData.duration_seconds = Math.max(
0,
Math.floor(
(updateData.ended_at.getTime() - updateData.started_at.getTime()) / 1000,
),
)
}
if (typeof body.is_public === "boolean") {
updateData.is_public = body.is_public
}
if (Object.keys(updateData).length === 1) {
return json({ error: "No fields to update" }, 400)
}
try {
const [updated] = await database
.update(stream_replays)
.set(updateData)
.where(eq(stream_replays.id, params.replayId))
.returning()
return json({ replay: updated })
} catch (error) {
console.error("[stream-replays] Error updating replay:", error)
return json({ error: "Failed to update replay" }, 500)
}
}
export const Route = createFileRoute("/api/stream-replays/$replayId")({
server: {
handlers: {
GET: handleGet,
PATCH: handlePatch,
},
},
})

View File

@@ -0,0 +1,178 @@
import { createFileRoute } from "@tanstack/react-router"
import { desc, eq } from "drizzle-orm"
import { db } from "@/db/connection"
import { getAuth } from "@/lib/auth"
import { stream_replays, streams } from "@/db/schema"
const json = (data: unknown, status = 200) =>
new Response(JSON.stringify(data), {
status,
headers: { "content-type": "application/json" },
})
const REPLAY_STATUSES = ["recording", "processing", "ready", "failed"] as const
type ReplayStatus = (typeof REPLAY_STATUSES)[number]
const parseStatus = (value: unknown): ReplayStatus | null => {
if (typeof value !== "string") return null
if ((REPLAY_STATUSES as readonly string[]).includes(value)) {
return value as ReplayStatus
}
return null
}
const parseDate = (value: unknown): Date | null => {
if (value === null || value === undefined) return null
if (value instanceof Date) return value
if (typeof value === "string" || typeof value === "number") {
const parsed = new Date(value)
if (!Number.isNaN(parsed.valueOf())) return parsed
}
return null
}
const resolveStream = async (
request: Request,
body: Record<string, unknown>,
) => {
const database = db()
const headerKey = request.headers.get("x-stream-key")?.trim()
const bodyKey =
typeof body.stream_key === "string" ? body.stream_key.trim() : null
const streamKey = headerKey || bodyKey
if (streamKey) {
return database.query.streams.findFirst({
where: eq(streams.stream_key, streamKey),
})
}
const auth = getAuth()
const session = await auth.api.getSession({ headers: request.headers })
if (!session?.user?.id) {
return null
}
return database.query.streams.findFirst({
where: eq(streams.user_id, session.user.id),
})
}
// GET /api/stream-replays - list current user's replays
const handleGet = async ({ request }: { request: Request }) => {
const auth = getAuth()
const session = await auth.api.getSession({ headers: request.headers })
if (!session?.user?.id) {
return json({ error: "Unauthorized" }, 401)
}
const database = db()
try {
const replays = await database
.select()
.from(stream_replays)
.where(eq(stream_replays.user_id, session.user.id))
.orderBy(desc(stream_replays.started_at), desc(stream_replays.created_at))
return json({ replays })
} catch (error) {
console.error("[stream-replays] Error fetching replays:", error)
return json({ error: "Failed to fetch replays" }, 500)
}
}
// POST /api/stream-replays - create replay (stream-key or owner session)
const handlePost = async ({ request }: { request: Request }) => {
let body: Record<string, unknown>
try {
body = (await request.json()) as Record<string, unknown>
} catch {
return json({ error: "Invalid JSON body" }, 400)
}
const stream = await resolveStream(request, body)
if (!stream) {
return json({ error: "Unauthorized" }, 401)
}
const title =
typeof body.title === "string" && body.title.trim()
? body.title.trim()
: "Stream Replay"
const description =
typeof body.description === "string" ? body.description.trim() : undefined
const statusValue =
body.status !== undefined ? parseStatus(body.status) : null
if (body.status !== undefined && !statusValue) {
return json({ error: "Invalid status" }, 400)
}
const status = statusValue ?? "processing"
const jazzReplayId =
typeof body.jazz_replay_id === "string" && body.jazz_replay_id.trim()
? body.jazz_replay_id.trim()
: undefined
const playbackUrl =
typeof body.playback_url === "string" && body.playback_url.trim()
? body.playback_url.trim()
: undefined
const thumbnailUrl =
typeof body.thumbnail_url === "string" && body.thumbnail_url.trim()
? body.thumbnail_url.trim()
: undefined
const startedAtRaw = body.started_at
const endedAtRaw = body.ended_at
const startedAt = parseDate(startedAtRaw)
const endedAt = parseDate(endedAtRaw)
if (startedAtRaw !== undefined && !startedAt) {
return json({ error: "Invalid started_at" }, 400)
}
if (endedAtRaw !== undefined && !endedAt) {
return json({ error: "Invalid ended_at" }, 400)
}
const durationSeconds =
typeof body.duration_seconds === "number"
? Math.max(0, Math.floor(body.duration_seconds))
: startedAt && endedAt
? Math.max(0, Math.floor((endedAt.getTime() - startedAt.getTime()) / 1000))
: undefined
const isPublic =
typeof body.is_public === "boolean" ? body.is_public : undefined
const database = db()
try {
const [replay] = await database
.insert(stream_replays)
.values({
stream_id: stream.id,
user_id: stream.user_id,
title,
description,
status,
jazz_replay_id: jazzReplayId,
playback_url: playbackUrl,
thumbnail_url: thumbnailUrl,
started_at: startedAt ?? undefined,
ended_at: endedAt ?? undefined,
duration_seconds: durationSeconds,
is_public: isPublic ?? false,
})
.returning()
return json({ replay }, 201)
} catch (error) {
console.error("[stream-replays] Error creating replay:", error)
return json({ error: "Failed to create replay" }, 500)
}
}
export const Route = createFileRoute("/api/stream-replays")({
server: {
handlers: {
GET: handleGet,
POST: handlePost,
},
},
})

View File

@@ -0,0 +1,66 @@
import { createFileRoute } from "@tanstack/react-router"
import { and, desc, eq } from "drizzle-orm"
import { db } from "@/db/connection"
import { getAuth } from "@/lib/auth"
import { stream_replays, users } from "@/db/schema"
const json = (data: unknown, status = 200) =>
new Response(JSON.stringify(data), {
status,
headers: { "content-type": "application/json" },
})
const handleGet = async ({
request,
params,
}: {
request: Request
params: { username: string }
}) => {
const { username } = params
if (!username) {
return json({ error: "Username required" }, 400)
}
const database = db()
const user = await database.query.users.findFirst({
where: eq(users.username, username),
})
if (!user) {
return json({ error: "User not found" }, 404)
}
const auth = getAuth()
const session = await auth.api.getSession({ headers: request.headers })
const isOwner = session?.user?.id === user.id
const conditions = [eq(stream_replays.user_id, user.id)]
if (!isOwner) {
conditions.push(eq(stream_replays.is_public, true))
conditions.push(eq(stream_replays.status, "ready"))
}
try {
const replays = await database
.select()
.from(stream_replays)
.where(and(...conditions))
.orderBy(desc(stream_replays.started_at), desc(stream_replays.created_at))
return json({ replays })
} catch (error) {
console.error("[stream-replays] Error fetching replays:", error)
return json({ error: "Failed to fetch replays" }, 500)
}
}
export const Route = createFileRoute("/api/streams/$username/replays")({
server: {
handlers: {
GET: handleGet,
},
},
})