"""
ComfyUI RunPod Serverless Handler

Handles image/video generation workflows through the ComfyUI HTTP API.

Wan22-I2V-Remix workflow node mapping:
    - Node 148: LoadImage      - image input
    - Node 134: CLIPTextEncode - positive prompt
    - Node 137: CLIPTextEncode - negative prompt
    - Node 147: easy int       - resolution (720 default)
    - Node 150: INTConstant    - steps (8 default)
    - Node 151: INTConstant    - split_step (4 default)
    - Node 117: SaveVideo      - output
"""

import base64
import json
import os
import signal
import subprocess
import sys
import threading
import time
import traceback
import uuid
from collections import deque
from pathlib import Path

import requests
import runpod

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
COMFYUI_DIR = "/workspace/ComfyUI"
COMFYUI_PORT = 8188
COMFYUI_HOST = f"http://127.0.0.1:{COMFYUI_PORT}"
MAX_TIMEOUT = 1200  # 20 minutes max for video generation
POLL_INTERVAL = 1.0
STARTUP_TIMEOUT = 120
# Socket timeout (seconds) for each call to the local ComfyUI API, so a wedged
# server cannot block the handler indefinitely.
REQUEST_TIMEOUT = 60
# Seconds to wait for ComfyUI to exit after SIGTERM before force-killing it.
SHUTDOWN_GRACE = 10
DEFAULT_WORKFLOW_PATH = "/workspace/workflows/Wan22-I2V-Remix-API.json"

# Video outputs larger than this are written to the network volume instead of
# being returned inline as base64.
MAX_INLINE_OUTPUT_BYTES = 10 * 1024 * 1024
LARGE_OUTPUT_DIR = Path("/runpod-volume/outputs")

# Number of ComfyUI console lines attached to error responses.
ERROR_OUTPUT_LINES = 100

VIDEO_EXTENSIONS = (".mp4", ".webm", ".gif", ".mov", ".avi", ".mkv")

# Wan22-I2V-Remix node IDs
NODE_IMAGE_INPUT = "148"
NODE_POSITIVE_PROMPT = "134"
NODE_NEGATIVE_PROMPT = "137"
NODE_RESOLUTION = "147"
NODE_STEPS = "150"
NODE_SPLIT_STEP = "151"
NODE_SAVE_VIDEO = "117"

# (param key, node id, input name) triples applied by inject_wan22_params().
WAN22_PARAM_TARGETS = (
    ("image_filename", NODE_IMAGE_INPUT, "image"),
    ("prompt", NODE_POSITIVE_PROMPT, "text"),
    ("negative_prompt", NODE_NEGATIVE_PROMPT, "text"),
    ("resolution", NODE_RESOLUTION, "value"),
    ("steps", NODE_STEPS, "value"),
    ("split_step", NODE_SPLIT_STEP, "value"),
)

# Frontend (LiteGraph) node modes that affect prompt serialization.
MODE_NEVER = 2   # muted: the node is left out of the API prompt entirely
MODE_BYPASS = 4  # bypassed: left out; downstream links pass through to a same-typed input
# Virtual node types handled purely by the ComfyUI frontend; they are never
# part of an API prompt.
FRONTEND_ONLY_NODE_TYPES = frozenset({"Note", "MarkdownNote", "PrimitiveNode", "Reroute"})

# Widget mappings for each node type: input names in widgets_values order.
WIDGET_MAPPINGS = {
    "LoadImage": ["image", "upload"],
    "CLIPTextEncode": ["text"],
    "easy int": ["value"],
    "INTConstant": ["value"],
    "SaveVideo": ["filename_prefix", "format", "codec"],
    "CreateVideo": ["fps"],
    "RIFE VFI": [
        "ckpt_name", "clear_cache_after_n_frames", "multiplier",
        "fast_mode", "ensemble", "scale_factor",
    ],
    "CLIPLoader": ["clip_name", "type", "device"],
    "GetImageSize": [],
    "WanVideoTorchCompileSettings": [
        "backend", "fullgraph", "mode", "dynamic", "dynamo_cache_size_limit",
        "compile_transformer_blocks_only", "compile_cache_max_entries",
        "compile_single_blocks", "compile_double_blocks",
    ],
    "WanVideoBlockSwap": [
        "blocks_to_swap", "offload_txt_emb", "offload_img_emb",
        "offload_txt_clip_emb", "offload_modulation", "cpu_offload_streams",
        "use_async_transfer",
    ],
    "WanVideoModelLoader": [
        "model", "base_precision", "quantization", "load_device",
        "attention_mode", "lora_scale_mode",
    ],
    "WanVideoLoraSelect": ["lora", "strength", "use_lora_sparse", "sparse_lora_blocks"],
    "WanVideoVAELoader": ["model_name", "precision"],
    "WanVideoTextEmbedBridge": [],
    "WanVideoImageToVideoEncode": [
        "width", "height", "num_frames", "noise_aug_strength",
        "start_latent_strength", "end_latent_strength", "image_noise_aug",
        "mask_noise_aug", "force_offload",
    ],
    "WanVideoSampler": [
        "shift", "cfg", "steps", "seed", "seed_mode", "force_offload",
        "scheduler", "riflex_freq_index", "riflex_freq_dim",
        "riflex_freq_scale", "use_comfy_pbar", "start_step", "end_step",
        "denoise",
    ],
    "WanVideoDecode": [
        "enable_tiling", "tile_x", "tile_y", "tile_stride_x",
        "tile_stride_y", "tiling_decoder",
    ],
    "MathExpression|pysssss": ["expression"],
}

# Global ComfyUI process and output capture
comfyui_process = None
comfyui_output_buffer = deque(maxlen=200)  # Keep last 200 lines
comfyui_output_lock = threading.Lock()
comfyui_reader_thread = None


# ---------------------------------------------------------------------------
# ComfyUI process management
# ---------------------------------------------------------------------------
def _read_comfyui_output(process=None):
    """Background thread: copy ComfyUI's stdout into the ring buffer.

    The thread is bound to one process object, so a restart of ComfyUI never
    leaves an old reader consuming the new process's pipe. It exits when the
    pipe reaches EOF (the process ended) or is closed underneath it.

    Args:
        process: The Popen to read from. Defaults to the current global
            ``comfyui_process`` for backward compatibility.
    """
    proc = process if process is not None else comfyui_process
    if proc is None or proc.stdout is None:
        return
    try:
        # readline() returns b"" only at EOF, which ends the iteration.
        for raw_line in iter(proc.stdout.readline, b""):
            decoded = raw_line.decode("utf-8", errors="replace").rstrip()
            with comfyui_output_lock:
                comfyui_output_buffer.append(decoded)
            print(f"[ComfyUI] {decoded}")  # Echo to RunPod logs
    except (OSError, ValueError):
        # Pipe closed while reading (e.g. during shutdown); nothing more to read.
        pass


def get_comfyui_output(last_n: int = 50) -> list:
    """Return up to the last ``last_n`` captured ComfyUI console lines (oldest first)."""
    if last_n <= 0:
        return []
    with comfyui_output_lock:
        lines = list(comfyui_output_buffer)
    return lines[-last_n:]


class JobLogger:
    """Accumulates timestamped log messages for returning to the client."""

    def __init__(self):
        self.logs = []
        self.start_time = time.time()

    def log(self, message: str):
        """Record ``message`` with wall-clock time and seconds since job start."""
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        elapsed = time.time() - self.start_time
        entry = f"[{timestamp}] [{elapsed:.1f}s] {message}"
        self.logs.append(entry)
        print(entry)  # Also print for RunPod console

    def get_logs(self) -> list:
        """Return all accumulated log entries."""
        return self.logs


def _log(logger, message: str) -> None:
    """Send ``message`` to ``logger`` when one is given, otherwise print it."""
    if logger is not None:
        logger.log(message)
    else:
        print(message)


def start_comfyui(logger: JobLogger = None) -> bool:
    """Start the ComfyUI server if it is not already running.

    Launches ``main.py`` in its own session (so it can be stopped as a process
    group), starts a reader thread for its console output, then polls
    ``/system_stats`` until the server answers.

    Returns:
        True once the server responds (or was already running); False if the
        process exits during startup or does not respond within
        ``STARTUP_TIMEOUT`` seconds. On timeout the process is stopped so the
        next call performs a clean restart instead of reusing a dead server.
    """
    global comfyui_process, comfyui_reader_thread

    if comfyui_process is not None and comfyui_process.poll() is None:
        _log(logger, "ComfyUI server already running")
        return True

    # Clear output buffer for fresh start
    with comfyui_output_lock:
        comfyui_output_buffer.clear()

    _log(logger, "Starting ComfyUI server...")
    process = subprocess.Popen(
        [
            sys.executable, "main.py",
            "--listen", "127.0.0.1",
            "--port", str(COMFYUI_PORT),
            "--disable-auto-launch",
        ],
        cwd=COMFYUI_DIR,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        # Equivalent to preexec_fn=os.setsid, but safe when threads exist.
        start_new_session=True,
    )
    comfyui_process = process

    # Start background thread to capture output of *this* process.
    comfyui_reader_thread = threading.Thread(
        target=_read_comfyui_output, args=(process,), daemon=True
    )
    comfyui_reader_thread.start()

    # Wait for server to be ready
    start_time = time.time()
    last_status_time = start_time
    while time.time() - start_time < STARTUP_TIMEOUT:
        exit_code = process.poll()
        if exit_code is not None:
            # Crashed during startup: fail fast instead of waiting out the timeout.
            comfyui_reader_thread.join(timeout=5)  # let the crash output drain
            comfyui_process = None
            _log(logger, f"ComfyUI server exited during startup (exit code {exit_code})")
            return False
        try:
            resp = requests.get(f"{COMFYUI_HOST}/system_stats", timeout=2)
            if resp.status_code == 200:
                _log(logger, "ComfyUI server ready")
                return True
        except requests.exceptions.RequestException:
            pass

        # Log progress every 15 seconds
        if time.time() - last_status_time >= 15:
            elapsed = int(time.time() - start_time)
            _log(logger, f"Waiting for ComfyUI startup... ({elapsed}s)")
            last_status_time = time.time()
        time.sleep(1)

    _log(logger, f"ComfyUI server failed to start after {STARTUP_TIMEOUT}s")
    stop_comfyui()
    return False


def _signal_comfyui(process, force: bool = False) -> None:
    """Send SIGTERM (SIGKILL when ``force``) to ComfyUI's whole process group.

    Falls back to signalling only the direct child when process groups are
    unavailable (no ``os.killpg``) or the group lookup fails.
    """
    if hasattr(os, "killpg"):
        sig = signal.SIGKILL if force else signal.SIGTERM
        try:
            os.killpg(os.getpgid(process.pid), sig)
            return
        except OSError:  # includes ProcessLookupError
            pass
    try:
        if force:
            process.kill()
        else:
            process.terminate()
    except OSError:
        pass


def stop_comfyui() -> None:
    """Stop the ComfyUI server, escalating to a forced kill if it ignores SIGTERM."""
    global comfyui_process
    process = comfyui_process
    if process is None:
        return
    comfyui_process = None

    _signal_comfyui(process)
    try:
        process.wait(timeout=SHUTDOWN_GRACE)
    except subprocess.TimeoutExpired:
        _signal_comfyui(process, force=True)
        try:
            process.wait(timeout=SHUTDOWN_GRACE)
        except subprocess.TimeoutExpired:
            print("Warning: ComfyUI did not exit after forced kill")


# ---------------------------------------------------------------------------
# Workflow handling
# ---------------------------------------------------------------------------
def load_default_workflow() -> dict:
    """Load the default Wan22-I2V-Remix workflow.

    Raises:
        FileNotFoundError: if ``DEFAULT_WORKFLOW_PATH`` does not exist.
    """
    workflow_path = Path(DEFAULT_WORKFLOW_PATH)
    if not workflow_path.exists():
        raise FileNotFoundError(f"Default workflow not found: {DEFAULT_WORKFLOW_PATH}")
    with open(workflow_path, encoding="utf-8") as f:
        return json.load(f)


def _normalize_link(link):
    """Return ``(link_id, origin_node_id, origin_slot, link_type)`` for one link.

    Accepts both the list form
    ``[id, origin_id, origin_slot, target_id, target_slot, type]`` and the
    object form with those fields by name (newer workflow schema).
    """
    if isinstance(link, dict):
        return link["id"], str(link["origin_id"]), link["origin_slot"], link.get("type")
    link_id, src_node, src_slot, _dst_node, _dst_slot, link_type = link[:6]
    return link_id, str(src_node), src_slot, link_type


def _bypass_input_link(node: dict, preferred_slot, want_type):
    """Pick the input link a bypassed ``node`` passes through for ``want_type``.

    The input at index ``preferred_slot`` (the bypassed node's output slot) is
    tried first, then every other input in order; the first whose ``type``
    equals ``want_type`` wins. Returns that input's link id, or None when no
    input matches or the matching input is unconnected.
    """
    node_inputs = node.get("inputs") or []
    candidates = [preferred_slot] + [i for i in range(len(node_inputs)) if i != preferred_slot]
    for index in candidates:
        if isinstance(index, int) and 0 <= index < len(node_inputs):
            if node_inputs[index].get("type") == want_type:
                return node_inputs[index].get("link")
    return None


def _resolve_link_source(link_id, want_type, link_map: dict, nodes_by_id: dict):
    """Follow ``link_id`` back to the real producing node.

    Reroute nodes are followed through their single input and bypassed nodes
    through a same-typed input. Returns ``[node_id, slot]`` for an executable
    source, or None when the chain ends at a muted/frontend-only node, a
    missing node or link, or an unconnected pass-through.
    """
    visited = set()  # guards against malformed cyclic link chains
    while link_id is not None and link_id not in visited:
        visited.add(link_id)
        link = link_map.get(link_id)
        if link is None:
            return None
        src_id, src_slot, link_type = link
        src_node = nodes_by_id.get(src_id)
        if src_node is None:
            return None

        src_type = src_node.get("type", "")
        if src_type == "Reroute":
            reroute_inputs = src_node.get("inputs") or []
            link_id = reroute_inputs[0].get("link") if reroute_inputs else None
        elif src_node.get("mode") == MODE_BYPASS:
            link_id = _bypass_input_link(src_node, src_slot, want_type or link_type)
        elif src_node.get("mode") == MODE_NEVER or src_type in FRONTEND_ONLY_NODE_TYPES:
            return None
        else:
            return [src_id, src_slot]
    return None


def convert_frontend_to_api(frontend_workflow: dict) -> dict:
    """Convert a ComfyUI frontend (UI-saved) workflow to API prompt format.

    A dict without a ``"nodes"`` key is assumed to be API format already and
    is returned unchanged.

    Muted and bypassed nodes and frontend-only node types are omitted.
    Connections coming out of bypassed and Reroute nodes are rewired to the
    node that actually produces the value; connections that cannot be
    resolved are dropped. Widget values are mapped to input names through
    ``WIDGET_MAPPINGS``, so node types missing from that table get only
    their linked inputs.
    """
    if "nodes" not in frontend_workflow:
        return frontend_workflow

    nodes = frontend_workflow.get("nodes") or []
    nodes_by_id = {str(node["id"]): node for node in nodes}

    # link_id -> (source_node_id, source_slot, link_type)
    link_map = {}
    for raw_link in frontend_workflow.get("links") or []:
        link_id, src_node, src_slot, link_type = _normalize_link(raw_link)
        link_map[link_id] = (src_node, src_slot, link_type)

    api_workflow = {}
    for node in nodes:
        class_type = node.get("type", "")
        if node.get("mode") in (MODE_NEVER, MODE_BYPASS) or class_type in FRONTEND_ONLY_NODE_TYPES:
            continue

        # Widget values, positionally mapped to input names.
        widget_names = WIDGET_MAPPINGS.get(class_type, [])
        inputs = {
            name: value
            for name, value in zip(widget_names, node.get("widgets_values") or [])
            if name
        }

        # Connections override widget values of the same name.
        for inp in node.get("inputs") or []:
            source = _resolve_link_source(inp.get("link"), inp.get("type"), link_map, nodes_by_id)
            if source is not None:
                inputs[inp["name"]] = source

        api_node = {"class_type": class_type, "inputs": inputs}
        if "title" in node:
            api_node["_meta"] = {"title": node["title"]}
        api_workflow[str(node["id"])] = api_node

    return api_workflow


def upload_image(image_base64: str, filename: str = None) -> str:
    """Upload a base64 image to ComfyUI and return the stored filename.

    A ``data:<mime>;base64,`` prefix is stripped first. Without this,
    ``b64decode`` (non-validating) would discard only the punctuation and
    decode the header's letters as image bytes.

    Raises:
        RuntimeError: if ComfyUI rejects the upload.
    """
    if filename is None:
        filename = f"input_{uuid.uuid4().hex[:8]}.png"

    if isinstance(image_base64, str) and image_base64.startswith("data:") and "," in image_base64:
        image_base64 = image_base64.split(",", 1)[1]
    image_data = base64.b64decode(image_base64)

    resp = requests.post(
        f"{COMFYUI_HOST}/upload/image",
        files={"image": (filename, image_data, "image/png")},
        data={"overwrite": "true"},
        timeout=REQUEST_TIMEOUT,
    )
    if resp.status_code != 200:
        raise RuntimeError(f"Failed to upload image: {resp.text}")
    return resp.json().get("name", filename)


def inject_wan22_params(workflow: dict, params: dict) -> dict:
    """Return a copy of ``workflow`` with ``params`` written into the Wan22 nodes.

    See ``WAN22_PARAM_TARGETS`` for which parameter goes to which node input.
    Parameters whose target node is missing are skipped with a warning; the
    caller's workflow is never mutated.
    """
    workflow = json.loads(json.dumps(workflow))  # Deep copy
    for param, node_id, input_name in WAN22_PARAM_TARGETS:
        if param not in params:
            continue
        if node_id not in workflow:
            print(f"Warning: node {node_id} not in workflow; '{param}' not applied")
            continue
        workflow[node_id].setdefault("inputs", {})[input_name] = params[param]
    return workflow


def queue_workflow(workflow: dict, client_id: str = None) -> str:
    """Queue ``workflow`` on ComfyUI and return its prompt_id.

    Raises:
        RuntimeError: on a non-200 response or a response without prompt_id.
    """
    if client_id is None:
        client_id = uuid.uuid4().hex

    resp = requests.post(
        f"{COMFYUI_HOST}/prompt",
        json={"prompt": workflow, "client_id": client_id},
        timeout=REQUEST_TIMEOUT,
    )
    if resp.status_code != 200:
        raise RuntimeError(f"Failed to queue workflow: {resp.text}")

    result = resp.json()
    # Debug: print full queue response
    print(f"Queue response keys: {result.keys()}")
    if result.get("node_errors"):
        print(f"Node errors: {result['node_errors']}")
    if "error" in result:
        print(f"Queue error: {result['error']}")
    if "prompt_id" not in result:
        raise RuntimeError(f"ComfyUI did not return a prompt_id: {result}")
    return result["prompt_id"]


def get_history(prompt_id: str) -> dict:
    """Return ComfyUI's execution history for ``prompt_id`` ({} on non-200)."""
    resp = requests.get(f"{COMFYUI_HOST}/history/{prompt_id}", timeout=REQUEST_TIMEOUT)
    if resp.status_code != 200:
        return {}
    return resp.json()


def poll_for_completion(prompt_id: str, timeout: int = MAX_TIMEOUT, logger: JobLogger = None) -> dict:
    """Poll ComfyUI until ``prompt_id`` completes and return its history entry.

    Transient HTTP failures are logged and retried.

    Raises:
        RuntimeError: if the workflow reports an error, or the ComfyUI process
            started by this module exits while waiting.
        TimeoutError: if not complete within ``timeout`` seconds.
    """
    start_time = time.time()
    last_log_time = start_time
    request_failures = 0

    while time.time() - start_time < timeout:
        try:
            history = get_history(prompt_id)
        except requests.exceptions.RequestException as exc:
            request_failures += 1
            if request_failures == 1 or request_failures % 30 == 0:
                _log(logger, f"Warning: history request failed ({exc}); retrying")
            history = {}

        entry = history.get(prompt_id)
        if entry is not None:
            status = entry.get("status", {})
            # Check for completion
            if status.get("completed", False):
                return entry
            # Check for error
            if status.get("status_str") == "error":
                raise RuntimeError(f"Workflow execution failed: {status}")

        # A dead server will never finish the job; fail now rather than at timeout.
        process = comfyui_process
        if process is not None and process.poll() is not None:
            raise RuntimeError(f"ComfyUI process exited (code {process.returncode}) during generation")

        # Log progress every 30 seconds
        if time.time() - last_log_time >= 30:
            elapsed = int(time.time() - start_time)
            remaining = timeout - elapsed
            _log(logger, f"Generating... ({elapsed}s elapsed, {remaining}s remaining)")
            last_log_time = time.time()

        time.sleep(POLL_INTERVAL)

    raise TimeoutError(f"Workflow execution timed out after {timeout}s")


# ---------------------------------------------------------------------------
# Outputs
# ---------------------------------------------------------------------------
def get_file_type(filename: str) -> str:
    """Classify ``filename`` as "video" or "image" by its extension."""
    return "video" if Path(filename).suffix.lower() in VIDEO_EXTENSIONS else "image"


# (history output key, forced type) pairs, scanned in this order per node.
# None means "infer the type from the file extension".
OUTPUT_KINDS = (
    ("images", None),     # SaveVideo sometimes puts videos here
    ("videos", "video"),  # SaveVideo node
    ("gifs", "video"),    # VideoHelperSuite
    ("files", None),      # Generic files
)


def get_output_files(history: dict) -> list:
    """Extract output file descriptors from a history entry.

    Each descriptor has keys ``type`` ("image"/"video"), ``filename``,
    ``subfolder`` and ``type_folder`` (ComfyUI's output/temp folder type).
    Entries without a filename are skipped.
    """
    outputs = []
    for node_output in (history.get("outputs") or {}).values():
        if not isinstance(node_output, dict):
            continue
        for key, forced_type in OUTPUT_KINDS:
            for item in node_output.get(key) or []:
                filename = item.get("filename") if isinstance(item, dict) else None
                if not filename:
                    continue
                outputs.append({
                    "type": forced_type or get_file_type(filename),
                    "filename": filename,
                    "subfolder": item.get("subfolder", ""),
                    "type_folder": item.get("type", "output"),
                })
    return outputs


def fetch_output(output_info: dict) -> bytes:
    """Download one output file from ComfyUI's ``/view`` endpoint.

    Raises:
        RuntimeError: on a non-200 response.
    """
    params = {
        "filename": output_info["filename"],
        "subfolder": output_info["subfolder"],
        "type": output_info["type_folder"],
    }
    resp = requests.get(f"{COMFYUI_HOST}/view", params=params, timeout=REQUEST_TIMEOUT)
    if resp.status_code != 200:
        raise RuntimeError(f"Failed to fetch output {output_info['filename']}: {resp.status_code}")
    return resp.content


# ---------------------------------------------------------------------------
# RunPod handler
# ---------------------------------------------------------------------------
def _build_params(job_input: dict) -> dict:
    """Translate handler input into inject_wan22_params() parameters.

    Null/absent optional fields are left out so the workflow defaults apply.
    Raises ValueError/TypeError for non-numeric resolution/steps/split_step.
    """
    params = {"prompt": job_input["prompt"]}
    if job_input.get("negative_prompt") is not None:
        params["negative_prompt"] = job_input["negative_prompt"]
    for key in ("resolution", "steps", "split_step"):
        if job_input.get(key) is not None:
            params[key] = int(job_input[key])
    return params


def _parse_timeout(job_input: dict) -> int:
    """Return the requested timeout in seconds, clamped to 1..MAX_TIMEOUT."""
    raw = job_input.get("timeout")
    timeout = MAX_TIMEOUT if raw is None else int(float(raw))
    return max(1, min(timeout, MAX_TIMEOUT))


def _load_workflow(job_input: dict, logger: JobLogger) -> dict:
    """Return the custom workflow from the input, or the default, in API format."""
    if job_input.get("workflow"):
        logger.log("Loading custom workflow")
        return convert_frontend_to_api(job_input["workflow"])
    logger.log("Loading default workflow")
    return convert_frontend_to_api(load_default_workflow())


def _package_output(output_info: dict, data: bytes, prompt_id: str, logger: JobLogger) -> dict:
    """Build the response entry for one output file.

    Videos above ``MAX_INLINE_OUTPUT_BYTES`` are written to the network volume
    and returned by path; everything else is returned inline as base64.
    """
    if output_info["type"] == "video" and len(data) > MAX_INLINE_OUTPUT_BYTES:
        # Prefix with the prompt id so outputs from different jobs/workers that
        # share the volume cannot overwrite each other; .name drops any dirs.
        safe_name = Path(output_info["filename"]).name
        output_path = LARGE_OUTPUT_DIR / f"{prompt_id}_{safe_name}"
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_bytes(data)
        logger.log(f"Large video saved to volume: {output_path}")
        return {
            "type": output_info["type"],
            "filename": output_info["filename"],
            "path": str(output_path),
            "size": len(data),
        }
    return {
        "type": output_info["type"],
        "filename": output_info["filename"],
        "data": base64.b64encode(data).decode("utf-8"),
        "size": len(data),
    }


def _error_response(logger: JobLogger, message: str, status: str) -> dict:
    """Log recent ComfyUI console output and build an error response."""
    comfyui_logs = get_comfyui_output(ERROR_OUTPUT_LINES)
    logger.log("ComfyUI output (last 100 lines):")
    for line in comfyui_logs:
        logger.log(f" {line}")
    return {
        "error": message,
        "status": status,
        "logs": logger.get_logs(),
        "comfyui_output": comfyui_logs,
    }


def handler(job: dict) -> dict:
    """
    RunPod serverless handler.

    Input schema:
    {
        "image": "base64 encoded image (required; a data: URI prefix is accepted)",
        "prompt": "positive prompt text (required)",
        "negative_prompt": "negative prompt (optional)",
        "resolution": 720 (optional, default 720),
        "steps": 8 (optional, default 8),
        "split_step": 4 (optional, default 4),
        "timeout": 1200 (optional, clamped to 1..1200),
        "workflow": {} (optional, override default workflow)
    }

    Returns ``{"status": "success", "prompt_id", "outputs", "logs"}`` on
    success, otherwise a dict with ``"error"`` and ``"logs"`` (plus
    ``"status"``/``"comfyui_output"`` for failures after validation).
    """
    logger = JobLogger()
    job_input = job.get("input") or {}
    logger.log("Job started")

    # Validate required inputs
    if not job_input.get("image"):
        logger.log("ERROR: Missing required 'image' (base64) in input")
        return {"error": "Missing required 'image' (base64) in input", "logs": logger.get_logs()}
    if not job_input.get("prompt"):
        logger.log("ERROR: Missing required 'prompt' in input")
        return {"error": "Missing required 'prompt' in input", "logs": logger.get_logs()}

    # Log input parameters
    prompt = job_input["prompt"]
    image_size_kb = len(job_input["image"]) * 3 / 4 / 1024  # Approximate decoded size
    logger.log(f"Input image size: ~{image_size_kb:.1f} KB (base64)")
    logger.log(f"Prompt: {prompt[:100]}{'...' if len(prompt) > 100 else ''}")
    if job_input.get("negative_prompt"):
        logger.log(f"Negative prompt: {job_input['negative_prompt'][:50]}...")
    logger.log(f"Resolution: {job_input.get('resolution', 720)}")
    logger.log(f"Steps: {job_input.get('steps', 8)}")
    logger.log(f"Split step: {job_input.get('split_step', 4)}")

    try:
        # Parse numeric inputs before any work, so a bad value never leaves a
        # job already queued in ComfyUI.
        params = _build_params(job_input)
        timeout = _parse_timeout(job_input)

        # Ensure ComfyUI is running
        if not start_comfyui(logger):
            logger.log("ERROR: Failed to start ComfyUI server")
            return {
                "error": "Failed to start ComfyUI server",
                "logs": logger.get_logs(),
                "comfyui_output": get_comfyui_output(ERROR_OUTPUT_LINES),
            }

        workflow = _load_workflow(job_input, logger)
        logger.log(f"Workflow loaded with {len(workflow)} nodes")

        image_filename = upload_image(job_input["image"])
        logger.log(f"Uploaded image: {image_filename}")
        params["image_filename"] = image_filename

        workflow = inject_wan22_params(workflow, params)
        logger.log("Parameters injected into workflow")

        prompt_id = queue_workflow(workflow, uuid.uuid4().hex)
        logger.log(f"Workflow queued: {prompt_id}")

        logger.log(f"Starting generation (timeout: {timeout}s)")
        history = poll_for_completion(prompt_id, timeout, logger)
        logger.log("Workflow completed successfully")

        outputs = get_output_files(history)
        if not outputs:
            logger.log("ERROR: No outputs generated")
            return {"error": "No outputs generated", "logs": logger.get_logs()}
        logger.log(f"Found {len(outputs)} output file(s)")

        results = []
        for output_info in outputs:
            data = fetch_output(output_info)
            size_mb = len(data) / 1024 / 1024
            logger.log(f"Fetched output: {output_info['filename']} ({size_mb:.2f} MB)")
            results.append(_package_output(output_info, data, prompt_id, logger))

        logger.log("Job completed successfully")
        return {
            "status": "success",
            "prompt_id": prompt_id,
            "outputs": results,
            "logs": logger.get_logs(),
        }

    except TimeoutError as e:
        logger.log(f"ERROR: Timeout - {e}")
        return _error_response(logger, str(e), "timeout")
    except Exception as e:  # top-level boundary: report every failure to the client
        logger.log(f"ERROR: {e}")
        logger.log(f"Traceback:\n{traceback.format_exc()}")
        return _error_response(logger, str(e), "error")


# RunPod serverless entry point
if __name__ == "__main__":
    print("Starting ComfyUI RunPod Handler...")
    runpod.serverless.start({"handler": handler})