Initial commit: ComfyUI RunPod Serverless endpoint

- Dockerfile with CUDA 12.8.1, Python 3.12, PyTorch 2.8.0+cu128
- SageAttention 2.2 compiled from source
- Nunchaku wheel installation
- 12 custom nodes pre-installed
- Handler with image/video output support
- Model symlinks to /userdata network volume

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Nick
2025-12-25 21:59:09 +13:00
commit a5adfe060e
5 changed files with 978 additions and 0 deletions

371
handler.py Normal file
View File

@@ -0,0 +1,371 @@
"""
ComfyUI RunPod Serverless Handler
Handles image/video generation workflows with ComfyUI API
"""
import base64
import copy
import json
import os
import signal
import subprocess
import sys
import time
import uuid
from pathlib import Path
from urllib.parse import urljoin

import requests
import runpod
# Configuration
COMFYUI_DIR = "/workspace/ComfyUI"
COMFYUI_PORT = 8188
COMFYUI_HOST = f"http://127.0.0.1:{COMFYUI_PORT}"
MAX_TIMEOUT = 600 # 10 minutes max for video generation
POLL_INTERVAL = 1.0
STARTUP_TIMEOUT = 120
# Global ComfyUI process
comfyui_process = None
def start_comfyui():
"""Start ComfyUI server if not already running."""
global comfyui_process
if comfyui_process is not None and comfyui_process.poll() is None:
return True
print("Starting ComfyUI server...")
comfyui_process = subprocess.Popen(
[
sys.executable, "main.py",
"--listen", "127.0.0.1",
"--port", str(COMFYUI_PORT),
"--disable-auto-launch"
],
cwd=COMFYUI_DIR,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
preexec_fn=os.setsid if hasattr(os, 'setsid') else None
)
# Wait for server to be ready
start_time = time.time()
while time.time() - start_time < STARTUP_TIMEOUT:
try:
resp = requests.get(f"{COMFYUI_HOST}/system_stats", timeout=2)
if resp.status_code == 200:
print("ComfyUI server ready")
return True
except requests.exceptions.RequestException:
pass
time.sleep(1)
print("ComfyUI server failed to start")
return False
def stop_comfyui():
"""Stop ComfyUI server."""
global comfyui_process
if comfyui_process is not None:
try:
os.killpg(os.getpgid(comfyui_process.pid), signal.SIGTERM)
except (OSError, ProcessLookupError):
comfyui_process.terminate()
comfyui_process = None
def upload_image(image_base64: str, filename: str = None) -> str:
"""Upload base64 image to ComfyUI and return the filename."""
if filename is None:
filename = f"input_{uuid.uuid4().hex[:8]}.png"
# Decode base64
image_data = base64.b64decode(image_base64)
# Upload to ComfyUI
files = {
"image": (filename, image_data, "image/png"),
}
data = {
"overwrite": "true"
}
resp = requests.post(
f"{COMFYUI_HOST}/upload/image",
files=files,
data=data
)
if resp.status_code != 200:
raise Exception(f"Failed to upload image: {resp.text}")
result = resp.json()
return result.get("name", filename)
def inject_prompt_into_workflow(workflow: dict, prompt: str, prompt_node_id: str = None) -> dict:
"""Inject prompt text into workflow at specified node or auto-detect."""
workflow = workflow.copy()
# If specific node ID provided, use it
if prompt_node_id and prompt_node_id in workflow:
node = workflow[prompt_node_id]
if "inputs" in node:
# Common prompt input field names
for field in ["text", "prompt", "positive", "string"]:
if field in node["inputs"]:
node["inputs"][field] = prompt
return workflow
# Auto-detect: find nodes that look like text/prompt inputs
prompt_node_types = [
"CLIPTextEncode",
"CLIPTextEncodeSDXL",
"Text Multiline",
"String",
"TextInput"
]
for node_id, node in workflow.items():
class_type = node.get("class_type", "")
if class_type in prompt_node_types:
if "inputs" in node:
for field in ["text", "prompt", "positive", "string"]:
if field in node["inputs"]:
# Only inject into positive prompts, skip negative
if "negative" not in node.get("_meta", {}).get("title", "").lower():
node["inputs"][field] = prompt
return workflow
return workflow
def inject_image_into_workflow(workflow: dict, image_filename: str, image_node_id: str = None) -> dict:
"""Inject uploaded image filename into workflow."""
workflow = workflow.copy()
# If specific node ID provided, use it
if image_node_id and image_node_id in workflow:
node = workflow[image_node_id]
if "inputs" in node:
node["inputs"]["image"] = image_filename
return workflow
# Auto-detect: find LoadImage nodes
for node_id, node in workflow.items():
class_type = node.get("class_type", "")
if class_type in ["LoadImage", "LoadImageFromPath"]:
if "inputs" in node:
node["inputs"]["image"] = image_filename
return workflow
return workflow
def queue_workflow(workflow: dict, client_id: str = None) -> str:
"""Queue workflow and return prompt_id."""
if client_id is None:
client_id = uuid.uuid4().hex
payload = {
"prompt": workflow,
"client_id": client_id
}
resp = requests.post(
f"{COMFYUI_HOST}/prompt",
json=payload
)
if resp.status_code != 200:
raise Exception(f"Failed to queue workflow: {resp.text}")
result = resp.json()
return result["prompt_id"]
def get_history(prompt_id: str) -> dict:
"""Get execution history for a prompt."""
resp = requests.get(f"{COMFYUI_HOST}/history/{prompt_id}")
if resp.status_code != 200:
return {}
return resp.json()
def poll_for_completion(prompt_id: str, timeout: int = MAX_TIMEOUT) -> dict:
"""Poll until workflow completes or timeout."""
start_time = time.time()
while time.time() - start_time < timeout:
history = get_history(prompt_id)
if prompt_id in history:
status = history[prompt_id].get("status", {})
# Check for completion
if status.get("completed", False):
return history[prompt_id]
# Check for error
if status.get("status_str") == "error":
raise Exception(f"Workflow execution failed: {status}")
time.sleep(POLL_INTERVAL)
raise TimeoutError(f"Workflow execution timed out after {timeout}s")
def get_output_files(history: dict) -> list:
"""Extract output file info from history."""
outputs = []
if "outputs" not in history:
return outputs
for node_id, node_output in history["outputs"].items():
# Handle image outputs
if "images" in node_output:
for img in node_output["images"]:
outputs.append({
"type": "image",
"filename": img["filename"],
"subfolder": img.get("subfolder", ""),
"type_folder": img.get("type", "output")
})
# Handle video outputs (VideoHelperSuite and similar)
if "gifs" in node_output:
for vid in node_output["gifs"]:
outputs.append({
"type": "video",
"filename": vid["filename"],
"subfolder": vid.get("subfolder", ""),
"type_folder": vid.get("type", "output")
})
# Handle generic files
if "files" in node_output:
for f in node_output["files"]:
filename = f.get("filename", "")
ext = Path(filename).suffix.lower()
file_type = "video" if ext in [".mp4", ".webm", ".gif", ".mov"] else "image"
outputs.append({
"type": file_type,
"filename": filename,
"subfolder": f.get("subfolder", ""),
"type_folder": f.get("type", "output")
})
return outputs
def fetch_output(output_info: dict) -> bytes:
"""Fetch output file from ComfyUI."""
params = {
"filename": output_info["filename"],
"subfolder": output_info["subfolder"],
"type": output_info["type_folder"]
}
resp = requests.get(f"{COMFYUI_HOST}/view", params=params)
if resp.status_code != 200:
raise Exception(f"Failed to fetch output: {resp.status_code}")
return resp.content
def handler(job: dict) -> dict:
"""RunPod serverless handler."""
job_input = job.get("input", {})
# Validate input
workflow = job_input.get("workflow")
if not workflow:
return {"error": "Missing 'workflow' in input"}
# Ensure ComfyUI is running
if not start_comfyui():
return {"error": "Failed to start ComfyUI server"}
try:
# Handle image upload if provided
if "image" in job_input and job_input["image"]:
image_filename = upload_image(
job_input["image"],
job_input.get("image_filename")
)
workflow = inject_image_into_workflow(
workflow,
image_filename,
job_input.get("image_node_id")
)
# Handle prompt injection if provided
if "prompt" in job_input and job_input["prompt"]:
workflow = inject_prompt_into_workflow(
workflow,
job_input["prompt"],
job_input.get("prompt_node_id")
)
# Queue workflow
client_id = uuid.uuid4().hex
prompt_id = queue_workflow(workflow, client_id)
# Poll for completion
timeout = min(job_input.get("timeout", MAX_TIMEOUT), MAX_TIMEOUT)
history = poll_for_completion(prompt_id, timeout)
# Get output files
outputs = get_output_files(history)
if not outputs:
return {"error": "No outputs generated"}
# Fetch and encode outputs
results = []
for output_info in outputs:
data = fetch_output(output_info)
# Check size for video files
if output_info["type"] == "video" and len(data) > 10 * 1024 * 1024:
# For large videos, save to network volume and return path
output_path = Path("/userdata/outputs") / output_info["filename"]
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_bytes(data)
results.append({
"type": output_info["type"],
"filename": output_info["filename"],
"path": str(output_path),
"size": len(data)
})
else:
# Return as base64
results.append({
"type": output_info["type"],
"filename": output_info["filename"],
"data": base64.b64encode(data).decode("utf-8"),
"size": len(data)
})
return {
"status": "success",
"prompt_id": prompt_id,
"outputs": results
}
except TimeoutError as e:
return {"error": str(e), "status": "timeout"}
except Exception as e:
return {"error": str(e), "status": "error"}
# RunPod serverless entry point
if __name__ == "__main__":
print("Starting ComfyUI RunPod Handler...")
runpod.serverless.start({"handler": handler})