"""
|
|
ComfyUI RunPod Serverless Handler
|
|
Handles image/video generation workflows with ComfyUI API
|
|
|
|
Wan22-I2V-Remix Workflow Node Mapping:
|
|
- Node 148: LoadImage - image input
|
|
- Node 134: CLIPTextEncode - positive prompt
|
|
- Node 137: CLIPTextEncode - negative prompt
|
|
- Node 147: easy int - resolution (720 default)
|
|
- Node 150: INTConstant - steps (8 default)
|
|
- Node 151: INTConstant - split_step (4 default)
|
|
- Node 117: SaveVideo - output
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import base64
|
|
import uuid
|
|
import subprocess
|
|
import signal
|
|
import threading
|
|
from collections import deque
|
|
import requests
|
|
from pathlib import Path
|
|
import runpod
|
|
|
|
# Configuration
|
|
COMFYUI_DIR = "/workspace/ComfyUI"
|
|
COMFYUI_PORT = 8188
|
|
COMFYUI_HOST = f"http://127.0.0.1:{COMFYUI_PORT}"
|
|
MAX_TIMEOUT = 1200 # 20 minutes max for video generation
|
|
POLL_INTERVAL = 1.0
|
|
STARTUP_TIMEOUT = 120
|
|
DEFAULT_WORKFLOW_PATH = "/workspace/workflows/Wan22-I2V-Remix-API.json"
|
|
|
|
# Wan22-I2V-Remix node IDs
|
|
NODE_IMAGE_INPUT = "148"
|
|
NODE_POSITIVE_PROMPT = "134"
|
|
NODE_NEGATIVE_PROMPT = "137"
|
|
NODE_RESOLUTION = "147"
|
|
NODE_STEPS = "150"
|
|
NODE_SPLIT_STEP = "151"
|
|
NODE_SAVE_VIDEO = "117"
|
|
|
|
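
# These IDs are the string keys of the corresponding entries in the API-format
# workflow dict, so parameters are injected with plain dictionary access, e.g.
# workflow[NODE_STEPS]["inputs"]["value"] = 8 (see inject_wan22_params below).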

# Global ComfyUI process and output capture
comfyui_process = None
comfyui_output_buffer = deque(maxlen=200)  # Keep last 200 lines
comfyui_output_lock = threading.Lock()
comfyui_reader_thread = None


def _read_comfyui_output():
    """Background thread to read ComfyUI stdout and store in buffer."""
    global comfyui_process, comfyui_output_buffer
    while comfyui_process is not None:
        try:
            line = comfyui_process.stdout.readline()
            if line:
                decoded = line.decode('utf-8', errors='replace').rstrip()
                with comfyui_output_lock:
                    comfyui_output_buffer.append(decoded)
                print(f"[ComfyUI] {decoded}")  # Echo to RunPod logs
            elif comfyui_process.poll() is not None:
                # Process ended
                break
        except Exception:
            break


def get_comfyui_output(last_n: int = 50) -> list:
    """Get the last N lines of ComfyUI output."""
    with comfyui_output_lock:
        lines = list(comfyui_output_buffer)
    return lines[-last_n:] if len(lines) > last_n else lines


class JobLogger:
    """Accumulates log messages with timestamps for returning to the client."""

    def __init__(self):
        self.logs = []
        self.start_time = time.time()

    def log(self, message: str):
        """Log a message with timestamp."""
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        elapsed = time.time() - self.start_time
        entry = f"[{timestamp}] [{elapsed:.1f}s] {message}"
        self.logs.append(entry)
        print(entry)  # Also print for RunPod console

    def get_logs(self) -> list:
        """Return all accumulated logs."""
        return self.logs


def start_comfyui(logger: JobLogger = None):
    """Start ComfyUI server if not already running."""
    global comfyui_process, comfyui_reader_thread, comfyui_output_buffer

    def log(msg):
        if logger:
            logger.log(msg)
        else:
            print(msg)

    if comfyui_process is not None and comfyui_process.poll() is None:
        log("ComfyUI server already running")
        return True

    # Clear output buffer for fresh start
    with comfyui_output_lock:
        comfyui_output_buffer.clear()

    log("Starting ComfyUI server...")

    comfyui_process = subprocess.Popen(
        [
            sys.executable, "main.py",
            "--listen", "127.0.0.1",
            "--port", str(COMFYUI_PORT),
            "--disable-auto-launch"
        ],
        cwd=COMFYUI_DIR,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        preexec_fn=os.setsid if hasattr(os, 'setsid') else None
    )

    # Start background thread to capture output
    comfyui_reader_thread = threading.Thread(target=_read_comfyui_output, daemon=True)
    comfyui_reader_thread.start()

    # Wait for server to be ready
    start_time = time.time()
    last_status_time = start_time
    while time.time() - start_time < STARTUP_TIMEOUT:
        try:
            resp = requests.get(f"{COMFYUI_HOST}/system_stats", timeout=2)
            if resp.status_code == 200:
                log("ComfyUI server ready")
                return True
        except requests.exceptions.RequestException:
            pass

        # Log progress every 15 seconds
        if time.time() - last_status_time >= 15:
            elapsed = int(time.time() - start_time)
            log(f"Waiting for ComfyUI startup... ({elapsed}s)")
            last_status_time = time.time()

        time.sleep(1)

    log(f"ComfyUI server failed to start after {STARTUP_TIMEOUT}s")
    return False


def stop_comfyui():
    """Stop ComfyUI server."""
    global comfyui_process

    if comfyui_process is not None:
        try:
            os.killpg(os.getpgid(comfyui_process.pid), signal.SIGTERM)
        except (OSError, ProcessLookupError):
            comfyui_process.terminate()
        comfyui_process = None


def load_default_workflow() -> dict:
    """Load the default Wan22-I2V-Remix workflow."""
    workflow_path = Path(DEFAULT_WORKFLOW_PATH)
    if not workflow_path.exists():
        raise FileNotFoundError(f"Default workflow not found: {DEFAULT_WORKFLOW_PATH}")

    with open(workflow_path) as f:
        return json.load(f)


def convert_frontend_to_api(frontend_workflow: dict) -> dict:
    """Convert ComfyUI frontend format to API format."""
    # If already in API format (no 'nodes' key), return as-is
    if "nodes" not in frontend_workflow:
        return frontend_workflow

    api_workflow = {}
    nodes = frontend_workflow.get("nodes", [])
    links = frontend_workflow.get("links", [])

    # Build set of active (non-bypassed) node IDs
    active_nodes = {str(node["id"]) for node in nodes if node.get("mode") != 4}

    # Build link lookup: link_id -> (source_node_id, source_slot).
    # Only include links from active nodes.
    link_map = {}
    for link in links:
        link_id, src_node, src_slot, dst_node, dst_slot, link_type = link[:6]
        if str(src_node) in active_nodes:
            link_map[link_id] = (str(src_node), src_slot)

    # Widget mappings for each node type: list of input names in widget order
    WIDGET_MAPPINGS = {
        "LoadImage": ["image", "upload"],
        "CLIPTextEncode": ["text"],
        "easy int": ["value"],
        "INTConstant": ["value"],
        "SaveVideo": ["filename_prefix", "format", "codec"],
        "CreateVideo": ["fps"],
        "RIFE VFI": ["ckpt_name", "clear_cache_after_n_frames", "multiplier", "fast_mode", "ensemble", "scale_factor"],
        "CLIPLoader": ["clip_name", "type", "device"],
        "GetImageSize": [],
        "WanVideoTorchCompileSettings": [
            "backend", "fullgraph", "mode", "dynamic", "dynamo_cache_size_limit",
            "compile_transformer_blocks_only", "compile_cache_max_entries",
            "compile_single_blocks", "compile_double_blocks"
        ],
        "WanVideoBlockSwap": [
            "blocks_to_swap", "offload_txt_emb", "offload_img_emb", "offload_txt_clip_emb",
            "offload_modulation", "cpu_offload_streams", "use_async_transfer"
        ],
        "WanVideoModelLoader": [
            "model", "base_precision", "quantization", "load_device", "attention_mode", "lora_scale_mode"
        ],
        "WanVideoLoraSelect": ["lora", "strength", "use_lora_sparse", "sparse_lora_blocks"],
        "WanVideoVAELoader": ["model_name", "precision"],
        "WanVideoTextEmbedBridge": [],
        "WanVideoImageToVideoEncode": [
            "width", "height", "num_frames", "noise_aug_strength",
            "start_latent_strength", "end_latent_strength", "image_noise_aug",
            "mask_noise_aug", "force_offload"
        ],
        "WanVideoSampler": [
            "shift", "cfg", "steps", "seed", "seed_mode", "force_offload", "scheduler",
            "riflex_freq_index", "riflex_freq_dim", "riflex_freq_scale",
            "use_comfy_pbar", "start_step", "end_step", "denoise"
        ],
        "WanVideoDecode": [
            "enable_tiling", "tile_x", "tile_y", "tile_stride_x", "tile_stride_y", "tiling_decoder"
        ],
        "MathExpression|pysssss": ["expression"],
    }

    for node in nodes:
        # Skip bypassed/muted nodes (mode 4)
        if node.get("mode") == 4:
            continue

        node_id = str(node["id"])
        class_type = node.get("type", "")

        inputs = {}

        # Map positional widget values to named inputs
        widgets_values = node.get("widgets_values", [])
        widget_names = WIDGET_MAPPINGS.get(class_type, [])

        for i, value in enumerate(widgets_values):
            if i < len(widget_names) and widget_names[i]:
                inputs[widget_names[i]] = value

        # Process node inputs (connections) - these override widget values
        for inp in node.get("inputs", []):
            inp_name = inp["name"]
            link_id = inp.get("link")
            if link_id is not None and link_id in link_map:
                src_node, src_slot = link_map[link_id]
                inputs[inp_name] = [src_node, src_slot]

        api_workflow[node_id] = {
            "class_type": class_type,
            "inputs": inputs
        }

        # Add meta if title exists
        if "title" in node:
            api_workflow[node_id]["_meta"] = {"title": node["title"]}

    return api_workflow
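
# A minimal illustration of the conversion above (hypothetical values, not
# taken from the real workflow file). A frontend node such as:
#
#   {"id": 134, "type": "CLIPTextEncode", "mode": 0,
#    "widgets_values": ["a cat walking"],
#    "inputs": [{"name": "clip", "link": 12}]}
#
# with link 12 recorded as [12, 160, 0, 134, 0, "CLIP"] becomes the API entry:
#
#   "134": {"class_type": "CLIPTextEncode",
#           "inputs": {"text": "a cat walking", "clip": ["160", 0]}}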


def upload_image(image_base64: str, filename: str = None) -> str:
    """Upload base64 image to ComfyUI and return the filename."""
    if filename is None:
        filename = f"input_{uuid.uuid4().hex[:8]}.png"

    # Decode base64
    image_data = base64.b64decode(image_base64)

    # Upload to ComfyUI
    files = {
        "image": (filename, image_data, "image/png"),
    }
    data = {
        "overwrite": "true"
    }

    resp = requests.post(
        f"{COMFYUI_HOST}/upload/image",
        files=files,
        data=data
    )

    if resp.status_code != 200:
        raise Exception(f"Failed to upload image: {resp.text}")

    result = resp.json()
    return result.get("name", filename)
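
# Note: on success ComfyUI's /upload/image endpoint replies with JSON that
# includes at least a "name" field (typically also "subfolder" and "type");
# upload_image falls back to the locally generated filename if "name" is absent.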


def inject_wan22_params(workflow: dict, params: dict) -> dict:
    """Inject parameters into Wan22-I2V-Remix workflow nodes."""
    workflow = json.loads(json.dumps(workflow))  # Deep copy

    # Image input (node 148)
    if "image_filename" in params and NODE_IMAGE_INPUT in workflow:
        workflow[NODE_IMAGE_INPUT]["inputs"]["image"] = params["image_filename"]

    # Positive prompt (node 134)
    if "prompt" in params and NODE_POSITIVE_PROMPT in workflow:
        workflow[NODE_POSITIVE_PROMPT]["inputs"]["text"] = params["prompt"]

    # Negative prompt (node 137) - optional override
    if "negative_prompt" in params and NODE_NEGATIVE_PROMPT in workflow:
        workflow[NODE_NEGATIVE_PROMPT]["inputs"]["text"] = params["negative_prompt"]

    # Resolution (node 147)
    if "resolution" in params and NODE_RESOLUTION in workflow:
        workflow[NODE_RESOLUTION]["inputs"]["value"] = params["resolution"]

    # Steps (node 150)
    if "steps" in params and NODE_STEPS in workflow:
        workflow[NODE_STEPS]["inputs"]["value"] = params["steps"]

    # Split step (node 151)
    if "split_step" in params and NODE_SPLIT_STEP in workflow:
        workflow[NODE_SPLIT_STEP]["inputs"]["value"] = params["split_step"]

    return workflow


def queue_workflow(workflow: dict, client_id: str = None) -> str:
    """Queue workflow and return prompt_id."""
    if client_id is None:
        client_id = uuid.uuid4().hex

    payload = {
        "prompt": workflow,
        "client_id": client_id
    }

    resp = requests.post(
        f"{COMFYUI_HOST}/prompt",
        json=payload
    )

    if resp.status_code != 200:
        raise Exception(f"Failed to queue workflow: {resp.text}")

    result = resp.json()

    # Debug: print full queue response
    print(f"Queue response keys: {result.keys()}")
    if "node_errors" in result and result["node_errors"]:
        print(f"Node errors: {result['node_errors']}")
    if "error" in result:
        print(f"Queue error: {result['error']}")

    return result["prompt_id"]
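
# Sketch of the /prompt response shape this code relies on (values
# illustrative; only "prompt_id", "node_errors", and "error" are read above):
#
#   {"prompt_id": "a1b2c3...", "number": 0, "node_errors": {}}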


def get_history(prompt_id: str) -> dict:
    """Get execution history for a prompt."""
    resp = requests.get(f"{COMFYUI_HOST}/history/{prompt_id}")
    if resp.status_code != 200:
        return {}
    return resp.json()


def poll_for_completion(prompt_id: str, timeout: int = MAX_TIMEOUT, logger: JobLogger = None) -> dict:
    """Poll until workflow completes or timeout."""
    start_time = time.time()
    last_log_time = start_time

    def log(msg):
        if logger:
            logger.log(msg)
        else:
            print(msg)

    while time.time() - start_time < timeout:
        history = get_history(prompt_id)

        if prompt_id in history:
            status = history[prompt_id].get("status", {})

            # Check for completion
            if status.get("completed", False):
                return history[prompt_id]

            # Check for error
            if status.get("status_str") == "error":
                raise Exception(f"Workflow execution failed: {status}")

        # Log progress every 30 seconds
        if time.time() - last_log_time >= 30:
            elapsed = int(time.time() - start_time)
            remaining = timeout - elapsed
            log(f"Generating... ({elapsed}s elapsed, {remaining}s remaining)")
            last_log_time = time.time()

        time.sleep(POLL_INTERVAL)

    raise TimeoutError(f"Workflow execution timed out after {timeout}s")


def get_file_type(filename: str) -> str:
    """Determine file type from extension."""
    ext = Path(filename).suffix.lower()
    if ext in [".mp4", ".webm", ".gif", ".mov", ".avi", ".mkv"]:
        return "video"
    return "image"


def get_output_files(history: dict) -> list:
    """Extract output file info from history."""
    outputs = []

    if "outputs" not in history:
        return outputs

    for node_id, node_output in history["outputs"].items():
        # Handle image outputs (note: SaveVideo sometimes puts videos here)
        if "images" in node_output:
            for img in node_output["images"]:
                filename = img["filename"]
                outputs.append({
                    "type": get_file_type(filename),
                    "filename": filename,
                    "subfolder": img.get("subfolder", ""),
                    "type_folder": img.get("type", "output")
                })

        # Handle video outputs (SaveVideo node)
        if "videos" in node_output:
            for vid in node_output["videos"]:
                outputs.append({
                    "type": "video",
                    "filename": vid["filename"],
                    "subfolder": vid.get("subfolder", ""),
                    "type_folder": vid.get("type", "output")
                })

        # Handle video outputs (VideoHelperSuite gifs)
        if "gifs" in node_output:
            for vid in node_output["gifs"]:
                outputs.append({
                    "type": "video",
                    "filename": vid["filename"],
                    "subfolder": vid.get("subfolder", ""),
                    "type_folder": vid.get("type", "output")
                })

        # Handle generic files
        if "files" in node_output:
            for f in node_output["files"]:
                filename = f.get("filename", "")
                outputs.append({
                    "type": get_file_type(filename),
                    "filename": filename,
                    "subfolder": f.get("subfolder", ""),
                    "type_folder": f.get("type", "output")
                })

    return outputs
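
# Sketch of the history["outputs"] structure that get_output_files walks
# (node IDs and filenames are illustrative):
#
#   {"117": {"videos": [{"filename": "wan22_00001.mp4",
#                        "subfolder": "", "type": "output"}]}}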


def fetch_output(output_info: dict) -> bytes:
    """Fetch output file from ComfyUI."""
    params = {
        "filename": output_info["filename"],
        "subfolder": output_info["subfolder"],
        "type": output_info["type_folder"]
    }

    resp = requests.get(f"{COMFYUI_HOST}/view", params=params)

    if resp.status_code != 200:
        raise Exception(f"Failed to fetch output: {resp.status_code}")

    return resp.content


def handler(job: dict) -> dict:
    """
    RunPod serverless handler.

    Input schema:
    {
        "image": "base64 encoded image (required)",
        "prompt": "positive prompt text (required)",
        "negative_prompt": "negative prompt (optional)",
        "resolution": 720 (optional, default 720),
        "steps": 8 (optional, default 8),
        "split_step": 4 (optional, default 4),
        "timeout": 1200 (optional, max 1200),
        "workflow": {} (optional, override default workflow)
    }
    """
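    # For reference, a successful response has the shape produced by the final
    # return statement below (values illustrative):
    #   {"status": "success", "prompt_id": "...",
    #    "outputs": [{"type", "filename", "data" or "path", "size"}, ...],
    #    "logs": [...]}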
    logger = JobLogger()
    job_input = job.get("input", {})

    logger.log("Job started")

    # Validate required inputs
    if "image" not in job_input or not job_input["image"]:
        logger.log("ERROR: Missing required 'image' (base64) in input")
        return {"error": "Missing required 'image' (base64) in input", "logs": logger.get_logs()}

    if "prompt" not in job_input or not job_input["prompt"]:
        logger.log("ERROR: Missing required 'prompt' in input")
        return {"error": "Missing required 'prompt' in input", "logs": logger.get_logs()}

    # Log input parameters
    image_size_kb = len(job_input["image"]) * 3 / 4 / 1024  # Approximate decoded size
    logger.log(f"Input image size: ~{image_size_kb:.1f} KB (base64)")
    logger.log(f"Prompt: {job_input['prompt'][:100]}{'...' if len(job_input['prompt']) > 100 else ''}")
    if job_input.get("negative_prompt"):
        neg = job_input["negative_prompt"]
        logger.log(f"Negative prompt: {neg[:50]}{'...' if len(neg) > 50 else ''}")
    logger.log(f"Resolution: {job_input.get('resolution', 720)}")
    logger.log(f"Steps: {job_input.get('steps', 8)}")
    logger.log(f"Split step: {job_input.get('split_step', 4)}")

    # Ensure ComfyUI is running
    if not start_comfyui(logger):
        logger.log("ERROR: Failed to start ComfyUI server")
        return {"error": "Failed to start ComfyUI server", "logs": logger.get_logs()}

    try:
        # Load workflow (custom or default)
        if "workflow" in job_input and job_input["workflow"]:
            logger.log("Loading custom workflow")
            workflow = convert_frontend_to_api(job_input["workflow"])
        else:
            logger.log("Loading default workflow")
            workflow = convert_frontend_to_api(load_default_workflow())

        logger.log(f"Workflow loaded with {len(workflow)} nodes")

        # Upload image
        image_filename = upload_image(job_input["image"])
        logger.log(f"Uploaded image: {image_filename}")

        # Build params for injection
        params = {
            "image_filename": image_filename,
            "prompt": job_input["prompt"]
        }

        if "negative_prompt" in job_input:
            params["negative_prompt"] = job_input["negative_prompt"]

        if "resolution" in job_input:
            params["resolution"] = int(job_input["resolution"])

        if "steps" in job_input:
            params["steps"] = int(job_input["steps"])

        if "split_step" in job_input:
            params["split_step"] = int(job_input["split_step"])

        # Inject parameters into workflow
        workflow = inject_wan22_params(workflow, params)
        logger.log("Parameters injected into workflow")

        # Queue workflow
        client_id = uuid.uuid4().hex
        prompt_id = queue_workflow(workflow, client_id)
        logger.log(f"Workflow queued: {prompt_id}")

        # Poll for completion
        timeout = min(job_input.get("timeout", MAX_TIMEOUT), MAX_TIMEOUT)
        logger.log(f"Starting generation (timeout: {timeout}s)")
        history = poll_for_completion(prompt_id, timeout, logger)
        logger.log("Workflow completed successfully")

        # Get output files
        outputs = get_output_files(history)

        if not outputs:
            logger.log("ERROR: No outputs generated")
            return {"error": "No outputs generated", "logs": logger.get_logs()}

        logger.log(f"Found {len(outputs)} output file(s)")

        # Fetch and encode outputs
        results = []
        for output_info in outputs:
            data = fetch_output(output_info)
            size_mb = len(data) / 1024 / 1024
            logger.log(f"Fetched output: {output_info['filename']} ({size_mb:.2f} MB)")

            if output_info["type"] == "video" and len(data) > 10 * 1024 * 1024:
                # For large videos, save to network volume and return the path
                output_path = Path("/runpod-volume/outputs") / output_info["filename"]
                output_path.parent.mkdir(parents=True, exist_ok=True)
                output_path.write_bytes(data)
                logger.log(f"Large video saved to volume: {output_path}")
                results.append({
                    "type": output_info["type"],
                    "filename": output_info["filename"],
                    "path": str(output_path),
                    "size": len(data)
                })
            else:
                # Return as base64
                results.append({
                    "type": output_info["type"],
                    "filename": output_info["filename"],
                    "data": base64.b64encode(data).decode("utf-8"),
                    "size": len(data)
                })

        logger.log("Job completed successfully")

        return {
            "status": "success",
            "prompt_id": prompt_id,
            "outputs": results,
            "logs": logger.get_logs()
        }

    except TimeoutError as e:
        logger.log(f"ERROR: Timeout - {e}")
        comfyui_logs = get_comfyui_output(100)
        logger.log("ComfyUI output (last 100 lines):")
        for line in comfyui_logs:
            logger.log(f"  {line}")
        return {"error": str(e), "status": "timeout", "logs": logger.get_logs(), "comfyui_output": comfyui_logs}
    except Exception as e:
        import traceback
        tb = traceback.format_exc()
        logger.log(f"ERROR: {e}")
        logger.log(f"Traceback:\n{tb}")
        comfyui_logs = get_comfyui_output(100)
        logger.log("ComfyUI output (last 100 lines):")
        for line in comfyui_logs:
            logger.log(f"  {line}")
        return {"error": str(e), "status": "error", "logs": logger.get_logs(), "comfyui_output": comfyui_logs}


# RunPod serverless entry point
if __name__ == "__main__":
    print("Starting ComfyUI RunPod Handler...")
    runpod.serverless.start({"handler": handler})
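
# Local smoke test (a sketch, bypassing the RunPod queue): call the handler
# directly with a RunPod-style job dict. The file path and prompt below are
# assumptions for illustration.
#
#   with open("test.png", "rb") as f:
#       img_b64 = base64.b64encode(f.read()).decode()
#   print(handler({"input": {"image": img_b64, "prompt": "a cat walking"}}))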
|