Fix stuck processing jobs and increase timeouts
Background Job Processor:
- Add src/services/jobProcessor.ts that polls RunPod every 30s for stuck jobs
- Automatically completes or fails jobs that were abandoned (user navigated away)
- Times out jobs after 25 minutes

Client-Side Resume:
- Add GET /api/generate/pending endpoint to fetch user's processing jobs
- Add checkPendingJobs() that runs on login/page load
- Show notification banner when user has jobs generating in background
- Add "View Progress" button to resume polling for a job

Timeout Increases (10min → 25min):
- src/utils/validators.ts: request validation max/default
- src/config.ts: RUNPOD_MAX_TIMEOUT_MS default
- public/js/app.js: client-side polling maxTime
- src/services/jobProcessor.ts: background processor timeout

CI/CD Optimization:
- Add paths-ignore to backend build.yaml to skip rebuilds on frontend-only changes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -64,7 +64,7 @@ export const config = {
|
||||
endpointId: requireEnv('RUNPOD_ENDPOINT_ID'),
|
||||
baseUrl: 'https://api.runpod.ai/v2',
|
||||
pollIntervalMs: optionalEnvInt('RUNPOD_POLL_INTERVAL_MS', 5000),
|
||||
maxTimeoutMs: optionalEnvInt('RUNPOD_MAX_TIMEOUT_MS', 600000),
|
||||
maxTimeoutMs: optionalEnvInt('RUNPOD_MAX_TIMEOUT_MS', 1500000),
|
||||
},
|
||||
|
||||
// WebAuthn
|
||||
|
||||
@@ -8,6 +8,7 @@ import { config } from './config.js';
|
||||
import { initDatabase, closeDatabase } from './db/index.js';
|
||||
import { createInitialAdmin } from './services/initService.js';
|
||||
import { SQLiteSessionStore } from './services/sessionService.js';
|
||||
import { startJobProcessor, stopJobProcessor } from './services/jobProcessor.js';
|
||||
import { apiRateLimiter } from './middleware/rateLimit.js';
|
||||
import { errorHandler } from './middleware/errorHandler.js';
|
||||
import { logger } from './utils/logger.js';
|
||||
@@ -101,6 +102,9 @@ async function start() {
|
||||
// Create initial admin user if needed
|
||||
await createInitialAdmin();
|
||||
|
||||
// Start background job processor
|
||||
startJobProcessor();
|
||||
|
||||
// Start server
|
||||
const server = app.listen(config.port, () => {
|
||||
logger.info({ port: config.port, env: config.nodeEnv }, 'Server started');
|
||||
@@ -112,6 +116,7 @@ async function start() {
|
||||
|
||||
server.close(() => {
|
||||
logger.info('HTTP server closed');
|
||||
stopJobProcessor();
|
||||
sessionStore.close();
|
||||
closeDatabase();
|
||||
logger.info('Database closed');
|
||||
|
||||
@@ -12,12 +12,28 @@ import {
|
||||
} from '../services/contentService.js';
|
||||
import { logger } from '../utils/logger.js';
|
||||
import type { AuthenticatedRequest } from '../types/index.js';
|
||||
import { getDb } from '../db/index.js';
|
||||
|
||||
const router = Router();
|
||||
|
||||
// All routes require auth
|
||||
router.use(requireAuth);
|
||||
|
||||
/** Columns selected for each of the user's in-flight generation jobs. */
type PendingJobRow = {
  id: number;
  runpod_job_id: string;
  prompt: string;
  created_at: string;
};

// GET /pending (relative to this router's mount point).
// Responds with `{ jobs }`: the authenticated user's rows that are still in the
// 'processing' state and already have a RunPod job id, newest first.
router.get('/pending', (req, res) => {
  const { user } = req as AuthenticatedRequest;

  const pendingQuery = getDb().prepare(`
    SELECT id, runpod_job_id, prompt, created_at
    FROM generated_content
    WHERE user_id = ? AND status = 'processing' AND runpod_job_id IS NOT NULL
    ORDER BY created_at DESC
  `);

  // `requireAuth` is installed on the router above, so `user` is expected to be set here.
  const jobs = pendingQuery.all(user!.id) as PendingJobRow[];

  res.json({ jobs });
});
|
||||
|
||||
// Submit generation job
|
||||
router.post('/', generationRateLimiter, asyncHandler(async (req, res) => {
|
||||
const authReq = req as AuthenticatedRequest;
|
||||
@@ -96,7 +112,6 @@ router.get('/:jobId/status', asyncHandler(async (req, res) => {
|
||||
// If completed, process the output
|
||||
if (status.status === 'COMPLETED' && status.output) {
|
||||
// Find the content record for this job
|
||||
const { getDb } = await import('../db/index.js');
|
||||
const db = getDb();
|
||||
const row = db.prepare(
|
||||
'SELECT id FROM generated_content WHERE runpod_job_id = ? AND user_id = ?'
|
||||
@@ -115,7 +130,6 @@ router.get('/:jobId/status', asyncHandler(async (req, res) => {
|
||||
}
|
||||
} else if (status.status === 'FAILED') {
|
||||
// Update content status to failed
|
||||
const { getDb } = await import('../db/index.js');
|
||||
const db = getDb();
|
||||
const row = db.prepare(
|
||||
'SELECT id FROM generated_content WHERE runpod_job_id = ? AND user_id = ?'
|
||||
|
||||
75
frontend/src/services/jobProcessor.ts
Normal file
75
frontend/src/services/jobProcessor.ts
Normal file
@@ -0,0 +1,75 @@
|
||||
import { getDb, type GeneratedContentRow } from '../db/index.js';
|
||||
import { getJobStatus } from './runpodService.js';
|
||||
import { updateContentStatus, saveContentFile } from './contentService.js';
|
||||
import { logger } from '../utils/logger.js';
|
||||
|
||||
/** Handle of the active polling timer; `null` while the processor is not running. */
let processorInterval: NodeJS.Timeout | null = null;

/** Delay between background sweeps, in milliseconds (30 seconds). */
const POLL_INTERVAL = 30 * 1000;

/** Age, in milliseconds, beyond which a still-processing job is failed as timed out (25 minutes). */
const JOB_TIMEOUT = 25 * 60 * 1000;
|
||||
|
||||
/**
 * Starts the background job processor.
 *
 * Schedules `processStuckJobs` every `POLL_INTERVAL` milliseconds and also
 * triggers one sweep immediately. Calling this while the processor is already
 * running is a no-op, so the existing timer handle is never overwritten and
 * leaked.
 */
export function startJobProcessor(): void {
  if (processorInterval) {
    logger.warn('Background job processor already running');
    return;
  }

  logger.info('Starting background job processor');

  // `processStuckJobs` is async; attach a rejection handler so a failing sweep
  // is logged instead of surfacing as an unhandled promise rejection.
  const runSweep = (): void => {
    processStuckJobs().catch((error: unknown) => {
      logger.error({ error }, 'Background job processor run failed');
    });
  };

  processorInterval = setInterval(runSweep, POLL_INTERVAL);

  // Run immediately on startup
  runSweep();
}
|
||||
|
||||
/**
 * Stops the background job processor.
 *
 * Clears the polling timer and resets the stored handle. Does nothing (and
 * logs nothing) when no timer is active.
 */
export function stopJobProcessor(): void {
  if (!processorInterval) {
    return;
  }

  clearInterval(processorInterval);
  processorInterval = null;
  logger.info('Stopped background job processor');
}
|
||||
|
||||
/**
 * Runs one sweep over every `generated_content` row that is still marked
 * 'processing' and has a RunPod job id.
 *
 * For each row:
 * - older than `JOB_TIMEOUT`: marked 'failed' with a timeout message;
 * - RunPod reports COMPLETED with at least one output: the output data is
 *   saved, or the row is marked 'completed' when the output carries no data;
 * - RunPod reports FAILED: marked 'failed' with the reported error;
 * - any other status: left untouched for the next sweep.
 *
 * Failures are logged rather than thrown, so the returned promise does not
 * reject when the database query or an individual job fails; one failing job
 * does not stop the remaining jobs from being processed.
 */
async function processStuckJobs(): Promise<void> {
  let pendingJobs: GeneratedContentRow[];
  try {
    pendingJobs = getDb().prepare(`
      SELECT * FROM generated_content
      WHERE status = 'processing' AND runpod_job_id IS NOT NULL
    `).all() as GeneratedContentRow[];
  } catch (error) {
    // This function runs from a timer with nobody awaiting it; log and wait
    // for the next sweep instead of rejecting.
    logger.error({ error }, 'Failed to load stuck jobs');
    return;
  }

  if (pendingJobs.length === 0) {
    return;
  }

  logger.info({ count: pendingJobs.length }, 'Processing stuck jobs');

  // Derived from JOB_TIMEOUT so the message cannot drift from the real limit.
  const timeoutMinutes = Math.round(JOB_TIMEOUT / 60000);

  for (const job of pendingJobs) {
    try {
      // NOTE(review): Date parsing treats date-time strings without a timezone
      // designator as local time — confirm the stored created_at format parses
      // to the intended instant.
      const createdAt = new Date(job.created_at).getTime();
      const age = Date.now() - createdAt;

      // Timeout check
      if (age > JOB_TIMEOUT) {
        logger.warn({ contentId: job.id, age }, 'Job timed out');
        updateContentStatus(job.id, 'failed', {
          errorMessage: `Job timed out after ${timeoutMinutes} minutes`,
        });
        continue;
      }

      // The query already filters on runpod_job_id IS NOT NULL; this guard
      // narrows the type without a non-null assertion.
      const runpodJobId = job.runpod_job_id;
      if (!runpodJobId) {
        continue;
      }

      // Poll RunPod
      const status = await getJobStatus(runpodJobId);

      if (status.status === 'COMPLETED' && status.output?.outputs?.[0]) {
        const output = status.output.outputs[0];
        if (output.data) {
          saveContentFile(job.id, output.data);
        } else {
          updateContentStatus(job.id, 'completed', { fileSize: output.size });
        }
        logger.info({ contentId: job.id }, 'Background processor completed job');
      } else if (status.status === 'FAILED') {
        updateContentStatus(job.id, 'failed', {
          errorMessage: status.error || status.output?.error || 'Job failed',
        });
        logger.info({ contentId: job.id }, 'Background processor marked job as failed');
      }
    } catch (error) {
      logger.error({ error, contentId: job.id }, 'Error processing stuck job');
    }
  }
}
|
||||
@@ -64,7 +64,7 @@ export const generationRequestSchema = z.object({
|
||||
resolution: z.number().int().min(480).max(1080).optional().default(720),
|
||||
steps: z.number().int().min(1).max(50).optional().default(8),
|
||||
splitStep: z.number().int().min(1).max(20).optional().default(4),
|
||||
timeout: z.number().int().min(60).max(600).optional().default(600),
|
||||
timeout: z.number().int().min(60).max(1500).optional().default(1500),
|
||||
});
|
||||
|
||||
// MFA schemas
|
||||
|
||||
Reference in New Issue
Block a user