Files
comfyui-serverless/handler.py
Debian 85a07fcc5f
All checks were successful
Build and Push Docker Image / build (push) Successful in 3m25s
Add ComfyUI output capture for crash debugging
- Add background thread to read ComfyUI stdout in real-time
- Store last 200 lines in circular buffer
- Echo output to RunPod logs with [ComfyUI] prefix
- Include last 100 lines in error responses for debugging
- Add comfyui_output field to error responses

This will help diagnose why ComfyUI crashes during generation.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-11 03:15:10 +00:00

662 lines
22 KiB
Python

"""
ComfyUI RunPod Serverless Handler
Handles image/video generation workflows with ComfyUI API
Wan22-I2V-Remix Workflow Node Mapping:
- Node 148: LoadImage - image input
- Node 134: CLIPTextEncode - positive prompt
- Node 137: CLIPTextEncode - negative prompt
- Node 147: easy int - resolution (720 default)
- Node 150: INTConstant - steps (8 default)
- Node 151: INTConstant - split_step (4 default)
- Node 117: SaveVideo - output
"""
import os
import sys
import json
import time
import base64
import uuid
import subprocess
import signal
import threading
from collections import deque
import requests
from pathlib import Path
import runpod
# Configuration
# Working directory from which ComfyUI's main.py is launched.
COMFYUI_DIR = "/workspace/ComfyUI"
# Local TCP port passed to ComfyUI via --port.
COMFYUI_PORT = 8188
# Base URL for every HTTP request this handler sends to ComfyUI.
COMFYUI_HOST = f"http://127.0.0.1:{COMFYUI_PORT}"
MAX_TIMEOUT = 1200 # 20 minutes max for video generation
# Seconds slept between history polls while a workflow is executing.
POLL_INTERVAL = 1.0
# Seconds to wait for ComfyUI to answer /system_stats after launch.
STARTUP_TIMEOUT = 120
# Workflow JSON loaded when a job does not supply its own "workflow".
DEFAULT_WORKFLOW_PATH = "/workspace/workflows/Wan22-I2V-Remix-API.json"
# Wan22-I2V-Remix node IDs
# Keys into the API-format workflow dict; node IDs are strings there.
NODE_IMAGE_INPUT = "148"
NODE_POSITIVE_PROMPT = "134"
NODE_NEGATIVE_PROMPT = "137"
NODE_RESOLUTION = "147"
NODE_STEPS = "150"
NODE_SPLIT_STEP = "151"
NODE_SAVE_VIDEO = "117"
# Global ComfyUI process and output capture
# Popen handle of the running ComfyUI server, or None when not started/stopped.
comfyui_process = None
comfyui_output_buffer = deque(maxlen=200) # Keep last 200 lines
# Guards comfyui_output_buffer, which is written by the reader thread and
# read by the handler.
comfyui_output_lock = threading.Lock()
# Background thread that drains ComfyUI's stdout into the buffer.
comfyui_reader_thread = None
def _read_comfyui_output():
    """Drain ComfyUI's stdout into the shared output buffer.

    Intended to run in a background thread. Each line read from the
    process's stdout pipe is decoded as UTF-8 (invalid bytes replaced),
    stripped of trailing whitespace, appended to ``comfyui_output_buffer``
    under ``comfyui_output_lock``, and echoed to this process's stdout with
    a ``[ComfyUI]`` prefix.

    The thread binds to the process that is current when it starts, so a
    later restart (which rebinds the global) cannot make an old reader
    consume lines belonging to the new process.
    """
    # Snapshot the handle once: the global may be set to None or replaced
    # by another thread while this loop is running.
    process = comfyui_process
    if process is None or process.stdout is None:
        return

    while True:
        try:
            line = process.stdout.readline()
        except (OSError, ValueError) as exc:
            # ValueError is raised when the pipe object was closed under us.
            print(f"[ComfyUI] output reader stopped: {exc}")
            break

        if not line:
            # An empty read on a pipe means EOF; no further output can
            # arrive, so stop instead of spinning until the process exits.
            break

        decoded = line.decode('utf-8', errors='replace').rstrip()
        with comfyui_output_lock:
            comfyui_output_buffer.append(decoded)
        print(f"[ComfyUI] {decoded}")  # Echo to RunPod logs
def get_comfyui_output(last_n: int = 50) -> list:
    """Return up to the last ``last_n`` captured ComfyUI output lines.

    Args:
        last_n: Maximum number of most-recent lines to return. Zero or a
            negative value yields an empty list.

    Returns:
        A new list of lines, oldest first. Fewer than ``last_n`` lines are
        returned when the buffer holds fewer.
    """
    # Guard explicitly: ``lines[-0:]`` would return the whole list and a
    # negative count would slice from the front.
    if last_n <= 0:
        return []
    with comfyui_output_lock:
        lines = list(comfyui_output_buffer)
    return lines[-last_n:]
class JobLogger:
    """Per-job log collector whose entries are returned to the client."""

    def __init__(self):
        self.logs = []
        self.start_time = time.time()

    def log(self, message: str):
        """Record ``message`` with wall-clock time and seconds since creation."""
        wall_clock = time.strftime("%Y-%m-%d %H:%M:%S")
        seconds_in = time.time() - self.start_time
        line = f"[{wall_clock}] [{seconds_in:.1f}s] {message}"
        self.logs.append(line)
        # Mirror to stdout so the entry also shows in the RunPod console.
        print(line)

    def get_logs(self) -> list:
        """Return the list of entries recorded so far."""
        return self.logs
def start_comfyui(logger: JobLogger = None):
    """Start the ComfyUI server if it is not already running.

    Launches ``main.py`` from ``COMFYUI_DIR`` with stdout/stderr merged into
    one pipe, starts the background reader thread that captures that
    output, then polls ``/system_stats`` until the server answers.

    Args:
        logger: Optional job logger; progress messages go to it when given,
            otherwise to stdout.

    Returns:
        True when the server is (or already was) running and responding,
        False when the process exited during startup or did not respond
        within ``STARTUP_TIMEOUT`` seconds.
    """
    global comfyui_process, comfyui_reader_thread

    log = logger.log if logger else print

    if comfyui_process is not None and comfyui_process.poll() is None:
        log("ComfyUI server already running")
        return True

    # Clear output buffer for fresh start
    with comfyui_output_lock:
        comfyui_output_buffer.clear()

    log("Starting ComfyUI server...")
    process = subprocess.Popen(
        [
            sys.executable, "main.py",
            "--listen", "127.0.0.1",
            "--port", str(COMFYUI_PORT),
            "--disable-auto-launch"
        ],
        cwd=COMFYUI_DIR,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        # Puts the child in its own session/process group (so the whole
        # group can be signalled later) without preexec_fn, which is not
        # safe to use once this process has threads.
        start_new_session=True,
    )
    comfyui_process = process

    # Start background thread to capture output
    comfyui_reader_thread = threading.Thread(target=_read_comfyui_output, daemon=True)
    comfyui_reader_thread.start()

    # Wait for server to be ready
    start_time = time.time()
    last_status_time = start_time
    while time.time() - start_time < STARTUP_TIMEOUT:
        try:
            resp = requests.get(f"{COMFYUI_HOST}/system_stats", timeout=2)
            if resp.status_code == 200:
                log("ComfyUI server ready")
                return True
        except requests.exceptions.RequestException:
            # Expected while the server is still booting; keep polling.
            pass

        # Checked after the HTTP probe so a server that is already
        # answering on the port still counts as ready.
        exit_code = process.poll()
        if exit_code is not None:
            log(f"ComfyUI process exited during startup (exit code {exit_code})")
            return False

        # Log progress every 15 seconds
        if time.time() - last_status_time >= 15:
            elapsed = int(time.time() - start_time)
            log(f"Waiting for ComfyUI startup... ({elapsed}s)")
            last_status_time = time.time()
        time.sleep(1)

    log(f"ComfyUI server failed to start after {STARTUP_TIMEOUT}s")
    return False
def stop_comfyui():
    """Stop the ComfyUI server and reap the child process.

    Sends SIGTERM to the child's process group (falling back to
    terminating just the child when the group cannot be signalled), waits
    up to 10 seconds for it to exit, and force-kills it if it has not.
    Does nothing when no process is registered. The global handle is
    cleared afterwards in every case.
    """
    global comfyui_process

    process = comfyui_process
    if process is None:
        return

    try:
        try:
            os.killpg(os.getpgid(process.pid), signal.SIGTERM)
        except (AttributeError, OSError):
            # AttributeError: platform without process groups.
            # OSError (incl. ProcessLookupError): group already gone or
            # not signalable.
            process.terminate()

        # Reap the child so it does not linger as a zombie; escalate if it
        # ignores SIGTERM.
        try:
            process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()
    finally:
        # Cleared last so the output reader can keep draining the pipe
        # while the process shuts down.
        comfyui_process = None
def load_default_workflow() -> dict:
    """Load the default Wan22-I2V-Remix workflow from disk.

    Returns:
        The parsed JSON content of ``DEFAULT_WORKFLOW_PATH``.

    Raises:
        FileNotFoundError: If the workflow file does not exist.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    try:
        # JSON is UTF-8 by specification; do not depend on the container's
        # locale, since workflows may contain non-ASCII prompt text.
        with open(DEFAULT_WORKFLOW_PATH, encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError as err:
        raise FileNotFoundError(
            f"Default workflow not found: {DEFAULT_WORKFLOW_PATH}"
        ) from err
# Ordered widget names per node type: the value at position i of a frontend
# node's "widgets_values" list is emitted as the API input named at
# position i. Node types not listed here contribute no widget inputs.
_WIDGET_MAPPINGS = {
    "LoadImage": ["image", "upload"],
    "CLIPTextEncode": ["text"],
    "easy int": ["value"],
    "INTConstant": ["value"],
    "SaveVideo": ["filename_prefix", "format", "codec"],
    "CreateVideo": ["fps"],
    "RIFE VFI": ["ckpt_name", "clear_cache_after_n_frames", "multiplier", "fast_mode", "ensemble", "scale_factor"],
    "CLIPLoader": ["clip_name", "type", "device"],
    "GetImageSize": [],
    "WanVideoTorchCompileSettings": [
        "backend", "fullgraph", "mode", "dynamic", "dynamo_cache_size_limit",
        "compile_transformer_blocks_only", "compile_cache_max_entries",
        "compile_single_blocks", "compile_double_blocks"
    ],
    "WanVideoBlockSwap": [
        "blocks_to_swap", "offload_txt_emb", "offload_img_emb", "offload_txt_clip_emb",
        "offload_modulation", "cpu_offload_streams", "use_async_transfer"
    ],
    "WanVideoModelLoader": [
        "model", "base_precision", "quantization", "load_device", "attention_mode", "lora_scale_mode"
    ],
    "WanVideoLoraSelect": ["lora", "strength", "use_lora_sparse", "sparse_lora_blocks"],
    "WanVideoVAELoader": ["model_name", "precision"],
    "WanVideoTextEmbedBridge": [],
    "WanVideoImageToVideoEncode": [
        "width", "height", "num_frames", "noise_aug_strength",
        "start_latent_strength", "end_latent_strength", "image_noise_aug",
        "mask_noise_aug", "force_offload"
    ],
    "WanVideoSampler": [
        "shift", "cfg", "steps", "seed", "seed_mode", "force_offload", "scheduler",
        "riflex_freq_index", "riflex_freq_dim", "riflex_freq_scale",
        "use_comfy_pbar", "start_step", "end_step", "denoise"
    ],
    "WanVideoDecode": [
        "enable_tiling", "tile_x", "tile_y", "tile_stride_x", "tile_stride_y", "tiling_decoder"
    ],
    "MathExpression|pysssss": ["expression"],
}

# Node "mode" values that exclude a node from the converted prompt:
# 2 = muted ("never"), 4 = bypassed.
_INACTIVE_NODE_MODES = frozenset({2, 4})


def convert_frontend_to_api(frontend_workflow: dict) -> dict:
    """Convert a ComfyUI frontend (graph) workflow to API (prompt) format.

    Args:
        frontend_workflow: Either a frontend workflow containing "nodes"
            and "links", or a workflow already in API format.

    Returns:
        The input object unchanged when it has no "nodes" key (treated as
        already API-format). Otherwise a new dict mapping each active
        node's ID (as a string) to ``{"class_type", "inputs"}``, plus
        ``"_meta": {"title": ...}`` when the node carries a title. Muted
        and bypassed nodes are omitted, and links originating from them
        are dropped rather than rewired.
    """
    # If already in API format (no 'nodes' key), return as-is
    if "nodes" not in frontend_workflow:
        return frontend_workflow

    nodes = frontend_workflow.get("nodes", [])
    links = frontend_workflow.get("links", [])

    active_nodes = [n for n in nodes if n.get("mode") not in _INACTIVE_NODE_MODES]
    active_ids = {str(n["id"]) for n in active_nodes}

    # link_id -> (source_node_id, source_slot), only for links whose source
    # node is active. Each link is [id, src_node, src_slot, dst_node,
    # dst_slot, type]; only the first three fields are needed.
    link_map = {}
    for link in links:
        link_id, src_node, src_slot = link[:3]
        if str(src_node) in active_ids:
            link_map[link_id] = (str(src_node), src_slot)

    api_workflow = {}
    for node in active_nodes:
        node_id = str(node["id"])
        class_type = node.get("type", "")
        inputs = {}

        # Positional widget values -> named inputs. A missing or null
        # "widgets_values" (or any non-list value) contributes nothing.
        widgets_values = node.get("widgets_values")
        if not isinstance(widgets_values, (list, tuple)):
            widgets_values = []
        widget_names = _WIDGET_MAPPINGS.get(class_type, [])
        for name, value in zip(widget_names, widgets_values):
            if name:
                inputs[name] = value

        # Connections override widget values of the same name.
        for inp in node.get("inputs", []):
            link_id = inp.get("link")
            if link_id is not None and link_id in link_map:
                src_node, src_slot = link_map[link_id]
                inputs[inp["name"]] = [src_node, src_slot]

        entry = {"class_type": class_type, "inputs": inputs}
        # Add meta if title exists
        if "title" in node:
            entry["_meta"] = {"title": node["title"]}
        api_workflow[node_id] = entry

    return api_workflow
def upload_image(image_base64: str, filename: str = None) -> str:
    """Upload a base64-encoded image to ComfyUI and return its stored name.

    Args:
        image_base64: Base64 image payload. A ``data:<mime>;base64,``
            prefix is accepted and stripped before decoding.
        filename: Name to upload under; a random ``input_<8 hex>.png`` name
            is generated when omitted.

    Returns:
        The "name" field of ComfyUI's upload response, or ``filename`` when
        the response has no such field.

    Raises:
        Exception: If ComfyUI answers with a non-200 status.
        binascii.Error: If the payload is not valid base64.
    """
    if filename is None:
        filename = f"input_{uuid.uuid4().hex[:8]}.png"

    # Without this, the header of a data URI would be decoded as if it were
    # payload (its letters are valid base64 characters) and corrupt the image.
    if image_base64.startswith("data:"):
        image_base64 = image_base64.partition(",")[2]

    # Decode base64
    image_data = base64.b64decode(image_base64)

    # Upload to ComfyUI
    files = {
        "image": (filename, image_data, "image/png"),
    }
    data = {
        "overwrite": "true"
    }
    resp = requests.post(
        f"{COMFYUI_HOST}/upload/image",
        files=files,
        data=data,
        # Bound the call so a hung server cannot block the worker forever.
        timeout=60,
    )
    if resp.status_code != 200:
        raise Exception(f"Failed to upload image: {resp.text}")

    result = resp.json()
    return result.get("name", filename)
def inject_wan22_params(workflow: dict, params: dict) -> dict:
    """Return a copy of ``workflow`` with job parameters written into its nodes.

    The input workflow is not modified. A parameter is applied only when
    its key is present in ``params`` and its target node exists in the
    workflow.
    """
    patched = json.loads(json.dumps(workflow))  # Deep copy via JSON round-trip

    # (params key, target node ID, input name on that node)
    targets = (
        ("image_filename", NODE_IMAGE_INPUT, "image"),      # node 148
        ("prompt", NODE_POSITIVE_PROMPT, "text"),           # node 134
        ("negative_prompt", NODE_NEGATIVE_PROMPT, "text"),  # node 137
        ("resolution", NODE_RESOLUTION, "value"),           # node 147
        ("steps", NODE_STEPS, "value"),                     # node 150
        ("split_step", NODE_SPLIT_STEP, "value"),           # node 151
    )
    for param_key, node_id, input_name in targets:
        if param_key in params and node_id in patched:
            patched[node_id]["inputs"][input_name] = params[param_key]

    return patched
def queue_workflow(workflow: dict, client_id: str = None) -> str:
    """Submit an API-format workflow to ComfyUI and return its prompt ID.

    Args:
        workflow: Workflow in API format, sent as the "prompt" field.
        client_id: Client identifier sent with the request; a random hex
            UUID is generated when omitted.

    Returns:
        The "prompt_id" from ComfyUI's response.

    Raises:
        Exception: If ComfyUI answers with a non-200 status, or with a
            response that carries no prompt ID.
    """
    if client_id is None:
        client_id = uuid.uuid4().hex

    payload = {
        "prompt": workflow,
        "client_id": client_id
    }
    resp = requests.post(
        f"{COMFYUI_HOST}/prompt",
        json=payload,
        # Bound the call so a hung server cannot block the worker forever.
        timeout=30,
    )
    if resp.status_code != 200:
        raise Exception(f"Failed to queue workflow: {resp.text}")

    result = resp.json()

    # Debug: print full queue response
    print(f"Queue response keys: {result.keys()}")
    if "node_errors" in result and result["node_errors"]:
        print(f"Node errors: {result['node_errors']}")
    if "error" in result:
        print(f"Queue error: {result['error']}")

    prompt_id = result.get("prompt_id")
    if not prompt_id:
        # Surface the response body instead of an opaque KeyError.
        raise Exception(f"Failed to queue workflow: no prompt_id in response: {result}")
    return prompt_id
def get_history(prompt_id: str, timeout: float = 30.0) -> dict:
    """Fetch ComfyUI's execution history for one prompt.

    Args:
        prompt_id: ID returned when the workflow was queued.
        timeout: Seconds to wait for the HTTP response. Bounding the call
            keeps a hung server from stalling the caller's polling loop
            past its own deadline.

    Returns:
        The decoded JSON body, or an empty dict on any non-200 status.

    Raises:
        requests.exceptions.RequestException: On connection failure or
            timeout.
    """
    resp = requests.get(f"{COMFYUI_HOST}/history/{prompt_id}", timeout=timeout)
    if resp.status_code != 200:
        return {}
    return resp.json()
def poll_for_completion(prompt_id: str, timeout: int = MAX_TIMEOUT, logger: JobLogger = None) -> dict:
    """Wait for a queued workflow to finish and return its history entry.

    Polls the history endpoint every ``POLL_INTERVAL`` seconds. Raises
    ``Exception`` when the entry reports an error status and
    ``TimeoutError`` when ``timeout`` seconds pass without completion.
    Progress is reported through ``logger`` when given, otherwise printed.
    """
    emit = logger.log if logger else print

    began = time.time()
    last_report = began

    while time.time() - began < timeout:
        history = get_history(prompt_id)
        if prompt_id in history:
            record = history[prompt_id]
            state = record.get("status", {})
            # Finished successfully
            if state.get("completed", False):
                return record
            # Finished with an error
            if state.get("status_str") == "error":
                raise Exception(f"Workflow execution failed: {state}")

        # Report progress every 30 seconds
        if time.time() - last_report >= 30:
            elapsed = int(time.time() - began)
            remaining = timeout - elapsed
            emit(f"Generating... ({elapsed}s elapsed, {remaining}s remaining)")
            last_report = time.time()

        time.sleep(POLL_INTERVAL)

    raise TimeoutError(f"Workflow execution timed out after {timeout}s")
def get_file_type(filename: str) -> str:
    """Classify a file as "video" or "image" by its (case-insensitive) suffix."""
    video_suffixes = {".mp4", ".webm", ".gif", ".mov", ".avi", ".mkv"}
    suffix = Path(filename).suffix.lower()
    # Anything that is not a known video suffix is reported as an image.
    return "video" if suffix in video_suffixes else "image"
def get_output_files(history: dict) -> list:
    """Extract output file descriptors from one prompt's history entry.

    Args:
        history: History entry for a single prompt; its "outputs" mapping
            (node ID -> node output) is scanned.

    Returns:
        A list of dicts with keys "type" ("video" or "image"), "filename",
        "subfolder" and "type_folder". For each node, entries appear in the
        order images, videos, gifs, files. Empty when the entry has no
        "outputs".
    """
    outputs = []
    for node_output in history.get("outputs", {}).values():
        # "images": classified by extension (SaveVideo sometimes puts
        #           videos here).
        # "videos": SaveVideo node; "gifs": VideoHelperSuite. Always video.
        # "files":  generic files, classified by extension with the same
        #           rule as images so every video extension is recognised.
        for key in ("images", "videos", "gifs", "files"):
            for entry in node_output.get(key) or ():
                if key == "files":
                    filename = entry.get("filename", "")
                else:
                    filename = entry["filename"]

                if key in ("videos", "gifs"):
                    file_type = "video"
                else:
                    file_type = get_file_type(filename)

                outputs.append({
                    "type": file_type,
                    "filename": filename,
                    "subfolder": entry.get("subfolder", ""),
                    "type_folder": entry.get("type", "output")
                })
    return outputs
def fetch_output(output_info: dict, timeout: float = 120.0) -> bytes:
    """Download one output file from ComfyUI's ``/view`` endpoint.

    Args:
        output_info: Descriptor with "filename", "subfolder" and
            "type_folder" keys.
        timeout: Seconds to wait for the server to respond, so a hung
            server cannot block the worker indefinitely.

    Returns:
        The raw file content.

    Raises:
        Exception: If ComfyUI answers with a non-200 status.
    """
    params = {
        "filename": output_info["filename"],
        "subfolder": output_info["subfolder"],
        "type": output_info["type_folder"]
    }
    resp = requests.get(f"{COMFYUI_HOST}/view", params=params, timeout=timeout)
    if resp.status_code != 200:
        raise Exception(f"Failed to fetch output: {resp.status_code}")
    return resp.content
def _log_comfyui_tail(logger, last_n: int = 100) -> list:
    """Log the last ``last_n`` captured ComfyUI output lines and return them."""
    comfyui_logs = get_comfyui_output(last_n)
    logger.log(f"ComfyUI output (last {last_n} lines):")
    for line in comfyui_logs:
        logger.log(f" {line}")
    return comfyui_logs


def _package_outputs(outputs: list, logger) -> list:
    """Fetch each output file and build the per-file result entries.

    Videos larger than 10 MiB are written under ``/runpod-volume/outputs``
    and returned by path; everything else is returned inline as base64.
    """
    results = []
    for output_info in outputs:
        data = fetch_output(output_info)
        size_mb = len(data) / 1024 / 1024
        logger.log(f"Fetched output: {output_info['filename']} ({size_mb:.2f} MB)")

        result = {
            "type": output_info["type"],
            "filename": output_info["filename"],
        }
        # Check size for video files
        if output_info["type"] == "video" and len(data) > 10 * 1024 * 1024:
            # For large videos, save to network volume and return path
            output_path = Path("/runpod-volume/outputs") / output_info["filename"]
            output_path.parent.mkdir(parents=True, exist_ok=True)
            output_path.write_bytes(data)
            logger.log(f"Large video saved to volume: {output_path}")
            result["path"] = str(output_path)
        else:
            # Return as base64
            result["data"] = base64.b64encode(data).decode("utf-8")
        result["size"] = len(data)
        results.append(result)
    return results


def handler(job: dict) -> dict:
    """
    RunPod serverless handler.

    Input schema:
    {
        "image": "base64 encoded image (required)",
        "prompt": "positive prompt text (required)",
        "negative_prompt": "negative prompt (optional)",
        "resolution": 720 (optional, default 720),
        "steps": 8 (optional, default 8),
        "split_step": 4 (optional, default 4),
        "timeout": 1200 (optional, max 1200),
        "workflow": {} (optional, override default workflow)
    }

    Returns:
        On success: {"status": "success", "prompt_id", "outputs", "logs"}.
        On failure: a dict with "error" and "logs"; timeout and unexpected
        errors also carry "status" ("timeout" / "error"), and those plus a
        ComfyUI startup failure carry "comfyui_output" with the last
        captured ComfyUI lines.
    """
    logger = JobLogger()
    # "input" may be absent or explicitly null; treat both as empty.
    job_input = job.get("input") or {}
    logger.log("Job started")

    # Validate required inputs
    if not job_input.get("image"):
        logger.log("ERROR: Missing required 'image' (base64) in input")
        return {"error": "Missing required 'image' (base64) in input", "logs": logger.get_logs()}
    if not job_input.get("prompt"):
        logger.log("ERROR: Missing required 'prompt' in input")
        return {"error": "Missing required 'prompt' in input", "logs": logger.get_logs()}

    # Log input parameters
    image_size_kb = len(job_input["image"]) * 3 / 4 / 1024  # Approximate decoded size
    logger.log(f"Input image size: ~{image_size_kb:.1f} KB (base64)")
    logger.log(f"Prompt: {job_input['prompt'][:100]}{'...' if len(job_input['prompt']) > 100 else ''}")
    if job_input.get("negative_prompt"):
        negative_prompt = job_input["negative_prompt"]
        # Only mark truncation when the text was actually cut.
        logger.log(f"Negative prompt: {negative_prompt[:50]}{'...' if len(negative_prompt) > 50 else ''}")
    logger.log(f"Resolution: {job_input.get('resolution', 720)}")
    logger.log(f"Steps: {job_input.get('steps', 8)}")
    logger.log(f"Split step: {job_input.get('split_step', 4)}")

    # Ensure ComfyUI is running
    if not start_comfyui(logger):
        logger.log("ERROR: Failed to start ComfyUI server")
        # The captured output is what explains a failed startup.
        comfyui_logs = _log_comfyui_tail(logger)
        return {
            "error": "Failed to start ComfyUI server",
            "logs": logger.get_logs(),
            "comfyui_output": comfyui_logs,
        }

    try:
        # Load workflow (custom or default)
        if job_input.get("workflow"):
            logger.log("Loading custom workflow")
            workflow = convert_frontend_to_api(job_input["workflow"])
        else:
            logger.log("Loading default workflow")
            workflow = convert_frontend_to_api(load_default_workflow())
        logger.log(f"Workflow loaded with {len(workflow)} nodes")

        # Upload image
        image_filename = upload_image(job_input["image"])
        logger.log(f"Uploaded image: {image_filename}")

        # Build params for injection
        params = {
            "image_filename": image_filename,
            "prompt": job_input["prompt"]
        }
        if "negative_prompt" in job_input:
            params["negative_prompt"] = job_input["negative_prompt"]
        for int_key in ("resolution", "steps", "split_step"):
            if int_key in job_input:
                params[int_key] = int(job_input[int_key])

        # Inject parameters into workflow
        workflow = inject_wan22_params(workflow, params)
        logger.log("Parameters injected into workflow")

        # Queue workflow
        client_id = uuid.uuid4().hex
        prompt_id = queue_workflow(workflow, client_id)
        logger.log(f"Workflow queued: {prompt_id}")

        # Poll for completion; coerce so numeric strings from JSON clients
        # are accepted, and never exceed MAX_TIMEOUT.
        requested_timeout = job_input.get("timeout")
        if requested_timeout is None:
            timeout = MAX_TIMEOUT
        else:
            timeout = min(int(requested_timeout), MAX_TIMEOUT)
        logger.log(f"Starting generation (timeout: {timeout}s)")
        history = poll_for_completion(prompt_id, timeout, logger)
        logger.log("Workflow completed successfully")

        # Get output files
        outputs = get_output_files(history)
        if not outputs:
            logger.log("ERROR: No outputs generated")
            return {"error": "No outputs generated", "logs": logger.get_logs()}
        logger.log(f"Found {len(outputs)} output file(s)")

        # Fetch and encode outputs
        results = _package_outputs(outputs, logger)

        logger.log("Job completed successfully")
        return {
            "status": "success",
            "prompt_id": prompt_id,
            "outputs": results,
            "logs": logger.get_logs()
        }
    except TimeoutError as e:
        logger.log(f"ERROR: Timeout - {str(e)}")
        comfyui_logs = _log_comfyui_tail(logger)
        return {"error": str(e), "status": "timeout", "logs": logger.get_logs(), "comfyui_output": comfyui_logs}
    except Exception as e:
        # Top-level boundary: every failure is reported to the client
        # rather than crashing the worker.
        import traceback
        tb = traceback.format_exc()
        logger.log(f"ERROR: {str(e)}")
        logger.log(f"Traceback:\n{tb}")
        comfyui_logs = _log_comfyui_tail(logger)
        return {"error": str(e), "status": "error", "logs": logger.get_logs(), "comfyui_output": comfyui_logs}
# RunPod serverless entry point: when run as a script, register `handler`
# with the RunPod serverless runtime.
if __name__ == "__main__":
    print("Starting ComfyUI RunPod Handler...")
    serverless_config = {"handler": handler}
    runpod.serverless.start(serverless_config)