import torch import os import sys import json import hashlib import traceback import math import time import random import logging from PIL import Image, ImageOps, ImageSequence, ImageFile from PIL.PngImagePlugin import PngInfo import numpy as np import safetensors.torch sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), "comfy")) import comfy.diffusers_load import comfy.samplers import comfy.sample import comfy.sd import comfy.utils import comfy.controlnet import comfy.clip_vision import comfy.model_management from comfy.cli_args import args import importlib import folder_paths import latent_preview import node_helpers def before_node_execution(): comfy.model_management.throw_exception_if_processing_interrupted() def interrupt_processing(value=True): comfy.model_management.interrupt_current_processing(value) MAX_RESOLUTION=16384 class CLIPTextEncode: @classmethod def INPUT_TYPES(s): return {"required": {"text": ("STRING", {"multiline": True, "dynamicPrompts": True}), "clip": ("CLIP", )}} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "encode" CATEGORY = "conditioning" def encode(self, clip, text): tokens = clip.tokenize(text) cond, pooled = clip.encode_from_tokens(tokens, return_pooled=True) return ([[cond, {"pooled_output": pooled}]], ) class ConditioningCombine: @classmethod def INPUT_TYPES(s): return {"required": {"conditioning_1": ("CONDITIONING", ), "conditioning_2": ("CONDITIONING", )}} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "combine" CATEGORY = "conditioning" def combine(self, conditioning_1, conditioning_2): return (conditioning_1 + conditioning_2, ) class ConditioningAverage : @classmethod def INPUT_TYPES(s): return {"required": {"conditioning_to": ("CONDITIONING", ), "conditioning_from": ("CONDITIONING", ), "conditioning_to_strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}) }} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "addWeighted" CATEGORY = "conditioning" def addWeighted(self, conditioning_to, conditioning_from, conditioning_to_strength): out = [] if len(conditioning_from) > 1: logging.warning("Warning: ConditioningAverage conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.") cond_from = conditioning_from[0][0] pooled_output_from = conditioning_from[0][1].get("pooled_output", None) for i in range(len(conditioning_to)): t1 = conditioning_to[i][0] pooled_output_to = conditioning_to[i][1].get("pooled_output", pooled_output_from) t0 = cond_from[:,:t1.shape[1]] if t0.shape[1] < t1.shape[1]: t0 = torch.cat([t0] + [torch.zeros((1, (t1.shape[1] - t0.shape[1]), t1.shape[2]))], dim=1) tw = torch.mul(t1, conditioning_to_strength) + torch.mul(t0, (1.0 - conditioning_to_strength)) t_to = conditioning_to[i][1].copy() if pooled_output_from is not None and pooled_output_to is not None: t_to["pooled_output"] = torch.mul(pooled_output_to, conditioning_to_strength) + torch.mul(pooled_output_from, (1.0 - conditioning_to_strength)) elif pooled_output_from is not None: t_to["pooled_output"] = pooled_output_from n = [tw, t_to] out.append(n) return (out, ) class ConditioningConcat: @classmethod def INPUT_TYPES(s): return {"required": { "conditioning_to": ("CONDITIONING",), "conditioning_from": ("CONDITIONING",), }} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "concat" CATEGORY = "conditioning" def concat(self, conditioning_to, conditioning_from): out = [] if len(conditioning_from) > 1: logging.warning("Warning: ConditioningConcat conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.") cond_from = conditioning_from[0][0] for i in range(len(conditioning_to)): t1 = conditioning_to[i][0] tw = torch.cat((t1, cond_from),1) n = [tw, conditioning_to[i][1].copy()] out.append(n) return (out, ) class ConditioningSetArea: @classmethod def INPUT_TYPES(s): return {"required": {"conditioning": ("CONDITIONING", ), "width": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}), "height": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}), "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), }} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "append" CATEGORY = "conditioning" def append(self, conditioning, width, height, x, y, strength): c = node_helpers.conditioning_set_values(conditioning, {"area": (height // 8, width // 8, y // 8, x // 8), "strength": strength, "set_area_to_bounds": False}) return (c, ) class ConditioningSetAreaPercentage: @classmethod def INPUT_TYPES(s): return {"required": {"conditioning": ("CONDITIONING", ), "width": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}), "height": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}), "x": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}), "y": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}), "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), }} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "append" CATEGORY = "conditioning" def append(self, conditioning, width, height, x, y, strength): c = node_helpers.conditioning_set_values(conditioning, {"area": ("percentage", height, width, y, x), "strength": strength, "set_area_to_bounds": False}) return (c, ) class ConditioningSetAreaStrength: @classmethod def INPUT_TYPES(s): return {"required": {"conditioning": ("CONDITIONING", ), "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), }} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "append" CATEGORY = "conditioning" def append(self, conditioning, strength): c = node_helpers.conditioning_set_values(conditioning, {"strength": strength}) return (c, ) class ConditioningSetMask: @classmethod def INPUT_TYPES(s): return {"required": {"conditioning": ("CONDITIONING", ), "mask": ("MASK", ), "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), "set_cond_area": (["default", "mask bounds"],), }} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "append" CATEGORY = "conditioning" def append(self, conditioning, mask, set_cond_area, strength): set_area_to_bounds = False if set_cond_area != "default": set_area_to_bounds = True if len(mask.shape) < 3: mask = mask.unsqueeze(0) c = node_helpers.conditioning_set_values(conditioning, {"mask": mask, "set_area_to_bounds": set_area_to_bounds, "mask_strength": strength}) return (c, ) class ConditioningZeroOut: @classmethod def INPUT_TYPES(s): return {"required": {"conditioning": ("CONDITIONING", )}} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "zero_out" CATEGORY = "advanced/conditioning" def zero_out(self, conditioning): c = [] for t in conditioning: d = t[1].copy() if "pooled_output" in d: d["pooled_output"] = torch.zeros_like(d["pooled_output"]) n = [torch.zeros_like(t[0]), d] c.append(n) return (c, ) class ConditioningSetTimestepRange: @classmethod def INPUT_TYPES(s): return {"required": {"conditioning": ("CONDITIONING", ), "start": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}), "end": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001}) }} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "set_range" CATEGORY = "advanced/conditioning" def set_range(self, conditioning, start, end): c = node_helpers.conditioning_set_values(conditioning, {"start_percent": start, "end_percent": end}) return (c, ) class VAEDecode: @classmethod def INPUT_TYPES(s): return {"required": { "samples": ("LATENT", ), "vae": ("VAE", )}} RETURN_TYPES = ("IMAGE",) FUNCTION = "decode" CATEGORY = "latent" def decode(self, vae, samples): return (vae.decode(samples["samples"]), ) class VAEDecodeTiled: @classmethod def INPUT_TYPES(s): return {"required": {"samples": ("LATENT", ), "vae": ("VAE", ), "tile_size": ("INT", {"default": 512, "min": 320, "max": 4096, "step": 64}) }} RETURN_TYPES = ("IMAGE",) FUNCTION = "decode" CATEGORY = "_for_testing" def decode(self, vae, samples, tile_size): return (vae.decode_tiled(samples["samples"], tile_x=tile_size // 8, tile_y=tile_size // 8, ), ) class VAEEncode: @classmethod def INPUT_TYPES(s): return {"required": { "pixels": ("IMAGE", ), "vae": ("VAE", )}} RETURN_TYPES = ("LATENT",) FUNCTION = "encode" CATEGORY = "latent" def encode(self, vae, pixels): t = vae.encode(pixels[:,:,:,:3]) return ({"samples":t}, ) class VAEEncodeTiled: @classmethod def INPUT_TYPES(s): return {"required": {"pixels": ("IMAGE", ), "vae": ("VAE", ), "tile_size": ("INT", {"default": 512, "min": 320, "max": 4096, "step": 64}) }} RETURN_TYPES = ("LATENT",) FUNCTION = "encode" CATEGORY = "_for_testing" def encode(self, vae, pixels, tile_size): t = vae.encode_tiled(pixels[:,:,:,:3], tile_x=tile_size, tile_y=tile_size, ) return ({"samples":t}, ) class VAEEncodeForInpaint: @classmethod def INPUT_TYPES(s): return {"required": { "pixels": ("IMAGE", ), "vae": ("VAE", ), "mask": ("MASK", ), "grow_mask_by": ("INT", {"default": 6, "min": 0, "max": 64, "step": 1}),}} RETURN_TYPES = ("LATENT",) FUNCTION = "encode" CATEGORY = "latent/inpaint" def encode(self, vae, pixels, mask, grow_mask_by=6): x = (pixels.shape[1] // vae.downscale_ratio) * vae.downscale_ratio y = (pixels.shape[2] // vae.downscale_ratio) * vae.downscale_ratio mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(pixels.shape[1], pixels.shape[2]), mode="bilinear") pixels = pixels.clone() if pixels.shape[1] != x or pixels.shape[2] != y: x_offset = (pixels.shape[1] % vae.downscale_ratio) // 2 y_offset = (pixels.shape[2] % vae.downscale_ratio) // 2 pixels = pixels[:,x_offset:x + x_offset, y_offset:y + y_offset,:] mask = mask[:,:,x_offset:x + x_offset, y_offset:y + y_offset] #grow mask by a few pixels to keep things seamless in latent space if grow_mask_by == 0: mask_erosion = mask else: kernel_tensor = torch.ones((1, 1, grow_mask_by, grow_mask_by)) padding = math.ceil((grow_mask_by - 1) / 2) mask_erosion = torch.clamp(torch.nn.functional.conv2d(mask.round(), kernel_tensor, padding=padding), 0, 1) m = (1.0 - mask.round()).squeeze(1) for i in range(3): pixels[:,:,:,i] -= 0.5 pixels[:,:,:,i] *= m pixels[:,:,:,i] += 0.5 t = vae.encode(pixels) return ({"samples":t, "noise_mask": (mask_erosion[:,:,:x,:y].round())}, ) class InpaintModelConditioning: @classmethod def INPUT_TYPES(s): return {"required": {"positive": ("CONDITIONING", ), "negative": ("CONDITIONING", ), "vae": ("VAE", ), "pixels": ("IMAGE", ), "mask": ("MASK", ), }} RETURN_TYPES = ("CONDITIONING","CONDITIONING","LATENT") RETURN_NAMES = ("positive", "negative", "latent") FUNCTION = "encode" CATEGORY = "conditioning/inpaint" def encode(self, positive, negative, pixels, vae, mask): x = (pixels.shape[1] // 8) * 8 y = (pixels.shape[2] // 8) * 8 mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(pixels.shape[1], pixels.shape[2]), mode="bilinear") orig_pixels = pixels pixels = orig_pixels.clone() if pixels.shape[1] != x or pixels.shape[2] != y: x_offset = (pixels.shape[1] % 8) // 2 y_offset = (pixels.shape[2] % 8) // 2 pixels = pixels[:,x_offset:x + x_offset, y_offset:y + y_offset,:] mask = mask[:,:,x_offset:x + x_offset, y_offset:y + y_offset] m = (1.0 - mask.round()).squeeze(1) for i in range(3): pixels[:,:,:,i] -= 0.5 pixels[:,:,:,i] *= m pixels[:,:,:,i] += 0.5 concat_latent = vae.encode(pixels) orig_latent = vae.encode(orig_pixels) out_latent = {} out_latent["samples"] = orig_latent out_latent["noise_mask"] = mask out = [] for conditioning in [positive, negative]: c = node_helpers.conditioning_set_values(conditioning, {"concat_latent_image": concat_latent, "concat_mask": mask}) out.append(c) return (out[0], out[1], out_latent) class SaveLatent: def __init__(self): self.output_dir = folder_paths.get_output_directory() @classmethod def INPUT_TYPES(s): return {"required": { "samples": ("LATENT", ), "filename_prefix": ("STRING", {"default": "latents/ComfyUI"})}, "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"}, } RETURN_TYPES = () FUNCTION = "save" OUTPUT_NODE = True CATEGORY = "_for_testing" def save(self, samples, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None): full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir) # support save metadata for latent sharing prompt_info = "" if prompt is not None: prompt_info = json.dumps(prompt) metadata = None if not args.disable_metadata: metadata = {"prompt": prompt_info} if extra_pnginfo is not None: for x in extra_pnginfo: metadata[x] = json.dumps(extra_pnginfo[x]) file = f"{filename}_{counter:05}_.latent" results = list() results.append({ "filename": file, "subfolder": subfolder, "type": "output" }) file = os.path.join(full_output_folder, file) output = {} output["latent_tensor"] = samples["samples"] output["latent_format_version_0"] = torch.tensor([]) comfy.utils.save_torch_file(output, file, metadata=metadata) return { "ui": { "latents": results } } class LoadLatent: @classmethod def INPUT_TYPES(s): input_dir = folder_paths.get_input_directory() files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f)) and f.endswith(".latent")] return {"required": {"latent": [sorted(files), ]}, } CATEGORY = "_for_testing" RETURN_TYPES = ("LATENT", ) FUNCTION = "load" def load(self, latent): latent_path = folder_paths.get_annotated_filepath(latent) latent = safetensors.torch.load_file(latent_path, device="cpu") multiplier = 1.0 if "latent_format_version_0" not in latent: multiplier = 1.0 / 0.18215 samples = {"samples": latent["latent_tensor"].float() * multiplier} return (samples, ) @classmethod def IS_CHANGED(s, latent): image_path = folder_paths.get_annotated_filepath(latent) m = hashlib.sha256() with open(image_path, 'rb') as f: m.update(f.read()) return m.digest().hex() @classmethod def VALIDATE_INPUTS(s, latent): if not folder_paths.exists_annotated_filepath(latent): return "Invalid latent file: {}".format(latent) return True class CheckpointLoader: @classmethod def INPUT_TYPES(s): return {"required": { "config_name": (folder_paths.get_filename_list("configs"), ), "ckpt_name": (folder_paths.get_filename_list("checkpoints"), )}} RETURN_TYPES = ("MODEL", "CLIP", "VAE") FUNCTION = "load_checkpoint" CATEGORY = "advanced/loaders" def load_checkpoint(self, config_name, ckpt_name): config_path = folder_paths.get_full_path("configs", config_name) ckpt_path = folder_paths.get_full_path("checkpoints", ckpt_name) return comfy.sd.load_checkpoint(config_path, ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings")) class CheckpointLoaderSimple: @classmethod def INPUT_TYPES(s): return {"required": { "ckpt_name": (folder_paths.get_filename_list("checkpoints"), ), }} RETURN_TYPES = ("MODEL", "CLIP", "VAE") FUNCTION = "load_checkpoint" CATEGORY = "loaders" def load_checkpoint(self, ckpt_name): ckpt_path = folder_paths.get_full_path("checkpoints", ckpt_name) out = comfy.sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings")) return out[:3] class DiffusersLoader: @classmethod def INPUT_TYPES(cls): paths = [] for search_path in folder_paths.get_folder_paths("diffusers"): if os.path.exists(search_path): for root, subdir, files in os.walk(search_path, followlinks=True): if "model_index.json" in files: paths.append(os.path.relpath(root, start=search_path)) return {"required": {"model_path": (paths,), }} RETURN_TYPES = ("MODEL", "CLIP", "VAE") FUNCTION = "load_checkpoint" CATEGORY = "advanced/loaders/deprecated" def load_checkpoint(self, model_path, output_vae=True, output_clip=True): for search_path in folder_paths.get_folder_paths("diffusers"): if os.path.exists(search_path): path = os.path.join(search_path, model_path) if os.path.exists(path): model_path = path break return comfy.diffusers_load.load_diffusers(model_path, output_vae=output_vae, output_clip=output_clip, embedding_directory=folder_paths.get_folder_paths("embeddings")) class unCLIPCheckpointLoader: @classmethod def INPUT_TYPES(s): return {"required": { "ckpt_name": (folder_paths.get_filename_list("checkpoints"), ), }} RETURN_TYPES = ("MODEL", "CLIP", "VAE", "CLIP_VISION") FUNCTION = "load_checkpoint" CATEGORY = "loaders" def load_checkpoint(self, ckpt_name, output_vae=True, output_clip=True): ckpt_path = folder_paths.get_full_path("checkpoints", ckpt_name) out = comfy.sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, output_clipvision=True, embedding_directory=folder_paths.get_folder_paths("embeddings")) return out class CLIPSetLastLayer: @classmethod def INPUT_TYPES(s): return {"required": { "clip": ("CLIP", ), "stop_at_clip_layer": ("INT", {"default": -1, "min": -24, "max": -1, "step": 1}), }} RETURN_TYPES = ("CLIP",) FUNCTION = "set_last_layer" CATEGORY = "conditioning" def set_last_layer(self, clip, stop_at_clip_layer): clip = clip.clone() clip.clip_layer(stop_at_clip_layer) return (clip,) class LoraLoader: def __init__(self): self.loaded_lora = None @classmethod def INPUT_TYPES(s): return {"required": { "model": ("MODEL",), "clip": ("CLIP", ), "lora_name": (folder_paths.get_filename_list("loras"), ), "strength_model": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01}), "strength_clip": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01}), }} RETURN_TYPES = ("MODEL", "CLIP") FUNCTION = "load_lora" CATEGORY = "loaders" def load_lora(self, model, clip, lora_name, strength_model, strength_clip): if strength_model == 0 and strength_clip == 0: return (model, clip) lora_path = folder_paths.get_full_path("loras", lora_name) lora = None if self.loaded_lora is not None: if self.loaded_lora[0] == lora_path: lora = self.loaded_lora[1] else: temp = self.loaded_lora self.loaded_lora = None del temp if lora is None: lora = comfy.utils.load_torch_file(lora_path, safe_load=True) self.loaded_lora = (lora_path, lora) model_lora, clip_lora = comfy.sd.load_lora_for_models(model, clip, lora, strength_model, strength_clip) return (model_lora, clip_lora) class LoraLoaderModelOnly(LoraLoader): @classmethod def INPUT_TYPES(s): return {"required": { "model": ("MODEL",), "lora_name": (folder_paths.get_filename_list("loras"), ), "strength_model": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01}), }} RETURN_TYPES = ("MODEL",) FUNCTION = "load_lora_model_only" def load_lora_model_only(self, model, lora_name, strength_model): return (self.load_lora(model, None, lora_name, strength_model, 0)[0],) class VAELoader: @staticmethod def vae_list(): vaes = folder_paths.get_filename_list("vae") approx_vaes = folder_paths.get_filename_list("vae_approx") sdxl_taesd_enc = False sdxl_taesd_dec = False sd1_taesd_enc = False sd1_taesd_dec = False sd3_taesd_enc = False sd3_taesd_dec = False for v in approx_vaes: if v.startswith("taesd_decoder."): sd1_taesd_dec = True elif v.startswith("taesd_encoder."): sd1_taesd_enc = True elif v.startswith("taesdxl_decoder."): sdxl_taesd_dec = True elif v.startswith("taesdxl_encoder."): sdxl_taesd_enc = True elif v.startswith("taesd3_decoder."): sd3_taesd_dec = True elif v.startswith("taesd3_encoder."): sd3_taesd_enc = True if sd1_taesd_dec and sd1_taesd_enc: vaes.append("taesd") if sdxl_taesd_dec and sdxl_taesd_enc: vaes.append("taesdxl") if sd3_taesd_dec and sd3_taesd_enc: vaes.append("taesd3") return vaes @staticmethod def load_taesd(name): sd = {} approx_vaes = folder_paths.get_filename_list("vae_approx") encoder = next(filter(lambda a: a.startswith("{}_encoder.".format(name)), approx_vaes)) decoder = next(filter(lambda a: a.startswith("{}_decoder.".format(name)), approx_vaes)) enc = comfy.utils.load_torch_file(folder_paths.get_full_path("vae_approx", encoder)) for k in enc: sd["taesd_encoder.{}".format(k)] = enc[k] dec = comfy.utils.load_torch_file(folder_paths.get_full_path("vae_approx", decoder)) for k in dec: sd["taesd_decoder.{}".format(k)] = dec[k] if name == "taesd": sd["vae_scale"] = torch.tensor(0.18215) sd["vae_shift"] = torch.tensor(0.0) elif name == "taesdxl": sd["vae_scale"] = torch.tensor(0.13025) sd["vae_shift"] = torch.tensor(0.0) elif name == "taesd3": sd["vae_scale"] = torch.tensor(1.5305) sd["vae_shift"] = torch.tensor(0.0609) return sd @classmethod def INPUT_TYPES(s): return {"required": { "vae_name": (s.vae_list(), )}} RETURN_TYPES = ("VAE",) FUNCTION = "load_vae" CATEGORY = "loaders" #TODO: scale factor? def load_vae(self, vae_name): if vae_name in ["taesd", "taesdxl", "taesd3"]: sd = self.load_taesd(vae_name) else: vae_path = folder_paths.get_full_path("vae", vae_name) sd = comfy.utils.load_torch_file(vae_path) vae = comfy.sd.VAE(sd=sd) return (vae,) class ControlNetLoader: @classmethod def INPUT_TYPES(s): return {"required": { "control_net_name": (folder_paths.get_filename_list("controlnet"), )}} RETURN_TYPES = ("CONTROL_NET",) FUNCTION = "load_controlnet" CATEGORY = "loaders" def load_controlnet(self, control_net_name): controlnet_path = folder_paths.get_full_path("controlnet", control_net_name) controlnet = comfy.controlnet.load_controlnet(controlnet_path) return (controlnet,) class DiffControlNetLoader: @classmethod def INPUT_TYPES(s): return {"required": { "model": ("MODEL",), "control_net_name": (folder_paths.get_filename_list("controlnet"), )}} RETURN_TYPES = ("CONTROL_NET",) FUNCTION = "load_controlnet" CATEGORY = "loaders" def load_controlnet(self, model, control_net_name): controlnet_path = folder_paths.get_full_path("controlnet", control_net_name) controlnet = comfy.controlnet.load_controlnet(controlnet_path, model) return (controlnet,) class ControlNetApply: @classmethod def INPUT_TYPES(s): return {"required": {"conditioning": ("CONDITIONING", ), "control_net": ("CONTROL_NET", ), "image": ("IMAGE", ), "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}) }} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "apply_controlnet" CATEGORY = "conditioning" def apply_controlnet(self, conditioning, control_net, image, strength): if strength == 0: return (conditioning, ) c = [] control_hint = image.movedim(-1,1) for t in conditioning: n = [t[0], t[1].copy()] c_net = control_net.copy().set_cond_hint(control_hint, strength) if 'control' in t[1]: c_net.set_previous_controlnet(t[1]['control']) n[1]['control'] = c_net n[1]['control_apply_to_uncond'] = True c.append(n) return (c, ) class ControlNetApplyAdvanced: @classmethod def INPUT_TYPES(s): return {"required": {"positive": ("CONDITIONING", ), "negative": ("CONDITIONING", ), "control_net": ("CONTROL_NET", ), "image": ("IMAGE", ), "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), "start_percent": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}), "end_percent": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001}) }} RETURN_TYPES = ("CONDITIONING","CONDITIONING") RETURN_NAMES = ("positive", "negative") FUNCTION = "apply_controlnet" CATEGORY = "conditioning" def apply_controlnet(self, positive, negative, control_net, image, strength, start_percent, end_percent): if strength == 0: return (positive, negative) control_hint = image.movedim(-1,1) cnets = {} out = [] for conditioning in [positive, negative]: c = [] for t in conditioning: d = t[1].copy() prev_cnet = d.get('control', None) if prev_cnet in cnets: c_net = cnets[prev_cnet] else: c_net = control_net.copy().set_cond_hint(control_hint, strength, (start_percent, end_percent)) c_net.set_previous_controlnet(prev_cnet) cnets[prev_cnet] = c_net d['control'] = c_net d['control_apply_to_uncond'] = False n = [t[0], d] c.append(n) out.append(c) return (out[0], out[1]) class UNETLoader: @classmethod def INPUT_TYPES(s): return {"required": { "unet_name": (folder_paths.get_filename_list("unet"), ), }} RETURN_TYPES = ("MODEL",) FUNCTION = "load_unet" CATEGORY = "advanced/loaders" def load_unet(self, unet_name): unet_path = folder_paths.get_full_path("unet", unet_name) model = comfy.sd.load_unet(unet_path) return (model,) class CLIPLoader: @classmethod def INPUT_TYPES(s): return {"required": { "clip_name": (folder_paths.get_filename_list("clip"), ), "type": (["stable_diffusion", "stable_cascade", "sd3", "stable_audio"], ), }} RETURN_TYPES = ("CLIP",) FUNCTION = "load_clip" CATEGORY = "advanced/loaders" def load_clip(self, clip_name, type="stable_diffusion"): if type == "stable_cascade": clip_type = comfy.sd.CLIPType.STABLE_CASCADE elif type == "sd3": clip_type = comfy.sd.CLIPType.SD3 elif type == "stable_audio": clip_type = comfy.sd.CLIPType.STABLE_AUDIO else: clip_type = comfy.sd.CLIPType.STABLE_DIFFUSION clip_path = folder_paths.get_full_path("clip", clip_name) clip = comfy.sd.load_clip(ckpt_paths=[clip_path], embedding_directory=folder_paths.get_folder_paths("embeddings"), clip_type=clip_type) return (clip,) class DualCLIPLoader: @classmethod def INPUT_TYPES(s): return {"required": { "clip_name1": (folder_paths.get_filename_list("clip"), ), "clip_name2": (folder_paths.get_filename_list("clip"), ), "type": (["sdxl", "sd3"], ), }} RETURN_TYPES = ("CLIP",) FUNCTION = "load_clip" CATEGORY = "advanced/loaders" def load_clip(self, clip_name1, clip_name2, type): clip_path1 = folder_paths.get_full_path("clip", clip_name1) clip_path2 = folder_paths.get_full_path("clip", clip_name2) if type == "sdxl": clip_type = comfy.sd.CLIPType.STABLE_DIFFUSION elif type == "sd3": clip_type = comfy.sd.CLIPType.SD3 clip = comfy.sd.load_clip(ckpt_paths=[clip_path1, clip_path2], embedding_directory=folder_paths.get_folder_paths("embeddings"), clip_type=clip_type) return (clip,) class CLIPVisionLoader: @classmethod def INPUT_TYPES(s): return {"required": { "clip_name": (folder_paths.get_filename_list("clip_vision"), ), }} RETURN_TYPES = ("CLIP_VISION",) FUNCTION = "load_clip" CATEGORY = "loaders" def load_clip(self, clip_name): clip_path = folder_paths.get_full_path("clip_vision", clip_name) clip_vision = comfy.clip_vision.load(clip_path) return (clip_vision,) class CLIPVisionEncode: @classmethod def INPUT_TYPES(s): return {"required": { "clip_vision": ("CLIP_VISION",), "image": ("IMAGE",) }} RETURN_TYPES = ("CLIP_VISION_OUTPUT",) FUNCTION = "encode" CATEGORY = "conditioning" def encode(self, clip_vision, image): output = clip_vision.encode_image(image) return (output,) class StyleModelLoader: @classmethod def INPUT_TYPES(s): return {"required": { "style_model_name": (folder_paths.get_filename_list("style_models"), )}} RETURN_TYPES = ("STYLE_MODEL",) FUNCTION = "load_style_model" CATEGORY = "loaders" def load_style_model(self, style_model_name): style_model_path = folder_paths.get_full_path("style_models", style_model_name) style_model = comfy.sd.load_style_model(style_model_path) return (style_model,) class StyleModelApply: @classmethod def INPUT_TYPES(s): return {"required": {"conditioning": ("CONDITIONING", ), "style_model": ("STYLE_MODEL", ), "clip_vision_output": ("CLIP_VISION_OUTPUT", ), }} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "apply_stylemodel" CATEGORY = "conditioning/style_model" def apply_stylemodel(self, clip_vision_output, style_model, conditioning): cond = style_model.get_cond(clip_vision_output).flatten(start_dim=0, end_dim=1).unsqueeze(dim=0) c = [] for t in conditioning: n = [torch.cat((t[0], cond), dim=1), t[1].copy()] c.append(n) return (c, ) class unCLIPConditioning: @classmethod def INPUT_TYPES(s): return {"required": {"conditioning": ("CONDITIONING", ), "clip_vision_output": ("CLIP_VISION_OUTPUT", ), "strength": ("FLOAT", {"default": 1.0, "min": -10.0, "max": 10.0, "step": 0.01}), "noise_augmentation": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.01}), }} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "apply_adm" CATEGORY = "conditioning" def apply_adm(self, conditioning, clip_vision_output, strength, noise_augmentation): if strength == 0: return (conditioning, ) c = [] for t in conditioning: o = t[1].copy() x = {"clip_vision_output": clip_vision_output, "strength": strength, "noise_augmentation": noise_augmentation} if "unclip_conditioning" in o: o["unclip_conditioning"] = o["unclip_conditioning"][:] + [x] else: o["unclip_conditioning"] = [x] n = [t[0], o] c.append(n) return (c, ) class GLIGENLoader: @classmethod def INPUT_TYPES(s): return {"required": { "gligen_name": (folder_paths.get_filename_list("gligen"), )}} RETURN_TYPES = ("GLIGEN",) FUNCTION = "load_gligen" CATEGORY = "loaders" def load_gligen(self, gligen_name): gligen_path = folder_paths.get_full_path("gligen", gligen_name) gligen = comfy.sd.load_gligen(gligen_path) return (gligen,) class GLIGENTextBoxApply: @classmethod def INPUT_TYPES(s): return {"required": {"conditioning_to": ("CONDITIONING", ), "clip": ("CLIP", ), "gligen_textbox_model": ("GLIGEN", ), "text": ("STRING", {"multiline": True, "dynamicPrompts": True}), "width": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}), "height": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}), "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), }} RETURN_TYPES = ("CONDITIONING",) FUNCTION = "append" CATEGORY = "conditioning/gligen" def append(self, conditioning_to, clip, gligen_textbox_model, text, width, height, x, y): c = [] cond, cond_pooled = clip.encode_from_tokens(clip.tokenize(text), return_pooled="unprojected") for t in conditioning_to: n = [t[0], t[1].copy()] position_params = [(cond_pooled, height // 8, width // 8, y // 8, x // 8)] prev = [] if "gligen" in n[1]: prev = n[1]['gligen'][2] n[1]['gligen'] = ("position", gligen_textbox_model, prev + position_params) c.append(n) return (c, ) class EmptyLatentImage: def __init__(self): self.device = comfy.model_management.intermediate_device() @classmethod def INPUT_TYPES(s): return {"required": { "width": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8}), "height": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8}), "batch_size": ("INT", {"default": 1, "min": 1, "max": 4096})}} RETURN_TYPES = ("LATENT",) FUNCTION = "generate" CATEGORY = "latent" def generate(self, width, height, batch_size=1): latent = torch.zeros([batch_size, 4, height // 8, width // 8], device=self.device) return ({"samples":latent}, ) class LatentFromBatch: @classmethod def INPUT_TYPES(s): return {"required": { "samples": ("LATENT",), "batch_index": ("INT", {"default": 0, "min": 0, "max": 63}), "length": ("INT", {"default": 1, "min": 1, "max": 64}), }} RETURN_TYPES = ("LATENT",) FUNCTION = "frombatch" CATEGORY = "latent/batch" def frombatch(self, samples, batch_index, length): s = samples.copy() s_in = samples["samples"] batch_index = min(s_in.shape[0] - 1, batch_index) length = min(s_in.shape[0] - batch_index, length) s["samples"] = s_in[batch_index:batch_index + length].clone() if "noise_mask" in samples: masks = samples["noise_mask"] if masks.shape[0] == 1: s["noise_mask"] = masks.clone() else: if masks.shape[0] < s_in.shape[0]: masks = masks.repeat(math.ceil(s_in.shape[0] / masks.shape[0]), 1, 1, 1)[:s_in.shape[0]] s["noise_mask"] = masks[batch_index:batch_index + length].clone() if "batch_index" not in s: s["batch_index"] = [x for x in range(batch_index, batch_index+length)] else: s["batch_index"] = samples["batch_index"][batch_index:batch_index + length] return (s,) class RepeatLatentBatch: @classmethod def INPUT_TYPES(s): return {"required": { "samples": ("LATENT",), "amount": ("INT", {"default": 1, "min": 1, "max": 64}), }} RETURN_TYPES = ("LATENT",) FUNCTION = "repeat" CATEGORY = "latent/batch" def repeat(self, samples, amount): s = samples.copy() s_in = samples["samples"] s["samples"] = s_in.repeat((amount, 1,1,1)) if "noise_mask" in samples and samples["noise_mask"].shape[0] > 1: masks = samples["noise_mask"] if masks.shape[0] < s_in.shape[0]: masks = masks.repeat(math.ceil(s_in.shape[0] / masks.shape[0]), 1, 1, 1)[:s_in.shape[0]] s["noise_mask"] = samples["noise_mask"].repeat((amount, 1,1,1)) if "batch_index" in s: offset = max(s["batch_index"]) - min(s["batch_index"]) + 1 s["batch_index"] = s["batch_index"] + [x + (i * offset) for i in range(1, amount) for x in s["batch_index"]] return (s,) class LatentUpscale: upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"] crop_methods = ["disabled", "center"] @classmethod def INPUT_TYPES(s): return {"required": { "samples": ("LATENT",), "upscale_method": (s.upscale_methods,), "width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "crop": (s.crop_methods,)}} RETURN_TYPES = ("LATENT",) FUNCTION = "upscale" CATEGORY = "latent" def upscale(self, samples, upscale_method, width, height, crop): if width == 0 and height == 0: s = samples else: s = samples.copy() if width == 0: height = max(64, height) width = max(64, round(samples["samples"].shape[3] * height / samples["samples"].shape[2])) elif height == 0: width = max(64, width) height = max(64, round(samples["samples"].shape[2] * width / samples["samples"].shape[3])) else: width = max(64, width) height = max(64, height) s["samples"] = comfy.utils.common_upscale(samples["samples"], width // 8, height // 8, upscale_method, crop) return (s,) class LatentUpscaleBy: upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"] @classmethod def INPUT_TYPES(s): return {"required": { "samples": ("LATENT",), "upscale_method": (s.upscale_methods,), "scale_by": ("FLOAT", {"default": 1.5, "min": 0.01, "max": 8.0, "step": 0.01}),}} RETURN_TYPES = ("LATENT",) FUNCTION = "upscale" CATEGORY = "latent" def upscale(self, samples, upscale_method, scale_by): s = samples.copy() width = round(samples["samples"].shape[3] * scale_by) height = round(samples["samples"].shape[2] * scale_by) s["samples"] = comfy.utils.common_upscale(samples["samples"], width, height, upscale_method, "disabled") return (s,) class LatentRotate: @classmethod def INPUT_TYPES(s): return {"required": { "samples": ("LATENT",), "rotation": (["none", "90 degrees", "180 degrees", "270 degrees"],), }} RETURN_TYPES = ("LATENT",) FUNCTION = "rotate" CATEGORY = "latent/transform" def rotate(self, samples, rotation): s = samples.copy() rotate_by = 0 if rotation.startswith("90"): rotate_by = 1 elif rotation.startswith("180"): rotate_by = 2 elif rotation.startswith("270"): rotate_by = 3 s["samples"] = torch.rot90(samples["samples"], k=rotate_by, dims=[3, 2]) return (s,) class LatentFlip: @classmethod def INPUT_TYPES(s): return {"required": { "samples": ("LATENT",), "flip_method": (["x-axis: vertically", "y-axis: horizontally"],), }} RETURN_TYPES = ("LATENT",) FUNCTION = "flip" CATEGORY = "latent/transform" def flip(self, samples, flip_method): s = samples.copy() if flip_method.startswith("x"): s["samples"] = torch.flip(samples["samples"], dims=[2]) elif flip_method.startswith("y"): s["samples"] = torch.flip(samples["samples"], dims=[3]) return (s,) class LatentComposite: @classmethod def INPUT_TYPES(s): return {"required": { "samples_to": ("LATENT",), "samples_from": ("LATENT",), "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "feather": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), }} RETURN_TYPES = ("LATENT",) FUNCTION = "composite" CATEGORY = "latent" def composite(self, samples_to, samples_from, x, y, composite_method="normal", feather=0): x = x // 8 y = y // 8 feather = feather // 8 samples_out = samples_to.copy() s = samples_to["samples"].clone() samples_to = samples_to["samples"] samples_from = samples_from["samples"] if feather == 0: s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x] else: samples_from = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x] mask = torch.ones_like(samples_from) for t in range(feather): if y != 0: mask[:,:,t:1+t,:] *= ((1.0/feather) * (t + 1)) if y + samples_from.shape[2] < samples_to.shape[2]: mask[:,:,mask.shape[2] -1 -t: mask.shape[2]-t,:] *= ((1.0/feather) * (t + 1)) if x != 0: mask[:,:,:,t:1+t] *= ((1.0/feather) * (t + 1)) if x + samples_from.shape[3] < samples_to.shape[3]: mask[:,:,:,mask.shape[3]- 1 - t: mask.shape[3]- t] *= ((1.0/feather) * (t + 1)) rev_mask = torch.ones_like(mask) - mask s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x] * mask + s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] * rev_mask samples_out["samples"] = s return (samples_out,) class LatentBlend: @classmethod def INPUT_TYPES(s): return {"required": { "samples1": ("LATENT",), "samples2": ("LATENT",), "blend_factor": ("FLOAT", { "default": 0.5, "min": 0, "max": 1, "step": 0.01 }), }} RETURN_TYPES = ("LATENT",) FUNCTION = "blend" CATEGORY = "_for_testing" def blend(self, samples1, samples2, blend_factor:float, blend_mode: str="normal"): samples_out = samples1.copy() samples1 = samples1["samples"] samples2 = samples2["samples"] if samples1.shape != samples2.shape: samples2.permute(0, 3, 1, 2) samples2 = comfy.utils.common_upscale(samples2, samples1.shape[3], samples1.shape[2], 'bicubic', crop='center') samples2.permute(0, 2, 3, 1) samples_blended = self.blend_mode(samples1, samples2, blend_mode) samples_blended = samples1 * blend_factor + samples_blended * (1 - blend_factor) samples_out["samples"] = samples_blended return (samples_out,) def blend_mode(self, img1, img2, mode): if mode == "normal": return img2 else: raise ValueError(f"Unsupported blend mode: {mode}") class LatentCrop: @classmethod def INPUT_TYPES(s): return {"required": { "samples": ("LATENT",), "width": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}), "height": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}), "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), }} RETURN_TYPES = ("LATENT",) FUNCTION = "crop" CATEGORY = "latent/transform" def crop(self, samples, width, height, x, y): s = samples.copy() samples = samples['samples'] x = x // 8 y = y // 8 #enfonce minimum size of 64 if x > (samples.shape[3] - 8): x = samples.shape[3] - 8 if y > (samples.shape[2] - 8): y = samples.shape[2] - 8 new_height = height // 8 new_width = width // 8 to_x = new_width + x to_y = new_height + y s['samples'] = samples[:,:,y:to_y, x:to_x] return (s,) class SetLatentNoiseMask: @classmethod def INPUT_TYPES(s): return {"required": { "samples": ("LATENT",), "mask": ("MASK",), }} RETURN_TYPES = ("LATENT",) FUNCTION = "set_mask" CATEGORY = "latent/inpaint" def set_mask(self, samples, mask): s = samples.copy() s["noise_mask"] = mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])) return (s,) def common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent, denoise=1.0, disable_noise=False, start_step=None, last_step=None, force_full_denoise=False): latent_image = latent["samples"] latent_image = comfy.sample.fix_empty_latent_channels(model, latent_image) if disable_noise: noise = torch.zeros(latent_image.size(), dtype=latent_image.dtype, layout=latent_image.layout, device="cpu") else: batch_inds = latent["batch_index"] if "batch_index" in latent else None noise = comfy.sample.prepare_noise(latent_image, seed, batch_inds) noise_mask = None if "noise_mask" in latent: noise_mask = latent["noise_mask"] callback = latent_preview.prepare_callback(model, steps) disable_pbar = not comfy.utils.PROGRESS_BAR_ENABLED samples = comfy.sample.sample(model, noise, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise, disable_noise=disable_noise, start_step=start_step, last_step=last_step, force_full_denoise=force_full_denoise, noise_mask=noise_mask, callback=callback, disable_pbar=disable_pbar, seed=seed) out = latent.copy() out["samples"] = samples return (out, ) class KSampler: @classmethod def INPUT_TYPES(s): return {"required": {"model": ("MODEL",), "seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}), "steps": ("INT", {"default": 20, "min": 1, "max": 10000}), "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.1, "round": 0.01}), "sampler_name": (comfy.samplers.KSampler.SAMPLERS, ), "scheduler": (comfy.samplers.KSampler.SCHEDULERS, ), "positive": ("CONDITIONING", ), "negative": ("CONDITIONING", ), "latent_image": ("LATENT", ), "denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}), } } RETURN_TYPES = ("LATENT",) FUNCTION = "sample" CATEGORY = "sampling" def sample(self, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=1.0): return common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise) class KSamplerAdvanced: @classmethod def INPUT_TYPES(s): return {"required": {"model": ("MODEL",), "add_noise": (["enable", "disable"], ), "noise_seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}), "steps": ("INT", {"default": 20, "min": 1, "max": 10000}), "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.1, "round": 0.01}), "sampler_name": (comfy.samplers.KSampler.SAMPLERS, ), "scheduler": (comfy.samplers.KSampler.SCHEDULERS, ), "positive": ("CONDITIONING", ), "negative": ("CONDITIONING", ), "latent_image": ("LATENT", ), "start_at_step": ("INT", {"default": 0, "min": 0, "max": 10000}), "end_at_step": ("INT", {"default": 10000, "min": 0, "max": 10000}), "return_with_leftover_noise": (["disable", "enable"], ), } } RETURN_TYPES = ("LATENT",) FUNCTION = "sample" CATEGORY = "sampling" def sample(self, model, add_noise, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, start_at_step, end_at_step, return_with_leftover_noise, denoise=1.0): force_full_denoise = True if return_with_leftover_noise == "enable": force_full_denoise = False disable_noise = False if add_noise == "disable": disable_noise = True return common_ksampler(model, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise, disable_noise=disable_noise, start_step=start_at_step, last_step=end_at_step, force_full_denoise=force_full_denoise) class SaveImage: def __init__(self): self.output_dir = folder_paths.get_output_directory() self.type = "output" self.prefix_append = "" self.compress_level = 4 @classmethod def INPUT_TYPES(s): return {"required": {"images": ("IMAGE", ), "filename_prefix": ("STRING", {"default": "ComfyUI"})}, "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"}, } RETURN_TYPES = () FUNCTION = "save_images" OUTPUT_NODE = True CATEGORY = "image" def save_images(self, images, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None): filename_prefix += self.prefix_append full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir, images[0].shape[1], images[0].shape[0]) results = list() for (batch_number, image) in enumerate(images): i = 255. * image.cpu().numpy() img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8)) metadata = None if not args.disable_metadata: metadata = PngInfo() if prompt is not None: metadata.add_text("prompt", json.dumps(prompt)) if extra_pnginfo is not None: for x in extra_pnginfo: metadata.add_text(x, json.dumps(extra_pnginfo[x])) filename_with_batch_num = filename.replace("%batch_num%", str(batch_number)) file = f"{filename_with_batch_num}_{counter:05}_.png" img.save(os.path.join(full_output_folder, file), pnginfo=metadata, compress_level=self.compress_level) results.append({ "filename": file, "subfolder": subfolder, "type": self.type }) counter += 1 return { "ui": { "images": results } } class PreviewImage(SaveImage): def __init__(self): self.output_dir = folder_paths.get_temp_directory() self.type = "temp" self.prefix_append = "_temp_" + ''.join(random.choice("abcdefghijklmnopqrstupvxyz") for x in range(5)) self.compress_level = 1 @classmethod def INPUT_TYPES(s): return {"required": {"images": ("IMAGE", ), }, "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"}, } class LoadImage: @classmethod def INPUT_TYPES(s): input_dir = folder_paths.get_input_directory() files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))] return {"required": {"image": (sorted(files), {"image_upload": True})}, } CATEGORY = "image" RETURN_TYPES = ("IMAGE", "MASK") FUNCTION = "load_image" def load_image(self, image): image_path = folder_paths.get_annotated_filepath(image) img = node_helpers.pillow(Image.open, image_path) output_images = [] output_masks = [] w, h = None, None excluded_formats = ['MPO'] for i in ImageSequence.Iterator(img): i = node_helpers.pillow(ImageOps.exif_transpose, i) if i.mode == 'I': i = i.point(lambda i: i * (1 / 255)) image = i.convert("RGB") if len(output_images) == 0: w = image.size[0] h = image.size[1] if image.size[0] != w or image.size[1] != h: continue image = np.array(image).astype(np.float32) / 255.0 image = torch.from_numpy(image)[None,] if 'A' in i.getbands(): mask = np.array(i.getchannel('A')).astype(np.float32) / 255.0 mask = 1. - torch.from_numpy(mask) else: mask = torch.zeros((64,64), dtype=torch.float32, device="cpu") output_images.append(image) output_masks.append(mask.unsqueeze(0)) if len(output_images) > 1 and img.format not in excluded_formats: output_image = torch.cat(output_images, dim=0) output_mask = torch.cat(output_masks, dim=0) else: output_image = output_images[0] output_mask = output_masks[0] return (output_image, output_mask) @classmethod def IS_CHANGED(s, image): image_path = folder_paths.get_annotated_filepath(image) m = hashlib.sha256() with open(image_path, 'rb') as f: m.update(f.read()) return m.digest().hex() @classmethod def VALIDATE_INPUTS(s, image): if not folder_paths.exists_annotated_filepath(image): return "Invalid image file: {}".format(image) return True class LoadImageMask: _color_channels = ["alpha", "red", "green", "blue"] @classmethod def INPUT_TYPES(s): input_dir = folder_paths.get_input_directory() files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))] return {"required": {"image": (sorted(files), {"image_upload": True}), "channel": (s._color_channels, ), } } CATEGORY = "mask" RETURN_TYPES = ("MASK",) FUNCTION = "load_image" def load_image(self, image, channel): image_path = folder_paths.get_annotated_filepath(image) i = node_helpers.pillow(Image.open, image_path) i = node_helpers.pillow(ImageOps.exif_transpose, i) if i.getbands() != ("R", "G", "B", "A"): if i.mode == 'I': i = i.point(lambda i: i * (1 / 255)) i = i.convert("RGBA") mask = None c = channel[0].upper() if c in i.getbands(): mask = np.array(i.getchannel(c)).astype(np.float32) / 255.0 mask = torch.from_numpy(mask) if c == 'A': mask = 1. - mask else: mask = torch.zeros((64,64), dtype=torch.float32, device="cpu") return (mask.unsqueeze(0),) @classmethod def IS_CHANGED(s, image, channel): image_path = folder_paths.get_annotated_filepath(image) m = hashlib.sha256() with open(image_path, 'rb') as f: m.update(f.read()) return m.digest().hex() @classmethod def VALIDATE_INPUTS(s, image): if not folder_paths.exists_annotated_filepath(image): return "Invalid image file: {}".format(image) return True class ImageScale: upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"] crop_methods = ["disabled", "center"] @classmethod def INPUT_TYPES(s): return {"required": { "image": ("IMAGE",), "upscale_method": (s.upscale_methods,), "width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}), "height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}), "crop": (s.crop_methods,)}} RETURN_TYPES = ("IMAGE",) FUNCTION = "upscale" CATEGORY = "image/upscaling" def upscale(self, image, upscale_method, width, height, crop): if width == 0 and height == 0: s = image else: samples = image.movedim(-1,1) if width == 0: width = max(1, round(samples.shape[3] * height / samples.shape[2])) elif height == 0: height = max(1, round(samples.shape[2] * width / samples.shape[3])) s = comfy.utils.common_upscale(samples, width, height, upscale_method, crop) s = s.movedim(1,-1) return (s,) class ImageScaleBy: upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"] @classmethod def INPUT_TYPES(s): return {"required": { "image": ("IMAGE",), "upscale_method": (s.upscale_methods,), "scale_by": ("FLOAT", {"default": 1.0, "min": 0.01, "max": 8.0, "step": 0.01}),}} RETURN_TYPES = ("IMAGE",) FUNCTION = "upscale" CATEGORY = "image/upscaling" def upscale(self, image, upscale_method, scale_by): samples = image.movedim(-1,1) width = round(samples.shape[3] * scale_by) height = round(samples.shape[2] * scale_by) s = comfy.utils.common_upscale(samples, width, height, upscale_method, "disabled") s = s.movedim(1,-1) return (s,) class ImageInvert: @classmethod def INPUT_TYPES(s): return {"required": { "image": ("IMAGE",)}} RETURN_TYPES = ("IMAGE",) FUNCTION = "invert" CATEGORY = "image" def invert(self, image): s = 1.0 - image return (s,) class ImageBatch: @classmethod def INPUT_TYPES(s): return {"required": { "image1": ("IMAGE",), "image2": ("IMAGE",)}} RETURN_TYPES = ("IMAGE",) FUNCTION = "batch" CATEGORY = "image" def batch(self, image1, image2): if image1.shape[1:] != image2.shape[1:]: image2 = comfy.utils.common_upscale(image2.movedim(-1,1), image1.shape[2], image1.shape[1], "bilinear", "center").movedim(1,-1) s = torch.cat((image1, image2), dim=0) return (s,) class EmptyImage: def __init__(self, device="cpu"): self.device = device @classmethod def INPUT_TYPES(s): return {"required": { "width": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}), "height": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}), "batch_size": ("INT", {"default": 1, "min": 1, "max": 4096}), "color": ("INT", {"default": 0, "min": 0, "max": 0xFFFFFF, "step": 1, "display": "color"}), }} RETURN_TYPES = ("IMAGE",) FUNCTION = "generate" CATEGORY = "image" def generate(self, width, height, batch_size=1, color=0): r = torch.full([batch_size, height, width, 1], ((color >> 16) & 0xFF) / 0xFF) g = torch.full([batch_size, height, width, 1], ((color >> 8) & 0xFF) / 0xFF) b = torch.full([batch_size, height, width, 1], ((color) & 0xFF) / 0xFF) return (torch.cat((r, g, b), dim=-1), ) class ImagePadForOutpaint: @classmethod def INPUT_TYPES(s): return { "required": { "image": ("IMAGE",), "left": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "top": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "right": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "bottom": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), "feathering": ("INT", {"default": 40, "min": 0, "max": MAX_RESOLUTION, "step": 1}), } } RETURN_TYPES = ("IMAGE", "MASK") FUNCTION = "expand_image" CATEGORY = "image" def expand_image(self, image, left, top, right, bottom, feathering): d1, d2, d3, d4 = image.size() new_image = torch.ones( (d1, d2 + top + bottom, d3 + left + right, d4), dtype=torch.float32, ) * 0.5 new_image[:, top:top + d2, left:left + d3, :] = image mask = torch.ones( (d2 + top + bottom, d3 + left + right), dtype=torch.float32, ) t = torch.zeros( (d2, d3), dtype=torch.float32 ) if feathering > 0 and feathering * 2 < d2 and feathering * 2 < d3: for i in range(d2): for j in range(d3): dt = i if top != 0 else d2 db = d2 - i if bottom != 0 else d2 dl = j if left != 0 else d3 dr = d3 - j if right != 0 else d3 d = min(dt, db, dl, dr) if d >= feathering: continue v = (feathering - d) / feathering t[i, j] = v * v mask[top:top + d2, left:left + d3] = t return (new_image, mask) NODE_CLASS_MAPPINGS = { "KSampler": KSampler, "CheckpointLoaderSimple": CheckpointLoaderSimple, "CLIPTextEncode": CLIPTextEncode, "CLIPSetLastLayer": CLIPSetLastLayer, "VAEDecode": VAEDecode, "VAEEncode": VAEEncode, "VAEEncodeForInpaint": VAEEncodeForInpaint, "VAELoader": VAELoader, "EmptyLatentImage": EmptyLatentImage, "LatentUpscale": LatentUpscale, "LatentUpscaleBy": LatentUpscaleBy, "LatentFromBatch": LatentFromBatch, "RepeatLatentBatch": RepeatLatentBatch, "SaveImage": SaveImage, "PreviewImage": PreviewImage, "LoadImage": LoadImage, "LoadImageMask": LoadImageMask, "ImageScale": ImageScale, "ImageScaleBy": ImageScaleBy, "ImageInvert": ImageInvert, "ImageBatch": ImageBatch, "ImagePadForOutpaint": ImagePadForOutpaint, "EmptyImage": EmptyImage, "ConditioningAverage": ConditioningAverage , "ConditioningCombine": ConditioningCombine, "ConditioningConcat": ConditioningConcat, "ConditioningSetArea": ConditioningSetArea, "ConditioningSetAreaPercentage": ConditioningSetAreaPercentage, "ConditioningSetAreaStrength": ConditioningSetAreaStrength, "ConditioningSetMask": ConditioningSetMask, "KSamplerAdvanced": KSamplerAdvanced, "SetLatentNoiseMask": SetLatentNoiseMask, "LatentComposite": LatentComposite, "LatentBlend": LatentBlend, "LatentRotate": LatentRotate, "LatentFlip": LatentFlip, "LatentCrop": LatentCrop, "LoraLoader": LoraLoader, "CLIPLoader": CLIPLoader, "UNETLoader": UNETLoader, "DualCLIPLoader": DualCLIPLoader, "CLIPVisionEncode": CLIPVisionEncode, "StyleModelApply": StyleModelApply, "unCLIPConditioning": unCLIPConditioning, "ControlNetApply": ControlNetApply, "ControlNetApplyAdvanced": ControlNetApplyAdvanced, "ControlNetLoader": ControlNetLoader, "DiffControlNetLoader": DiffControlNetLoader, "StyleModelLoader": StyleModelLoader, "CLIPVisionLoader": CLIPVisionLoader, "VAEDecodeTiled": VAEDecodeTiled, "VAEEncodeTiled": VAEEncodeTiled, "unCLIPCheckpointLoader": unCLIPCheckpointLoader, "GLIGENLoader": GLIGENLoader, "GLIGENTextBoxApply": GLIGENTextBoxApply, "InpaintModelConditioning": InpaintModelConditioning, "CheckpointLoader": CheckpointLoader, "DiffusersLoader": DiffusersLoader, "LoadLatent": LoadLatent, "SaveLatent": SaveLatent, "ConditioningZeroOut": ConditioningZeroOut, "ConditioningSetTimestepRange": ConditioningSetTimestepRange, "LoraLoaderModelOnly": LoraLoaderModelOnly, } NODE_DISPLAY_NAME_MAPPINGS = { # Sampling "KSampler": "KSampler", "KSamplerAdvanced": "KSampler (Advanced)", # Loaders "CheckpointLoader": "Load Checkpoint With Config (DEPRECATED)", "CheckpointLoaderSimple": "Load Checkpoint", "VAELoader": "Load VAE", "LoraLoader": "Load LoRA", "CLIPLoader": "Load CLIP", "ControlNetLoader": "Load ControlNet Model", "DiffControlNetLoader": "Load ControlNet Model (diff)", "StyleModelLoader": "Load Style Model", "CLIPVisionLoader": "Load CLIP Vision", "UpscaleModelLoader": "Load Upscale Model", # Conditioning "CLIPVisionEncode": "CLIP Vision Encode", "StyleModelApply": "Apply Style Model", "CLIPTextEncode": "CLIP Text Encode (Prompt)", "CLIPSetLastLayer": "CLIP Set Last Layer", "ConditioningCombine": "Conditioning (Combine)", "ConditioningAverage ": "Conditioning (Average)", "ConditioningConcat": "Conditioning (Concat)", "ConditioningSetArea": "Conditioning (Set Area)", "ConditioningSetAreaPercentage": "Conditioning (Set Area with Percentage)", "ConditioningSetMask": "Conditioning (Set Mask)", "ControlNetApply": "Apply ControlNet", "ControlNetApplyAdvanced": "Apply ControlNet (Advanced)", # Latent "VAEEncodeForInpaint": "VAE Encode (for Inpainting)", "SetLatentNoiseMask": "Set Latent Noise Mask", "VAEDecode": "VAE Decode", "VAEEncode": "VAE Encode", "LatentRotate": "Rotate Latent", "LatentFlip": "Flip Latent", "LatentCrop": "Crop Latent", "EmptyLatentImage": "Empty Latent Image", "LatentUpscale": "Upscale Latent", "LatentUpscaleBy": "Upscale Latent By", "LatentComposite": "Latent Composite", "LatentBlend": "Latent Blend", "LatentFromBatch" : "Latent From Batch", "RepeatLatentBatch": "Repeat Latent Batch", # Image "SaveImage": "Save Image", "PreviewImage": "Preview Image", "LoadImage": "Load Image", "LoadImageMask": "Load Image (as Mask)", "ImageScale": "Upscale Image", "ImageScaleBy": "Upscale Image By", "ImageUpscaleWithModel": "Upscale Image (using Model)", "ImageInvert": "Invert Image", "ImagePadForOutpaint": "Pad Image for Outpainting", "ImageBatch": "Batch Images", # _for_testing "VAEDecodeTiled": "VAE Decode (Tiled)", "VAEEncodeTiled": "VAE Encode (Tiled)", } EXTENSION_WEB_DIRS = {} def load_custom_node(module_path, ignore=set()): module_name = os.path.basename(module_path) if os.path.isfile(module_path): sp = os.path.splitext(module_path) module_name = sp[0] try: logging.debug("Trying to load custom node {}".format(module_path)) if os.path.isfile(module_path): module_spec = importlib.util.spec_from_file_location(module_name, module_path) module_dir = os.path.split(module_path)[0] else: module_spec = importlib.util.spec_from_file_location(module_name, os.path.join(module_path, "__init__.py")) module_dir = module_path module = importlib.util.module_from_spec(module_spec) sys.modules[module_name] = module module_spec.loader.exec_module(module) if hasattr(module, "WEB_DIRECTORY") and getattr(module, "WEB_DIRECTORY") is not None: web_dir = os.path.abspath(os.path.join(module_dir, getattr(module, "WEB_DIRECTORY"))) if os.path.isdir(web_dir): EXTENSION_WEB_DIRS[module_name] = web_dir if hasattr(module, "NODE_CLASS_MAPPINGS") and getattr(module, "NODE_CLASS_MAPPINGS") is not None: for name in module.NODE_CLASS_MAPPINGS: if name not in ignore: NODE_CLASS_MAPPINGS[name] = module.NODE_CLASS_MAPPINGS[name] if hasattr(module, "NODE_DISPLAY_NAME_MAPPINGS") and getattr(module, "NODE_DISPLAY_NAME_MAPPINGS") is not None: NODE_DISPLAY_NAME_MAPPINGS.update(module.NODE_DISPLAY_NAME_MAPPINGS) return True else: logging.warning(f"Skip {module_path} module for custom nodes due to the lack of NODE_CLASS_MAPPINGS.") return False except Exception as e: logging.warning(traceback.format_exc()) logging.warning(f"Cannot import {module_path} module for custom nodes: {e}") return False def load_custom_nodes(): base_node_names = set(NODE_CLASS_MAPPINGS.keys()) node_paths = folder_paths.get_folder_paths("custom_nodes") node_import_times = [] for custom_node_path in node_paths: possible_modules = os.listdir(os.path.realpath(custom_node_path)) if "__pycache__" in possible_modules: possible_modules.remove("__pycache__") for possible_module in possible_modules: module_path = os.path.join(custom_node_path, possible_module) if os.path.isfile(module_path) and os.path.splitext(module_path)[1] != ".py": continue if module_path.endswith(".disabled"): continue time_before = time.perf_counter() success = load_custom_node(module_path, base_node_names) node_import_times.append((time.perf_counter() - time_before, module_path, success)) if len(node_import_times) > 0: logging.info("\nImport times for custom nodes:") for n in sorted(node_import_times): if n[2]: import_message = "" else: import_message = " (IMPORT FAILED)" logging.info("{:6.1f} seconds{}: {}".format(n[0], import_message, n[1])) logging.info("") def init_custom_nodes(): extras_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "comfy_extras") extras_files = [ "nodes_latent.py", "nodes_hypernetwork.py", "nodes_upscale_model.py", "nodes_post_processing.py", "nodes_mask.py", "nodes_compositing.py", "nodes_rebatch.py", "nodes_model_merging.py", "nodes_tomesd.py", "nodes_clip_sdxl.py", "nodes_canny.py", "nodes_freelunch.py", "nodes_custom_sampler.py", "nodes_hypertile.py", "nodes_model_advanced.py", "nodes_model_downscale.py", "nodes_images.py", "nodes_video_model.py", "nodes_sag.py", "nodes_perpneg.py", "nodes_stable3d.py", "nodes_sdupscale.py", "nodes_photomaker.py", "nodes_cond.py", "nodes_morphology.py", "nodes_stable_cascade.py", "nodes_differential_diffusion.py", "nodes_ip2p.py", "nodes_model_merging_model_specific.py", "nodes_pag.py", "nodes_align_your_steps.py", "nodes_attention_multiply.py", "nodes_advanced_samplers.py", "nodes_webcam.py", "nodes_audio.py", "nodes_sd3.py", "nodes_gits.py", ] import_failed = [] for node_file in extras_files: if not load_custom_node(os.path.join(extras_dir, node_file)): import_failed.append(node_file) load_custom_nodes() if len(import_failed) > 0: logging.warning("WARNING: some comfy_extras/ nodes did not import correctly. This may be because they are missing some dependencies.\n") for node in import_failed: logging.warning("IMPORT FAILED: {}".format(node)) logging.warning("\nThis issue might be caused by new missing dependencies added the last time you updated ComfyUI.") if args.windows_standalone_build: logging.warning("Please run the update script: update/update_comfyui.bat") else: logging.warning("Please do a: pip install -r requirements.txt") logging.warning("")