From 8f050b41a024d95cb3b727a251a0556d9a384ba6 Mon Sep 17 00:00:00 2001 From: Debian Date: Wed, 7 Jan 2026 05:36:53 +0000 Subject: [PATCH] Fix stuck processing jobs and increase timeouts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Background Job Processor: - Add src/services/jobProcessor.ts that polls RunPod every 30s for stuck jobs - Automatically completes or fails jobs that were abandoned (user navigated away) - Times out jobs after 25 minutes Client-Side Resume: - Add GET /api/generate/pending endpoint to fetch user's processing jobs - Add checkPendingJobs() that runs on login/page load - Show notification banner when user has jobs generating in background - Add "View Progress" button to resume polling for a job Timeout Increases (10min → 25min): - src/utils/validators.ts: request validation max/default - src/config.ts: RUNPOD_MAX_TIMEOUT_MS default - public/js/app.js: client-side polling maxTime - src/services/jobProcessor.ts: background processor timeout CI/CD Optimization: - Add paths-ignore to backend build.yaml to skip rebuilds on frontend-only changes 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .gitea/workflows/build.yaml | 3 + frontend/IMPLEMENTATION_PLAN.md | 227 ++++++++++++++++++++++++++ frontend/public/css/style.css | 44 +++++ frontend/public/js/app.js | 58 ++++++- frontend/src/config.ts | 2 +- frontend/src/index.ts | 5 + frontend/src/routes/generate.ts | 18 +- frontend/src/services/jobProcessor.ts | 75 +++++++++ frontend/src/utils/validators.ts | 2 +- 9 files changed, 429 insertions(+), 5 deletions(-) create mode 100644 frontend/IMPLEMENTATION_PLAN.md create mode 100644 frontend/src/services/jobProcessor.ts diff --git a/.gitea/workflows/build.yaml b/.gitea/workflows/build.yaml index f27fe05..a31eaa2 100644 --- a/.gitea/workflows/build.yaml +++ b/.gitea/workflows/build.yaml @@ -4,6 +4,9 @@ on: push: branches: - main + paths-ignore: + - 'frontend/**' + 
- '.gitea/workflows/build-frontend.yaml' jobs: build: diff --git a/frontend/IMPLEMENTATION_PLAN.md b/frontend/IMPLEMENTATION_PLAN.md new file mode 100644 index 0000000..b17cf68 --- /dev/null +++ b/frontend/IMPLEMENTATION_PLAN.md @@ -0,0 +1,227 @@ +# Fix: Handle Navigation Away During Video Generation + +## Problem + +When a user submits a video generation job and navigates away from the page: +1. Client-side polling stops +2. RunPod job continues but results are never fetched +3. Content stays stuck as "processing" forever +4. Video file never gets saved to disk + +## Solution: Two-Part Fix + +### Part 1: Background Job Processor (Server-Side) + +Create a background worker that periodically checks for stuck "processing" jobs and completes them. + +**New file: `src/services/jobProcessor.ts`** + +```typescript +// Runs every 30 seconds +// Queries: SELECT * FROM generated_content WHERE status = 'processing' AND runpod_job_id IS NOT NULL +// For each job: +// 1. Poll RunPod status +// 2. If COMPLETED: download file, update status to 'completed' +// 3. If FAILED: update status to 'failed' with error message +// 4. If still running and created more than 25 minutes ago: mark as 'failed' (timeout) +``` + +**Modify: `src/index.ts`** +- Import and start the job processor on server startup +- Clean shutdown handling + +### Part 2: Resume Polling on Page Load (Client-Side) + +When user returns to the app, check for their in-progress jobs and resume polling. + +**Modify: `public/js/app.js`** + +```javascript +// On login/page load: +// 1. Call GET /api/generate/pending to find pending jobs +// 2. For each processing job with a runpod_job_id: +// - Show notification "You have X jobs in progress" +// - Optionally auto-resume polling for most recent one +// 3. Update gallery to show real-time status +``` + +**New API endpoint: `GET /api/generate/pending`** +- Returns user's jobs that are still processing +- Include runpod_job_id so client can poll + +## Files to Modify + +1. 
**`src/services/jobProcessor.ts`** (NEW) + - `startJobProcessor()` - starts interval + - `stopJobProcessor()` - cleanup + - `processStuckJobs()` - main logic + +2. **`src/index.ts`** + - Import jobProcessor + - Call `startJobProcessor()` after DB init + - Call `stopJobProcessor()` in shutdown handler + +3. **`src/routes/generate.ts`** + - Add `GET /pending` endpoint for user's processing jobs + +4. **`public/js/app.js`** + - Add `checkPendingJobs()` function + - Call it after successful login in `showMainApp()` + - Show UI notification for pending jobs + - Add "Resume" button or auto-resume latest + +## Implementation Details + +### jobProcessor.ts + +```typescript +import { getDb } from '../db/index.js'; +import { getJobStatus } from './runpodService.js'; +import { updateContentStatus, saveContentFile } from './contentService.js'; +import { logger } from '../utils/logger.js'; + +let processorInterval: NodeJS.Timeout | null = null; + +const POLL_INTERVAL = 30000; // 30 seconds +const JOB_TIMEOUT = 25 * 60 * 1000; // 25 minutes + +export function startJobProcessor(): void { + logger.info('Starting background job processor'); + processorInterval = setInterval(processStuckJobs, POLL_INTERVAL); + // Run immediately on startup + processStuckJobs(); +} + +export function stopJobProcessor(): void { + if (processorInterval) { + clearInterval(processorInterval); + processorInterval = null; + logger.info('Stopped background job processor'); + } +} + +async function processStuckJobs(): Promise<void> { + const db = getDb(); + + const pendingJobs = db.prepare(` + SELECT * FROM generated_content + WHERE status = 'processing' AND runpod_job_id IS NOT NULL + `).all(); + + for (const job of pendingJobs) { + try { + const createdAt = new Date(job.created_at).getTime(); + const age = Date.now() - createdAt; + + // Timeout check + if (age > JOB_TIMEOUT) { + updateContentStatus(job.id, 'failed', { + errorMessage: 'Job timed out' + }); + continue; + } + + // Poll RunPod + const status = await 
getJobStatus(job.runpod_job_id); + + if (status.status === 'COMPLETED' && status.output?.outputs?.[0]) { + const output = status.output.outputs[0]; + if (output.data) { + saveContentFile(job.id, output.data); + } else { + updateContentStatus(job.id, 'completed', { fileSize: output.size }); + } + logger.info({ contentId: job.id }, 'Background processor completed job'); + } else if (status.status === 'FAILED') { + updateContentStatus(job.id, 'failed', { + errorMessage: status.error || 'Job failed' + }); + } + } catch (error) { + logger.error({ error, contentId: job.id }, 'Error processing stuck job'); + } + } +} +``` + +### Frontend changes (app.js) + +Add after `showMainApp()` is called: + +```javascript +async function checkPendingJobs() { + try { + const data = await api('/generate/pending'); + if (data.jobs && data.jobs.length > 0) { + showPendingJobsNotification(data.jobs); + } + } catch (error) { + console.error('Failed to check pending jobs:', error); + } +} + +function showPendingJobsNotification(jobs) { + // Create a notification banner + const banner = document.createElement('div'); + banner.className = 'pending-jobs-banner'; + banner.innerHTML = ` + You have ${jobs.length} video(s) generating + + + `; + document.querySelector('.main-content').prepend(banner); +} +``` + +### New endpoint in generate.ts + +```typescript +// Get user's pending jobs +router.get('/pending', (req, res) => { + const authReq = req as AuthenticatedRequest; + const db = getDb(); + + const jobs = db.prepare(` + SELECT id, runpod_job_id, prompt, created_at + FROM generated_content + WHERE user_id = ? 
AND status = 'processing' AND runpod_job_id IS NOT NULL + ORDER BY created_at DESC + `).all(authReq.user!.id); + + res.json({ jobs }); +}); +``` + +## CSS Addition (style.css) + +```css +.pending-jobs-banner { + background: linear-gradient(135deg, var(--primary), var(--secondary)); + color: white; + padding: 12px 20px; + border-radius: var(--radius); + margin-bottom: 20px; + display: flex; + align-items: center; + justify-content: space-between; + gap: 15px; +} + +.pending-jobs-banner button { + background: rgba(255,255,255,0.2); + border: 1px solid rgba(255,255,255,0.3); + color: white; + padding: 6px 12px; + border-radius: 4px; + cursor: pointer; +} +``` + +## Testing + +1. Start a generation job +2. Navigate to Gallery while processing +3. Verify background processor picks it up within 30 seconds +4. Verify job completes and file is saved +5. Test timeout scenario (mock a stuck job) +6. Test page reload shows pending jobs notification diff --git a/frontend/public/css/style.css b/frontend/public/css/style.css index 00a576e..bee2cc2 100644 --- a/frontend/public/css/style.css +++ b/frontend/public/css/style.css @@ -602,6 +602,45 @@ body { to { transform: rotate(360deg); } } +/* Pending Jobs Banner */ +.pending-jobs-banner { + background: linear-gradient(135deg, var(--primary), var(--secondary)); + color: white; + padding: 12px 20px; + border-radius: var(--radius); + margin-bottom: 20px; + display: flex; + align-items: center; + justify-content: space-between; + gap: 15px; + animation: fadeIn 0.3s ease; +} + +.pending-jobs-banner span { + font-weight: 500; +} + +.pending-jobs-actions { + display: flex; + gap: 10px; +} + +.pending-jobs-banner .btn { + background: rgba(255, 255, 255, 0.2); + border: 1px solid rgba(255, 255, 255, 0.3); + color: white; + padding: 6px 12px; + border-radius: 4px; + cursor: pointer; + font-size: 12px; + font-weight: 500; + transition: background 0.2s; +} + +.pending-jobs-banner .btn:hover { + background: rgba(255, 255, 255, 0.3); +} + /* 
Responsive */ @media (max-width: 768px) { .section-grid { @@ -621,4 +660,9 @@ body { .form-row { grid-template-columns: 1fr; } + + .pending-jobs-banner { + flex-direction: column; + text-align: center; + } } diff --git a/frontend/public/js/app.js b/frontend/public/js/app.js index 9ce096c..84370bd 100644 --- a/frontend/public/js/app.js +++ b/frontend/public/js/app.js @@ -75,6 +75,62 @@ function showMainApp() { } showSection('generate'); + checkPendingJobs(); +} + +// Pending Jobs +async function checkPendingJobs() { + try { + const data = await api('/generate/pending'); + if (data.jobs && data.jobs.length > 0) { + showPendingJobsNotification(data.jobs); + } + } catch (error) { + console.error('Failed to check pending jobs:', error); + } +} + +function showPendingJobsNotification(jobs) { + // Remove existing banner if any + const existingBanner = document.querySelector('.pending-jobs-banner'); + if (existingBanner) existingBanner.remove(); + + const banner = document.createElement('div'); + banner.className = 'pending-jobs-banner'; + banner.innerHTML = ` + You have ${jobs.length} video${jobs.length > 1 ? 's' : ''} generating in the background +
+ + +
+ `; + document.querySelector('.main-content').prepend(banner); +} + +async function resumeLatestJob(contentId, jobId) { + // Switch to generate tab and show progress + showSection('generate'); + + const statusEl = document.getElementById('generation-status'); + const videoEl = document.getElementById('output-video'); + const btn = document.getElementById('generate-btn'); + + btn.disabled = true; + btn.textContent = 'Generating...'; + statusEl.className = 'status-message info'; + statusEl.textContent = 'Resuming job...'; + statusEl.classList.remove('hidden'); + videoEl.classList.add('hidden'); + + // Remove the banner + const banner = document.querySelector('.pending-jobs-banner'); + if (banner) banner.remove(); + + // Poll for completion + await pollJob(jobId, contentId, statusEl, videoEl); + + btn.disabled = false; + btn.textContent = 'Generate Video'; } // Login @@ -268,7 +324,7 @@ document.getElementById('generate-form').addEventListener('submit', async (e) => async function pollJob(jobId, contentId, statusEl, videoEl) { const startTime = Date.now(); - const maxTime = 10 * 60 * 1000; // 10 minutes + const maxTime = 25 * 60 * 1000; // 25 minutes while (Date.now() - startTime < maxTime) { const elapsed = Math.floor((Date.now() - startTime) / 1000); diff --git a/frontend/src/config.ts b/frontend/src/config.ts index db0b686..626db06 100644 --- a/frontend/src/config.ts +++ b/frontend/src/config.ts @@ -64,7 +64,7 @@ export const config = { endpointId: requireEnv('RUNPOD_ENDPOINT_ID'), baseUrl: 'https://api.runpod.ai/v2', pollIntervalMs: optionalEnvInt('RUNPOD_POLL_INTERVAL_MS', 5000), - maxTimeoutMs: optionalEnvInt('RUNPOD_MAX_TIMEOUT_MS', 600000), + maxTimeoutMs: optionalEnvInt('RUNPOD_MAX_TIMEOUT_MS', 1500000), }, // WebAuthn diff --git a/frontend/src/index.ts b/frontend/src/index.ts index 7cd3175..363f925 100644 --- a/frontend/src/index.ts +++ b/frontend/src/index.ts @@ -8,6 +8,7 @@ import { config } from './config.js'; import { initDatabase, closeDatabase } from 
'./db/index.js'; import { createInitialAdmin } from './services/initService.js'; import { SQLiteSessionStore } from './services/sessionService.js'; +import { startJobProcessor, stopJobProcessor } from './services/jobProcessor.js'; import { apiRateLimiter } from './middleware/rateLimit.js'; import { errorHandler } from './middleware/errorHandler.js'; import { logger } from './utils/logger.js'; @@ -101,6 +102,9 @@ async function start() { // Create initial admin user if needed await createInitialAdmin(); + // Start background job processor + startJobProcessor(); + // Start server const server = app.listen(config.port, () => { logger.info({ port: config.port, env: config.nodeEnv }, 'Server started'); @@ -112,6 +116,7 @@ async function start() { server.close(() => { logger.info('HTTP server closed'); + stopJobProcessor(); sessionStore.close(); closeDatabase(); logger.info('Database closed'); diff --git a/frontend/src/routes/generate.ts b/frontend/src/routes/generate.ts index c7bf158..fdb5350 100644 --- a/frontend/src/routes/generate.ts +++ b/frontend/src/routes/generate.ts @@ -12,12 +12,28 @@ import { } from '../services/contentService.js'; import { logger } from '../utils/logger.js'; import type { AuthenticatedRequest } from '../types/index.js'; +import { getDb } from '../db/index.js'; const router = Router(); // All routes require auth router.use(requireAuth); +// Get user's pending jobs +router.get('/pending', (req, res) => { + const authReq = req as AuthenticatedRequest; + const db = getDb(); + + const jobs = db.prepare(` + SELECT id, runpod_job_id, prompt, created_at + FROM generated_content + WHERE user_id = ? 
AND status = 'processing' AND runpod_job_id IS NOT NULL + ORDER BY created_at DESC + `).all(authReq.user!.id) as { id: number; runpod_job_id: string; prompt: string; created_at: string }[]; + + res.json({ jobs }); +}); + // Submit generation job router.post('/', generationRateLimiter, asyncHandler(async (req, res) => { const authReq = req as AuthenticatedRequest; @@ -96,7 +112,6 @@ router.get('/:jobId/status', asyncHandler(async (req, res) => { // If completed, process the output if (status.status === 'COMPLETED' && status.output) { // Find the content record for this job - const { getDb } = await import('../db/index.js'); const db = getDb(); const row = db.prepare( 'SELECT id FROM generated_content WHERE runpod_job_id = ? AND user_id = ?' @@ -115,7 +130,6 @@ router.get('/:jobId/status', asyncHandler(async (req, res) => { } } else if (status.status === 'FAILED') { // Update content status to failed - const { getDb } = await import('../db/index.js'); const db = getDb(); const row = db.prepare( 'SELECT id FROM generated_content WHERE runpod_job_id = ? AND user_id = ?' 
diff --git a/frontend/src/services/jobProcessor.ts b/frontend/src/services/jobProcessor.ts new file mode 100644 index 0000000..a321841 --- /dev/null +++ b/frontend/src/services/jobProcessor.ts @@ -0,0 +1,75 @@ +import { getDb, type GeneratedContentRow } from '../db/index.js'; +import { getJobStatus } from './runpodService.js'; +import { updateContentStatus, saveContentFile } from './contentService.js'; +import { logger } from '../utils/logger.js'; + +let processorInterval: NodeJS.Timeout | null = null; + +const POLL_INTERVAL = 30000; // 30 seconds +const JOB_TIMEOUT = 25 * 60 * 1000; // 25 minutes + +export function startJobProcessor(): void { + logger.info('Starting background job processor'); + processorInterval = setInterval(processStuckJobs, POLL_INTERVAL); + // Run immediately on startup + processStuckJobs(); +} + +export function stopJobProcessor(): void { + if (processorInterval) { + clearInterval(processorInterval); + processorInterval = null; + logger.info('Stopped background job processor'); + } +} + +async function processStuckJobs(): Promise<void> { + const db = getDb(); + + const pendingJobs = db.prepare(` + SELECT * FROM generated_content + WHERE status = 'processing' AND runpod_job_id IS NOT NULL + `).all() as GeneratedContentRow[]; + + if (pendingJobs.length === 0) { + return; + } + + logger.info({ count: pendingJobs.length }, 'Processing stuck jobs'); + + for (const job of pendingJobs) { + try { + const createdAt = new Date(job.created_at).getTime(); + const age = Date.now() - createdAt; + + // Timeout check + if (age > JOB_TIMEOUT) { + logger.warn({ contentId: job.id, age }, 'Job timed out'); + updateContentStatus(job.id, 'failed', { + errorMessage: 'Job timed out after 25 minutes', + }); + continue; + } + + // Poll RunPod + const status = await getJobStatus(job.runpod_job_id!); + + if (status.status === 'COMPLETED' && status.output?.outputs?.[0]) { + const output = status.output.outputs[0]; + if (output.data) { + saveContentFile(job.id, output.data); + 
} else { + updateContentStatus(job.id, 'completed', { fileSize: output.size }); + } + logger.info({ contentId: job.id }, 'Background processor completed job'); + } else if (status.status === 'FAILED') { + updateContentStatus(job.id, 'failed', { + errorMessage: status.error || status.output?.error || 'Job failed', + }); + logger.info({ contentId: job.id }, 'Background processor marked job as failed'); + } + } catch (error) { + logger.error({ error, contentId: job.id }, 'Error processing stuck job'); + } + } +} diff --git a/frontend/src/utils/validators.ts b/frontend/src/utils/validators.ts index 01b0cbc..9c39222 100644 --- a/frontend/src/utils/validators.ts +++ b/frontend/src/utils/validators.ts @@ -64,7 +64,7 @@ export const generationRequestSchema = z.object({ resolution: z.number().int().min(480).max(1080).optional().default(720), steps: z.number().int().min(1).max(50).optional().default(8), splitStep: z.number().int().min(1).max(20).optional().default(4), - timeout: z.number().int().min(60).max(600).optional().default(600), + timeout: z.number().int().min(60).max(1500).optional().default(1500), }); // MFA schemas