Add job logging and increase timeout to 20 minutes
- Add JobLogger class to handler.py for structured timestamped logging
- Increase MAX_TIMEOUT from 600s to 1200s (20 minutes)
- Add logs column to generated_content table via migration
- Store and display job execution logs in gallery UI
- Add Logs button to gallery items with modal display

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
146
handler.py
146
handler.py
@@ -28,7 +28,7 @@ import runpod
|
||||
COMFYUI_DIR = "/workspace/ComfyUI"
|
||||
COMFYUI_PORT = 8188
|
||||
COMFYUI_HOST = f"http://127.0.0.1:{COMFYUI_PORT}"
|
||||
MAX_TIMEOUT = 600 # 10 minutes max for video generation
|
||||
MAX_TIMEOUT = 1200 # 20 minutes max for video generation
|
||||
POLL_INTERVAL = 1.0
|
||||
STARTUP_TIMEOUT = 120
|
||||
DEFAULT_WORKFLOW_PATH = "/workspace/workflows/Wan22-I2V-Remix-API.json"
|
||||
@@ -46,14 +46,41 @@ NODE_SAVE_VIDEO = "117"
|
||||
comfyui_process = None
|
||||
|
||||
|
||||
def start_comfyui():
|
||||
class JobLogger:
    """Collects timestamped log lines for inclusion in the job response."""

    def __init__(self):
        # Entries accumulated over the lifetime of one job.
        self.logs = []
        # Reference point for the per-entry elapsed-seconds prefix.
        self.start_time = time.time()

    def log(self, message: str):
        """Record *message* prefixed with wall-clock time and seconds elapsed."""
        now = time.time()
        stamp = time.strftime("%Y-%m-%d %H:%M:%S")
        entry = "[{}] [{:.1f}s] {}".format(stamp, now - self.start_time, message)
        self.logs.append(entry)
        # Echo to stdout so the RunPod console shows progress as well.
        print(entry)

    def get_logs(self) -> list:
        """Return every log entry recorded so far."""
        return self.logs
|
||||
|
||||
|
||||
def start_comfyui(logger: JobLogger = None):
|
||||
"""Start ComfyUI server if not already running."""
|
||||
global comfyui_process
|
||||
|
||||
def log(msg):
|
||||
if logger:
|
||||
logger.log(msg)
|
||||
else:
|
||||
print(msg)
|
||||
|
||||
if comfyui_process is not None and comfyui_process.poll() is None:
|
||||
log("ComfyUI server already running")
|
||||
return True
|
||||
|
||||
print("Starting ComfyUI server...")
|
||||
log("Starting ComfyUI server...")
|
||||
|
||||
comfyui_process = subprocess.Popen(
|
||||
[
|
||||
@@ -70,17 +97,25 @@ def start_comfyui():
|
||||
|
||||
# Wait for server to be ready
|
||||
start_time = time.time()
|
||||
last_status_time = start_time
|
||||
while time.time() - start_time < STARTUP_TIMEOUT:
|
||||
try:
|
||||
resp = requests.get(f"{COMFYUI_HOST}/system_stats", timeout=2)
|
||||
if resp.status_code == 200:
|
||||
print("ComfyUI server ready")
|
||||
log("ComfyUI server ready")
|
||||
return True
|
||||
except requests.exceptions.RequestException:
|
||||
pass
|
||||
|
||||
# Log progress every 15 seconds
|
||||
if time.time() - last_status_time >= 15:
|
||||
elapsed = int(time.time() - start_time)
|
||||
log(f"Waiting for ComfyUI startup... ({elapsed}s)")
|
||||
last_status_time = time.time()
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
print("ComfyUI server failed to start")
|
||||
log("ComfyUI server failed to start after {STARTUP_TIMEOUT}s")
|
||||
return False
|
||||
|
||||
|
||||
@@ -305,9 +340,16 @@ def get_history(prompt_id: str) -> dict:
|
||||
return resp.json()
|
||||
|
||||
|
||||
def poll_for_completion(prompt_id: str, timeout: int = MAX_TIMEOUT) -> dict:
|
||||
def poll_for_completion(prompt_id: str, timeout: int = MAX_TIMEOUT, logger: JobLogger = None) -> dict:
|
||||
"""Poll until workflow completes or timeout."""
|
||||
start_time = time.time()
|
||||
last_log_time = start_time
|
||||
|
||||
def log(msg):
|
||||
if logger:
|
||||
logger.log(msg)
|
||||
else:
|
||||
print(msg)
|
||||
|
||||
while time.time() - start_time < timeout:
|
||||
history = get_history(prompt_id)
|
||||
@@ -323,6 +365,13 @@ def poll_for_completion(prompt_id: str, timeout: int = MAX_TIMEOUT) -> dict:
|
||||
if status.get("status_str") == "error":
|
||||
raise Exception(f"Workflow execution failed: {status}")
|
||||
|
||||
# Log progress every 30 seconds
|
||||
if time.time() - last_log_time >= 30:
|
||||
elapsed = int(time.time() - start_time)
|
||||
remaining = timeout - elapsed
|
||||
log(f"Generating... ({elapsed}s elapsed, {remaining}s remaining)")
|
||||
last_log_time = time.time()
|
||||
|
||||
time.sleep(POLL_INTERVAL)
|
||||
|
||||
raise TimeoutError(f"Workflow execution timed out after {timeout}s")
|
||||
@@ -419,37 +468,55 @@ def handler(job: dict) -> dict:
|
||||
"resolution": 720 (optional, default 720),
|
||||
"steps": 8 (optional, default 8),
|
||||
"split_step": 4 (optional, default 4),
|
||||
"timeout": 600 (optional, max 600),
|
||||
"timeout": 1200 (optional, max 1200),
|
||||
"workflow": {} (optional, override default workflow)
|
||||
}
|
||||
"""
|
||||
logger = JobLogger()
|
||||
job_input = job.get("input", {})
|
||||
|
||||
logger.log("Job started")
|
||||
|
||||
# Validate required inputs
|
||||
if "image" not in job_input or not job_input["image"]:
|
||||
return {"error": "Missing required 'image' (base64) in input"}
|
||||
logger.log("ERROR: Missing required 'image' (base64) in input")
|
||||
return {"error": "Missing required 'image' (base64) in input", "logs": logger.get_logs()}
|
||||
|
||||
if "prompt" not in job_input or not job_input["prompt"]:
|
||||
return {"error": "Missing required 'prompt' in input"}
|
||||
logger.log("ERROR: Missing required 'prompt' in input")
|
||||
return {"error": "Missing required 'prompt' in input", "logs": logger.get_logs()}
|
||||
|
||||
# Log input parameters
|
||||
image_size_kb = len(job_input["image"]) * 3 / 4 / 1024 # Approximate decoded size
|
||||
logger.log(f"Input image size: ~{image_size_kb:.1f} KB (base64)")
|
||||
logger.log(f"Prompt: {job_input['prompt'][:100]}{'...' if len(job_input['prompt']) > 100 else ''}")
|
||||
if job_input.get("negative_prompt"):
|
||||
logger.log(f"Negative prompt: {job_input['negative_prompt'][:50]}...")
|
||||
logger.log(f"Resolution: {job_input.get('resolution', 720)}")
|
||||
logger.log(f"Steps: {job_input.get('steps', 8)}")
|
||||
logger.log(f"Split step: {job_input.get('split_step', 4)}")
|
||||
|
||||
# Ensure ComfyUI is running
|
||||
if not start_comfyui():
|
||||
return {"error": "Failed to start ComfyUI server"}
|
||||
if not start_comfyui(logger):
|
||||
logger.log("ERROR: Failed to start ComfyUI server")
|
||||
return {"error": "Failed to start ComfyUI server", "logs": logger.get_logs()}
|
||||
|
||||
try:
|
||||
# Load workflow (custom or default)
|
||||
if "workflow" in job_input and job_input["workflow"]:
|
||||
logger.log("Loading custom workflow")
|
||||
workflow = job_input["workflow"]
|
||||
# Convert frontend format if needed
|
||||
workflow = convert_frontend_to_api(workflow)
|
||||
else:
|
||||
# Load and convert default workflow
|
||||
logger.log("Loading default workflow")
|
||||
frontend_workflow = load_default_workflow()
|
||||
workflow = convert_frontend_to_api(frontend_workflow)
|
||||
|
||||
logger.log(f"Workflow loaded with {len(workflow)} nodes")
|
||||
|
||||
# Upload image
|
||||
image_filename = upload_image(job_input["image"])
|
||||
print(f"Uploaded image: {image_filename}")
|
||||
logger.log(f"Uploaded image: {image_filename}")
|
||||
|
||||
# Build params for injection
|
||||
params = {
|
||||
@@ -471,48 +538,34 @@ def handler(job: dict) -> dict:
|
||||
|
||||
# Inject parameters into workflow
|
||||
workflow = inject_wan22_params(workflow, params)
|
||||
|
||||
# Debug: print output chain nodes to verify connections
|
||||
print("=== Workflow Output Chain ===")
|
||||
# Check the output chain: 117 <- 116 <- 115 <- 158 <- 140
|
||||
for node_id in ["117", "116", "115"]:
|
||||
if node_id in workflow:
|
||||
node = workflow[node_id]
|
||||
print(f"Node {node_id} ({node['class_type']}): {node['inputs']}")
|
||||
else:
|
||||
print(f"Node {node_id}: MISSING FROM WORKFLOW!")
|
||||
print(f"Total nodes in workflow: {len(workflow)}")
|
||||
logger.log("Parameters injected into workflow")
|
||||
|
||||
# Queue workflow
|
||||
client_id = uuid.uuid4().hex
|
||||
prompt_id = queue_workflow(workflow, client_id)
|
||||
print(f"Queued workflow: {prompt_id}")
|
||||
logger.log(f"Workflow queued: {prompt_id}")
|
||||
|
||||
# Poll for completion
|
||||
timeout = min(job_input.get("timeout", MAX_TIMEOUT), MAX_TIMEOUT)
|
||||
history = poll_for_completion(prompt_id, timeout)
|
||||
print("Workflow completed")
|
||||
|
||||
# Debug: print history structure
|
||||
print(f"History keys: {history.keys()}")
|
||||
if "outputs" in history:
|
||||
print(f"Output nodes: {list(history['outputs'].keys())}")
|
||||
for node_id, node_out in history["outputs"].items():
|
||||
print(f" Node {node_id}: {list(node_out.keys())}")
|
||||
if "status" in history:
|
||||
print(f"Status: {history['status']}")
|
||||
logger.log(f"Starting generation (timeout: {timeout}s)")
|
||||
history = poll_for_completion(prompt_id, timeout, logger)
|
||||
logger.log("Workflow completed successfully")
|
||||
|
||||
# Get output files
|
||||
outputs = get_output_files(history)
|
||||
|
||||
if not outputs:
|
||||
return {"error": "No outputs generated"}
|
||||
logger.log("ERROR: No outputs generated")
|
||||
return {"error": "No outputs generated", "logs": logger.get_logs()}
|
||||
|
||||
logger.log(f"Found {len(outputs)} output file(s)")
|
||||
|
||||
# Fetch and encode outputs
|
||||
results = []
|
||||
for output_info in outputs:
|
||||
data = fetch_output(output_info)
|
||||
print(f"Fetched output: {output_info['filename']} ({len(data)} bytes)")
|
||||
size_mb = len(data) / 1024 / 1024
|
||||
logger.log(f"Fetched output: {output_info['filename']} ({size_mb:.2f} MB)")
|
||||
|
||||
# Check size for video files
|
||||
if output_info["type"] == "video" and len(data) > 10 * 1024 * 1024:
|
||||
@@ -520,6 +573,7 @@ def handler(job: dict) -> dict:
|
||||
output_path = Path("/runpod-volume/outputs") / output_info["filename"]
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
output_path.write_bytes(data)
|
||||
logger.log(f"Large video saved to volume: {output_path}")
|
||||
results.append({
|
||||
"type": output_info["type"],
|
||||
"filename": output_info["filename"],
|
||||
@@ -535,18 +589,24 @@ def handler(job: dict) -> dict:
|
||||
"size": len(data)
|
||||
})
|
||||
|
||||
logger.log("Job completed successfully")
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"prompt_id": prompt_id,
|
||||
"outputs": results
|
||||
"outputs": results,
|
||||
"logs": logger.get_logs()
|
||||
}
|
||||
|
||||
except TimeoutError as e:
|
||||
return {"error": str(e), "status": "timeout"}
|
||||
logger.log(f"ERROR: Timeout - {str(e)}")
|
||||
return {"error": str(e), "status": "timeout", "logs": logger.get_logs()}
|
||||
except Exception as e:
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
return {"error": str(e), "status": "error"}
|
||||
tb = traceback.format_exc()
|
||||
logger.log(f"ERROR: {str(e)}")
|
||||
logger.log(f"Traceback:\n{tb}")
|
||||
return {"error": str(e), "status": "error", "logs": logger.get_logs()}
|
||||
|
||||
|
||||
# RunPod serverless entry point
|
||||
|
||||
Reference in New Issue
Block a user