"""
ComfyUI RunPod Serverless Handler

Handles image/video generation workflows with ComfyUI API.

Flow for one job: make sure a local ComfyUI server is running, optionally
upload an input image and inject image/prompt values into the workflow,
queue the workflow, poll ``/history`` until it completes, then return the
produced files (base64-encoded, or as a path on disk for large videos).
"""

import base64
import copy
import json
import os
import signal
import subprocess
import sys
import time
import uuid
from pathlib import Path
from urllib.parse import urljoin

import requests
import runpod

# Configuration
COMFYUI_DIR = "/workspace/ComfyUI"
COMFYUI_PORT = 8188
COMFYUI_HOST = f"http://127.0.0.1:{COMFYUI_PORT}"
MAX_TIMEOUT = 600  # 10 minutes max for video generation
POLL_INTERVAL = 1.0
STARTUP_TIMEOUT = 120
REQUEST_TIMEOUT = 30  # seconds; upper bound for a single HTTP call to ComfyUI
SHUTDOWN_TIMEOUT = 10  # seconds to wait after SIGTERM before force-killing

# Videos larger than this are written to disk instead of being base64-encoded.
LARGE_OUTPUT_THRESHOLD = 10 * 1024 * 1024
LARGE_OUTPUT_DIR = Path("/userdata/outputs")

# Input field names that may hold prompt text, in lookup priority order.
_PROMPT_FIELDS = ("text", "prompt", "positive", "string")
# Node class types treated as prompt inputs during auto-detection.
_PROMPT_NODE_TYPES = frozenset({
    "CLIPTextEncode",
    "CLIPTextEncodeSDXL",
    "Text Multiline",
    "String",
    "TextInput",
})
# Node class types treated as image loaders during auto-detection.
_IMAGE_NODE_TYPES = frozenset({"LoadImage", "LoadImageFromPath"})
# File extensions classified as video in generic "files" outputs.
_VIDEO_EXTENSIONS = frozenset({".mp4", ".webm", ".gif", ".mov"})

# Global ComfyUI process
comfyui_process = None


def start_comfyui():
    """Start ComfyUI server if not already running.

    Returns:
        True if the tracked process is already alive or the server answered
        ``/system_stats`` with HTTP 200 within ``STARTUP_TIMEOUT`` seconds;
        False if the process could not be spawned, exited during startup, or
        did not become ready in time.
    """
    global comfyui_process

    if comfyui_process is not None and comfyui_process.poll() is None:
        return True

    print("Starting ComfyUI server...")
    try:
        comfyui_process = subprocess.Popen(
            [
                sys.executable, "main.py",
                "--listen", "127.0.0.1",
                "--port", str(COMFYUI_PORT),
                "--disable-auto-launch",
            ],
            cwd=COMFYUI_DIR,
            # Inherit stdout instead of piping it: nothing ever reads a pipe
            # here, so a PIPE would eventually fill and block the server.
            stdout=None,
            stderr=subprocess.STDOUT,
            # Own session/process group so stop_comfyui() can signal the
            # whole group (replaces preexec_fn=os.setsid).
            start_new_session=True,
        )
    except OSError as exc:
        print(f"ComfyUI server failed to start: {exc}")
        comfyui_process = None
        return False

    # Wait for server to be ready
    deadline = time.monotonic() + STARTUP_TIMEOUT
    while time.monotonic() < deadline:
        try:
            resp = requests.get(f"{COMFYUI_HOST}/system_stats", timeout=2)
            if resp.status_code == 200:
                print("ComfyUI server ready")
                return True
        except requests.exceptions.RequestException:
            pass

        # Checked after the health probe so a server that is reachable is
        # still reported as ready; otherwise stop waiting on a dead process.
        if comfyui_process.poll() is not None:
            print(
                "ComfyUI server exited during startup "
                f"(exit code {comfyui_process.returncode})"
            )
            comfyui_process = None
            return False

        time.sleep(1)

    print("ComfyUI server failed to start")
    return False


def _signal_process(process, sig):
    """Send *sig* to the process group of *process*, or to it alone as fallback."""
    try:
        os.killpg(os.getpgid(process.pid), sig)
    except (AttributeError, OSError):
        # No process-group support on this platform, or the group is gone.
        try:
            process.send_signal(sig)
        except (OSError, ValueError):
            pass


def stop_comfyui():
    """Stop ComfyUI server.

    Sends SIGTERM to the server's process group, waits up to
    ``SHUTDOWN_TIMEOUT`` seconds for it to exit, then force-kills it. The
    process is always waited on so it does not linger as a zombie.
    """
    global comfyui_process

    process = comfyui_process
    comfyui_process = None
    if process is None or process.poll() is not None:
        return

    _signal_process(process, signal.SIGTERM)
    try:
        process.wait(timeout=SHUTDOWN_TIMEOUT)
    except subprocess.TimeoutExpired:
        process.kill()
        process.wait()


def upload_image(image_base64: str, filename: str = None) -> str:
    """Upload base64 image to ComfyUI and return the filename.

    Args:
        image_base64: Base64-encoded image bytes. A leading
            ``data:<mime>;base64,`` prefix is stripped if present.
        filename: Name to upload under; a random ``input_<hex>.png`` name is
            generated when omitted.

    Returns:
        The ``name`` reported in ComfyUI's response, or *filename* when the
        response has no ``name`` key.

    Raises:
        Exception: If ComfyUI answers with a non-200 status.
    """
    if filename is None:
        filename = f"input_{uuid.uuid4().hex[:8]}.png"

    # b64decode() silently drops non-alphabet characters, so a data-URL
    # header would otherwise be decoded into garbage bytes.
    if (
        isinstance(image_base64, str)
        and image_base64.startswith("data:")
        and "," in image_base64
    ):
        image_base64 = image_base64.split(",", 1)[1]

    # Decode base64
    image_data = base64.b64decode(image_base64)

    # Upload to ComfyUI
    files = {
        "image": (filename, image_data, "image/png"),
    }
    data = {
        "overwrite": "true",
    }

    resp = requests.post(
        f"{COMFYUI_HOST}/upload/image",
        files=files,
        data=data,
        timeout=REQUEST_TIMEOUT,
    )

    if resp.status_code != 200:
        raise Exception(f"Failed to upload image: {resp.text}")

    result = resp.json()
    return result.get("name", filename)


def _lookup_node(workflow: dict, node_id):
    """Return the node stored under *node_id* (or its string form), else None."""
    if not node_id:
        return None
    if node_id in workflow:
        return workflow[node_id]
    # Allows an integer id supplied by the caller to match a string key.
    return workflow.get(str(node_id))


def _node_inputs(node):
    """Return the node's ``inputs`` dict, or None if the node has none."""
    if not isinstance(node, dict):
        return None
    inputs = node.get("inputs")
    return inputs if isinstance(inputs, dict) else None


def _first_prompt_field(inputs):
    """Return the first name from ``_PROMPT_FIELDS`` present in *inputs*."""
    if inputs is None:
        return None
    return next((name for name in _PROMPT_FIELDS if name in inputs), None)


def inject_prompt_into_workflow(workflow: dict, prompt: str, prompt_node_id: str = None) -> dict:
    """Inject prompt text into workflow at specified node or auto-detect.

    The input workflow is deep-copied, so the caller's dict (including its
    nested node dicts) is never modified.

    Args:
        workflow: Mapping of node id to node definition.
        prompt: Text to write into the prompt field.
        prompt_node_id: Node to write into. If it is missing or has none of
            the known prompt fields, auto-detection is used instead.

    Returns:
        A copy of the workflow with the prompt written into the first
        matching field, or an unmodified copy if no target was found.
    """
    workflow = copy.deepcopy(workflow)

    # If specific node ID provided, use it
    inputs = _node_inputs(_lookup_node(workflow, prompt_node_id))
    field = _first_prompt_field(inputs)
    if field is not None:
        inputs[field] = prompt
        return workflow

    # Auto-detect: find nodes that look like text/prompt inputs
    for node in workflow.values():
        if not isinstance(node, dict):
            continue
        if node.get("class_type", "") not in _PROMPT_NODE_TYPES:
            continue

        inputs = _node_inputs(node)
        field = _first_prompt_field(inputs)
        if field is None:
            continue

        # Only inject into positive prompts, skip negative
        meta = node.get("_meta")
        title = meta.get("title") if isinstance(meta, dict) else None
        if "negative" in str(title or "").lower():
            continue

        inputs[field] = prompt
        return workflow

    return workflow


def inject_image_into_workflow(workflow: dict, image_filename: str, image_node_id: str = None) -> dict:
    """Inject uploaded image filename into workflow.

    The input workflow is deep-copied, so the caller's dict (including its
    nested node dicts) is never modified.

    Args:
        workflow: Mapping of node id to node definition.
        image_filename: Filename to store in the node's ``image`` input.
        image_node_id: Node to write into. If it is missing or has no
            ``inputs``, the first ``LoadImage``/``LoadImageFromPath`` node
            with ``inputs`` is used instead.

    Returns:
        A copy of the workflow with the image filename set, or an unmodified
        copy if no target node was found.
    """
    workflow = copy.deepcopy(workflow)

    # If specific node ID provided, use it
    inputs = _node_inputs(_lookup_node(workflow, image_node_id))
    if inputs is not None:
        inputs["image"] = image_filename
        return workflow

    # Auto-detect: find LoadImage nodes
    for node in workflow.values():
        if not isinstance(node, dict):
            continue
        if node.get("class_type", "") not in _IMAGE_NODE_TYPES:
            continue
        inputs = _node_inputs(node)
        if inputs is not None:
            inputs["image"] = image_filename
            return workflow

    return workflow


def queue_workflow(workflow: dict, client_id: str = None) -> str:
    """Queue workflow and return prompt_id.

    Args:
        workflow: Workflow to submit under the ``prompt`` key.
        client_id: Client identifier; a random hex id is generated if omitted.

    Raises:
        Exception: If ComfyUI answers with a non-200 status.
    """
    if client_id is None:
        client_id = uuid.uuid4().hex

    payload = {
        "prompt": workflow,
        "client_id": client_id,
    }

    resp = requests.post(
        f"{COMFYUI_HOST}/prompt",
        json=payload,
        timeout=REQUEST_TIMEOUT,
    )

    if resp.status_code != 200:
        raise Exception(f"Failed to queue workflow: {resp.text}")

    result = resp.json()
    return result["prompt_id"]


def get_history(prompt_id: str) -> dict:
    """Get execution history for a prompt.

    Returns:
        The decoded JSON body, or an empty dict when the server answers with
        a non-200 status or the request times out (both are treated as "no
        history yet" so polling can continue).
    """
    try:
        resp = requests.get(
            f"{COMFYUI_HOST}/history/{prompt_id}",
            timeout=REQUEST_TIMEOUT,
        )
    except requests.exceptions.Timeout:
        return {}

    if resp.status_code != 200:
        return {}
    return resp.json()


def poll_for_completion(prompt_id: str, timeout: int = MAX_TIMEOUT) -> dict:
    """Poll until workflow completes or timeout.

    Args:
        prompt_id: Id returned by :func:`queue_workflow`.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The history entry for *prompt_id* once its status is completed.

    Raises:
        Exception: If the status reports ``status_str == "error"``.
        TimeoutError: If the workflow does not complete within *timeout*.
    """
    # monotonic() is immune to wall-clock adjustments during long runs.
    start_time = time.monotonic()

    while time.monotonic() - start_time < timeout:
        history = get_history(prompt_id)

        if prompt_id in history:
            entry = history[prompt_id]
            status = entry.get("status", {})

            # Check for completion
            if status.get("completed", False):
                return entry

            # Check for error
            if status.get("status_str") == "error":
                raise Exception(f"Workflow execution failed: {status}")

        time.sleep(POLL_INTERVAL)

    raise TimeoutError(f"Workflow execution timed out after {timeout}s")


def _output_record(file_type: str, entry: dict, filename: str) -> dict:
    """Build one output descriptor from a ComfyUI output entry."""
    return {
        "type": file_type,
        "filename": filename,
        "subfolder": entry.get("subfolder", ""),
        "type_folder": entry.get("type", "output"),
    }


def get_output_files(history: dict) -> list:
    """Extract output file info from history.

    Args:
        history: A single history entry as returned by
            :func:`poll_for_completion`.

    Returns:
        A list of dicts with keys ``type`` ("image" or "video"),
        ``filename``, ``subfolder`` and ``type_folder``; empty when the
        entry has no ``outputs``.
    """
    outputs = []

    for node_output in history.get("outputs", {}).values():
        # Handle image outputs
        for img in node_output.get("images", ()):
            outputs.append(_output_record("image", img, img["filename"]))

        # Handle video outputs (VideoHelperSuite and similar)
        for vid in node_output.get("gifs", ()):
            outputs.append(_output_record("video", vid, vid["filename"]))

        # Handle generic files: classify by extension
        for entry in node_output.get("files", ()):
            filename = entry.get("filename", "")
            is_video = Path(filename).suffix.lower() in _VIDEO_EXTENSIONS
            outputs.append(
                _output_record("video" if is_video else "image", entry, filename)
            )

    return outputs


def fetch_output(output_info: dict) -> bytes:
    """Fetch output file from ComfyUI.

    Args:
        output_info: Descriptor produced by :func:`get_output_files`.

    Raises:
        Exception: If ComfyUI answers with a non-200 status.
    """
    params = {
        "filename": output_info["filename"],
        "subfolder": output_info["subfolder"],
        "type": output_info["type_folder"],
    }

    resp = requests.get(
        f"{COMFYUI_HOST}/view",
        params=params,
        timeout=REQUEST_TIMEOUT,
    )

    if resp.status_code != 200:
        raise Exception(f"Failed to fetch output: {resp.status_code}")

    return resp.content


def _resolve_timeout(requested):
    """Clamp the caller-supplied timeout to ``MAX_TIMEOUT``.

    Non-numeric values (including None) fall back to ``MAX_TIMEOUT`` instead
    of raising.
    """
    if not isinstance(requested, (int, float)):
        try:
            requested = float(requested)
        except (TypeError, ValueError):
            return MAX_TIMEOUT
    return min(requested, MAX_TIMEOUT)


def _package_output(output_info: dict, data: bytes) -> dict:
    """Turn fetched bytes into a result entry (inline base64 or path on disk)."""
    result = {
        "type": output_info["type"],
        "filename": output_info["filename"],
    }

    # Check size for video files
    if output_info["type"] == "video" and len(data) > LARGE_OUTPUT_THRESHOLD:
        # For large videos, save to network volume and return path.
        # Only the final path component is used so a filename containing
        # directory parts cannot write outside LARGE_OUTPUT_DIR.
        output_path = LARGE_OUTPUT_DIR / Path(output_info["filename"]).name
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_bytes(data)
        result["path"] = str(output_path)
    else:
        # Return as base64
        result["data"] = base64.b64encode(data).decode("utf-8")

    result["size"] = len(data)
    return result


def handler(job: dict) -> dict:
    """RunPod serverless handler.

    Recognised keys in ``job["input"]``:
        workflow: Workflow dict to run (required).
        image / image_filename / image_node_id: Optional base64 input image,
            its upload name, and the node to inject it into.
        prompt / prompt_node_id: Optional prompt text and target node.
        timeout: Optional polling timeout in seconds, capped at MAX_TIMEOUT.

    Returns:
        On success ``{"status": "success", "prompt_id": ..., "outputs": [...]}``;
        otherwise a dict with an ``error`` message (and ``status`` of
        ``"timeout"`` or ``"error"`` for failures during execution).
    """
    # "input" may be present but null; treat that the same as missing.
    job_input = job.get("input") or {}

    # Validate input
    workflow = job_input.get("workflow")
    if not workflow:
        return {"error": "Missing 'workflow' in input"}
    if not isinstance(workflow, dict):
        return {"error": "'workflow' must be a JSON object"}

    # Ensure ComfyUI is running
    if not start_comfyui():
        return {"error": "Failed to start ComfyUI server"}

    try:
        # Handle image upload if provided
        if job_input.get("image"):
            image_filename = upload_image(
                job_input["image"],
                job_input.get("image_filename"),
            )
            workflow = inject_image_into_workflow(
                workflow,
                image_filename,
                job_input.get("image_node_id"),
            )

        # Handle prompt injection if provided
        if job_input.get("prompt"):
            workflow = inject_prompt_into_workflow(
                workflow,
                job_input["prompt"],
                job_input.get("prompt_node_id"),
            )

        # Queue workflow
        client_id = uuid.uuid4().hex
        prompt_id = queue_workflow(workflow, client_id)

        # Poll for completion
        timeout = _resolve_timeout(job_input.get("timeout", MAX_TIMEOUT))
        history = poll_for_completion(prompt_id, timeout)

        # Get output files
        outputs = get_output_files(history)

        if not outputs:
            return {"error": "No outputs generated"}

        # Fetch and encode outputs
        results = [
            _package_output(output_info, fetch_output(output_info))
            for output_info in outputs
        ]

        return {
            "status": "success",
            "prompt_id": prompt_id,
            "outputs": results,
        }

    except TimeoutError as e:
        return {"error": str(e), "status": "timeout"}
    except Exception as e:
        # Top-level boundary: report any failure to the caller as a result
        # dict instead of letting it escape the handler.
        return {"error": str(e), "status": "error"}


# RunPod serverless entry point
if __name__ == "__main__":
    print("Starting ComfyUI RunPod Handler...")
    runpod.serverless.start({"handler": handler})