Optimize Dockerfile layers and reduce disk usage
Some checks failed
Build and Push Docker Image / build (push) Failing after 10m52s
Some checks failed
Build and Push Docker Image / build (push) Failing after 10m52s
- Combine PyTorch + triton install into single layer
- Add pip cache cleanup after each install step
- Change SageAttention to regular install and remove source after build
- Consolidate custom node dependencies into single layer
- Add CLAUDE.md, i2v-workflow.json, update handler.py and PROJECT.md

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
269
handler.py
269
handler.py
@@ -1,6 +1,15 @@
|
||||
"""
|
||||
ComfyUI RunPod Serverless Handler
|
||||
Handles image/video generation workflows with ComfyUI API
|
||||
|
||||
Wan22-I2V-Remix Workflow Node Mapping:
|
||||
- Node 148: LoadImage - image input
|
||||
- Node 134: CLIPTextEncode - positive prompt
|
||||
- Node 137: CLIPTextEncode - negative prompt
|
||||
- Node 147: easy int - resolution (720 default)
|
||||
- Node 150: INTConstant - steps (8 default)
|
||||
- Node 151: INTConstant - split_step (4 default)
|
||||
- Node 117: SaveVideo - output
|
||||
"""
|
||||
|
||||
import os
|
||||
@@ -13,7 +22,6 @@ import subprocess
|
||||
import signal
|
||||
import requests
|
||||
from pathlib import Path
|
||||
from urllib.parse import urljoin
|
||||
import runpod
|
||||
|
||||
# Configuration
|
||||
@@ -23,6 +31,16 @@ COMFYUI_HOST = f"http://127.0.0.1:{COMFYUI_PORT}"
|
||||
MAX_TIMEOUT = 600 # 10 minutes max for video generation
|
||||
POLL_INTERVAL = 1.0
|
||||
STARTUP_TIMEOUT = 120
|
||||
DEFAULT_WORKFLOW_PATH = "/workspace/workflows/Wan22-I2V-Remix.json"
|
||||
|
||||
# Wan22-I2V-Remix node IDs
|
||||
NODE_IMAGE_INPUT = "148"
|
||||
NODE_POSITIVE_PROMPT = "134"
|
||||
NODE_NEGATIVE_PROMPT = "137"
|
||||
NODE_RESOLUTION = "147"
|
||||
NODE_STEPS = "150"
|
||||
NODE_SPLIT_STEP = "151"
|
||||
NODE_SAVE_VIDEO = "117"
|
||||
|
||||
# Global ComfyUI process
|
||||
comfyui_process = None
|
||||
@@ -78,6 +96,91 @@ def stop_comfyui():
|
||||
comfyui_process = None
|
||||
|
||||
|
||||
def load_default_workflow() -> dict:
    """Load the default Wan22-I2V-Remix workflow from disk.

    Returns:
        The parsed JSON content of the file at ``DEFAULT_WORKFLOW_PATH``.

    Raises:
        FileNotFoundError: If ``DEFAULT_WORKFLOW_PATH`` does not exist.
        json.JSONDecodeError: If the file content is not valid JSON.
    """
    workflow_path = Path(DEFAULT_WORKFLOW_PATH)
    if not workflow_path.exists():
        raise FileNotFoundError(f"Default workflow not found: {DEFAULT_WORKFLOW_PATH}")

    # Read as UTF-8 explicitly: without it, open() falls back to the locale
    # encoding, which can mis-decode non-ASCII prompt text in the workflow.
    with open(workflow_path, encoding="utf-8") as f:
        return json.load(f)
|
||||
|
||||
|
||||
# Widget-backed input names per node class, listed in the order their values
# appear in the node's "widgets_values" list.
# This is a simplified mapping - specific nodes may need custom handling.
_WIDGET_INPUT_NAMES = {
    "LoadImage": ("image", "upload"),
    "CLIPTextEncode": ("text",),
    "easy int": ("value",),
    "INTConstant": ("value",),
    "SaveVideo": ("filename_prefix", "format", "codec"),
    "CreateVideo": ("frame_rate",),
    "RIFE VFI": ("ckpt_name", "clear_cache_after_n_frames", "multiplier"),
}


def convert_frontend_to_api(frontend_workflow: dict) -> dict:
    """Convert ComfyUI frontend format to API format.

    Args:
        frontend_workflow: A workflow dict. If it has no ``"nodes"`` key it is
            treated as already being in API format.

    Returns:
        A dict keyed by node id (as a string). Each value holds the node's
        ``"class_type"`` and ``"inputs"``, plus ``"_meta"`` with the title when
        the node has a ``"title"``. A workflow without ``"nodes"`` is returned
        unchanged (same object).

    Raises:
        KeyError: If a node has no ``"id"`` or an input entry has no ``"name"``.
        ValueError: If a link entry has fewer than three elements.
    """
    # If already in API format (no 'nodes' key), return as-is
    if "nodes" not in frontend_workflow:
        return frontend_workflow

    # "or []" also covers keys that are present but set to null in the JSON.
    nodes = frontend_workflow.get("nodes") or []
    links = frontend_workflow.get("links") or []

    # Build link lookup: link_id -> (source_node_id, source_slot).
    # Only the first three fields are needed, so entries that omit the
    # trailing destination/type fields are still accepted.
    link_map = {}
    for link in links:
        link_id, src_node, src_slot = link[:3]
        link_map[link_id] = (str(src_node), src_slot)

    api_workflow = {}
    for node in nodes:
        node_id = str(node["id"])
        class_type = node.get("type", "")

        # Widget values are mapped positionally; anything that is not a
        # list (null, or a dict-shaped value) cannot be mapped that way and
        # is ignored instead of raising.
        widgets_values = node.get("widgets_values")
        if not isinstance(widgets_values, (list, tuple)):
            widgets_values = ()

        # zip() stops at the shorter sequence, so missing trailing widget
        # values simply leave the corresponding inputs unset.
        widget_names = _WIDGET_INPUT_NAMES.get(class_type, ())
        inputs = dict(zip(widget_names, widgets_values))

        # Process node inputs (connections). These are applied after the
        # widget values, so a linked input overrides a widget of the same name.
        for inp in node.get("inputs") or []:
            link_id = inp.get("link")
            if link_id is not None and link_id in link_map:
                src_node, src_slot = link_map[link_id]
                inputs[inp["name"]] = [src_node, src_slot]

        entry = {
            "class_type": class_type,
            "inputs": inputs,
        }

        # Add meta if title exists
        if "title" in node:
            entry["_meta"] = {"title": node["title"]}

        api_workflow[node_id] = entry

    return api_workflow
|
||||
|
||||
|
||||
def upload_image(image_base64: str, filename: str = None) -> str:
|
||||
"""Upload base64 image to ComfyUI and return the filename."""
|
||||
if filename is None:
|
||||
@@ -107,61 +210,33 @@ def upload_image(image_base64: str, filename: str = None) -> str:
|
||||
return result.get("name", filename)
|
||||
|
||||
|
||||
def inject_prompt_into_workflow(workflow: dict, prompt: str, prompt_node_id: str = None) -> dict:
|
||||
"""Inject prompt text into workflow at specified node or auto-detect."""
|
||||
workflow = workflow.copy()
|
||||
def inject_wan22_params(workflow: dict, params: dict) -> dict:
|
||||
"""Inject parameters into Wan22-I2V-Remix workflow nodes."""
|
||||
workflow = json.loads(json.dumps(workflow)) # Deep copy
|
||||
|
||||
# If specific node ID provided, use it
|
||||
if prompt_node_id and prompt_node_id in workflow:
|
||||
node = workflow[prompt_node_id]
|
||||
if "inputs" in node:
|
||||
# Common prompt input field names
|
||||
for field in ["text", "prompt", "positive", "string"]:
|
||||
if field in node["inputs"]:
|
||||
node["inputs"][field] = prompt
|
||||
return workflow
|
||||
# Image input (node 148)
|
||||
if "image_filename" in params and NODE_IMAGE_INPUT in workflow:
|
||||
workflow[NODE_IMAGE_INPUT]["inputs"]["image"] = params["image_filename"]
|
||||
|
||||
# Auto-detect: find nodes that look like text/prompt inputs
|
||||
prompt_node_types = [
|
||||
"CLIPTextEncode",
|
||||
"CLIPTextEncodeSDXL",
|
||||
"Text Multiline",
|
||||
"String",
|
||||
"TextInput"
|
||||
]
|
||||
# Positive prompt (node 134)
|
||||
if "prompt" in params and NODE_POSITIVE_PROMPT in workflow:
|
||||
workflow[NODE_POSITIVE_PROMPT]["inputs"]["text"] = params["prompt"]
|
||||
|
||||
for node_id, node in workflow.items():
|
||||
class_type = node.get("class_type", "")
|
||||
if class_type in prompt_node_types:
|
||||
if "inputs" in node:
|
||||
for field in ["text", "prompt", "positive", "string"]:
|
||||
if field in node["inputs"]:
|
||||
# Only inject into positive prompts, skip negative
|
||||
if "negative" not in node.get("_meta", {}).get("title", "").lower():
|
||||
node["inputs"][field] = prompt
|
||||
return workflow
|
||||
# Negative prompt (node 137) - optional override
|
||||
if "negative_prompt" in params and NODE_NEGATIVE_PROMPT in workflow:
|
||||
workflow[NODE_NEGATIVE_PROMPT]["inputs"]["text"] = params["negative_prompt"]
|
||||
|
||||
return workflow
|
||||
# Resolution (node 147)
|
||||
if "resolution" in params and NODE_RESOLUTION in workflow:
|
||||
workflow[NODE_RESOLUTION]["inputs"]["value"] = params["resolution"]
|
||||
|
||||
# Steps (node 150)
|
||||
if "steps" in params and NODE_STEPS in workflow:
|
||||
workflow[NODE_STEPS]["inputs"]["value"] = params["steps"]
|
||||
|
||||
def inject_image_into_workflow(workflow: dict, image_filename: str, image_node_id: str = None) -> dict:
|
||||
"""Inject uploaded image filename into workflow."""
|
||||
workflow = workflow.copy()
|
||||
|
||||
# If specific node ID provided, use it
|
||||
if image_node_id and image_node_id in workflow:
|
||||
node = workflow[image_node_id]
|
||||
if "inputs" in node:
|
||||
node["inputs"]["image"] = image_filename
|
||||
return workflow
|
||||
|
||||
# Auto-detect: find LoadImage nodes
|
||||
for node_id, node in workflow.items():
|
||||
class_type = node.get("class_type", "")
|
||||
if class_type in ["LoadImage", "LoadImageFromPath"]:
|
||||
if "inputs" in node:
|
||||
node["inputs"]["image"] = image_filename
|
||||
return workflow
|
||||
# Split step (node 151)
|
||||
if "split_step" in params and NODE_SPLIT_STEP in workflow:
|
||||
workflow[NODE_SPLIT_STEP]["inputs"]["value"] = params["split_step"]
|
||||
|
||||
return workflow
|
||||
|
||||
@@ -237,7 +312,17 @@ def get_output_files(history: dict) -> list:
|
||||
"type_folder": img.get("type", "output")
|
||||
})
|
||||
|
||||
# Handle video outputs (VideoHelperSuite and similar)
|
||||
# Handle video outputs (SaveVideo node)
|
||||
if "videos" in node_output:
|
||||
for vid in node_output["videos"]:
|
||||
outputs.append({
|
||||
"type": "video",
|
||||
"filename": vid["filename"],
|
||||
"subfolder": vid.get("subfolder", ""),
|
||||
"type_folder": vid.get("type", "output")
|
||||
})
|
||||
|
||||
# Handle video outputs (VideoHelperSuite gifs)
|
||||
if "gifs" in node_output:
|
||||
for vid in node_output["gifs"]:
|
||||
outputs.append({
|
||||
@@ -280,46 +365,79 @@ def fetch_output(output_info: dict) -> bytes:
|
||||
|
||||
|
||||
def handler(job: dict) -> dict:
|
||||
"""RunPod serverless handler."""
|
||||
"""
|
||||
RunPod serverless handler.
|
||||
|
||||
Input schema:
|
||||
{
|
||||
"image": "base64 encoded image (required)",
|
||||
"prompt": "positive prompt text (required)",
|
||||
"negative_prompt": "negative prompt (optional)",
|
||||
"resolution": 720 (optional, default 720),
|
||||
"steps": 8 (optional, default 8),
|
||||
"split_step": 4 (optional, default 4),
|
||||
"timeout": 600 (optional, max 600),
|
||||
"workflow": {} (optional, override default workflow)
|
||||
}
|
||||
"""
|
||||
job_input = job.get("input", {})
|
||||
|
||||
# Validate input
|
||||
workflow = job_input.get("workflow")
|
||||
if not workflow:
|
||||
return {"error": "Missing 'workflow' in input"}
|
||||
# Validate required inputs
|
||||
if "image" not in job_input or not job_input["image"]:
|
||||
return {"error": "Missing required 'image' (base64) in input"}
|
||||
|
||||
if "prompt" not in job_input or not job_input["prompt"]:
|
||||
return {"error": "Missing required 'prompt' in input"}
|
||||
|
||||
# Ensure ComfyUI is running
|
||||
if not start_comfyui():
|
||||
return {"error": "Failed to start ComfyUI server"}
|
||||
|
||||
try:
|
||||
# Handle image upload if provided
|
||||
if "image" in job_input and job_input["image"]:
|
||||
image_filename = upload_image(
|
||||
job_input["image"],
|
||||
job_input.get("image_filename")
|
||||
)
|
||||
workflow = inject_image_into_workflow(
|
||||
workflow,
|
||||
image_filename,
|
||||
job_input.get("image_node_id")
|
||||
)
|
||||
# Load workflow (custom or default)
|
||||
if "workflow" in job_input and job_input["workflow"]:
|
||||
workflow = job_input["workflow"]
|
||||
# Convert frontend format if needed
|
||||
workflow = convert_frontend_to_api(workflow)
|
||||
else:
|
||||
# Load and convert default workflow
|
||||
frontend_workflow = load_default_workflow()
|
||||
workflow = convert_frontend_to_api(frontend_workflow)
|
||||
|
||||
# Handle prompt injection if provided
|
||||
if "prompt" in job_input and job_input["prompt"]:
|
||||
workflow = inject_prompt_into_workflow(
|
||||
workflow,
|
||||
job_input["prompt"],
|
||||
job_input.get("prompt_node_id")
|
||||
)
|
||||
# Upload image
|
||||
image_filename = upload_image(job_input["image"])
|
||||
print(f"Uploaded image: {image_filename}")
|
||||
|
||||
# Build params for injection
|
||||
params = {
|
||||
"image_filename": image_filename,
|
||||
"prompt": job_input["prompt"]
|
||||
}
|
||||
|
||||
if "negative_prompt" in job_input:
|
||||
params["negative_prompt"] = job_input["negative_prompt"]
|
||||
|
||||
if "resolution" in job_input:
|
||||
params["resolution"] = int(job_input["resolution"])
|
||||
|
||||
if "steps" in job_input:
|
||||
params["steps"] = int(job_input["steps"])
|
||||
|
||||
if "split_step" in job_input:
|
||||
params["split_step"] = int(job_input["split_step"])
|
||||
|
||||
# Inject parameters into workflow
|
||||
workflow = inject_wan22_params(workflow, params)
|
||||
|
||||
# Queue workflow
|
||||
client_id = uuid.uuid4().hex
|
||||
prompt_id = queue_workflow(workflow, client_id)
|
||||
print(f"Queued workflow: {prompt_id}")
|
||||
|
||||
# Poll for completion
|
||||
timeout = min(job_input.get("timeout", MAX_TIMEOUT), MAX_TIMEOUT)
|
||||
history = poll_for_completion(prompt_id, timeout)
|
||||
print("Workflow completed")
|
||||
|
||||
# Get output files
|
||||
outputs = get_output_files(history)
|
||||
@@ -331,6 +449,7 @@ def handler(job: dict) -> dict:
|
||||
results = []
|
||||
for output_info in outputs:
|
||||
data = fetch_output(output_info)
|
||||
print(f"Fetched output: {output_info['filename']} ({len(data)} bytes)")
|
||||
|
||||
# Check size for video files
|
||||
if output_info["type"] == "video" and len(data) > 10 * 1024 * 1024:
|
||||
@@ -362,6 +481,8 @@ def handler(job: dict) -> dict:
|
||||
except TimeoutError as e:
|
||||
return {"error": str(e), "status": "timeout"}
|
||||
except Exception as e:
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
return {"error": str(e), "status": "error"}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user