2172 lines
84 KiB
Python
2172 lines
84 KiB
Python
import torch
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import hashlib
|
|
import traceback
|
|
import math
|
|
import time
|
|
import random
|
|
import logging
|
|
|
|
from PIL import Image, ImageOps, ImageSequence, ImageFile
|
|
from PIL.PngImagePlugin import PngInfo
|
|
|
|
import numpy as np
|
|
import safetensors.torch
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), "comfy"))
|
|
|
|
import comfy.diffusers_load
|
|
import comfy.samplers
|
|
import comfy.sample
|
|
import comfy.sd
|
|
import comfy.utils
|
|
import comfy.controlnet
|
|
|
|
import comfy.clip_vision
|
|
|
|
import comfy.model_management
|
|
from comfy.cli_args import args
|
|
|
|
import importlib
|
|
|
|
import folder_paths
|
|
import latent_preview
|
|
import node_helpers
|
|
|
|
def before_node_execution():
|
|
comfy.model_management.throw_exception_if_processing_interrupted()
|
|
|
|
def interrupt_processing(value=True):
|
|
comfy.model_management.interrupt_current_processing(value)
|
|
|
|
MAX_RESOLUTION=16384
|
|
|
|
class CLIPTextEncode:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {
|
|
"required": {
|
|
"text": ("STRING", {"multiline": True, "dynamicPrompts": True, "tooltip": "The text to be encoded."}),
|
|
"clip": ("CLIP", {"tooltip": "The CLIP model used for encoding the text."})
|
|
}
|
|
}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
OUTPUT_TOOLTIPS = ("A conditioning containing the embedded text used to guide the diffusion model.",)
|
|
FUNCTION = "encode"
|
|
|
|
CATEGORY = "conditioning"
|
|
DESCRIPTION = "Encodes a text prompt using a CLIP model into an embedding that can be used to guide the diffusion model towards generating specific images."
|
|
|
|
def encode(self, clip, text):
|
|
tokens = clip.tokenize(text)
|
|
output = clip.encode_from_tokens(tokens, return_pooled=True, return_dict=True)
|
|
cond = output.pop("cond")
|
|
return ([[cond, output]], )
|
|
|
|
class ConditioningCombine:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning_1": ("CONDITIONING", ), "conditioning_2": ("CONDITIONING", )}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "combine"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def combine(self, conditioning_1, conditioning_2):
|
|
return (conditioning_1 + conditioning_2, )
|
|
|
|
class ConditioningAverage :
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning_to": ("CONDITIONING", ), "conditioning_from": ("CONDITIONING", ),
|
|
"conditioning_to_strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01})
|
|
}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "addWeighted"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def addWeighted(self, conditioning_to, conditioning_from, conditioning_to_strength):
|
|
out = []
|
|
|
|
if len(conditioning_from) > 1:
|
|
logging.warning("Warning: ConditioningAverage conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.")
|
|
|
|
cond_from = conditioning_from[0][0]
|
|
pooled_output_from = conditioning_from[0][1].get("pooled_output", None)
|
|
|
|
for i in range(len(conditioning_to)):
|
|
t1 = conditioning_to[i][0]
|
|
pooled_output_to = conditioning_to[i][1].get("pooled_output", pooled_output_from)
|
|
t0 = cond_from[:,:t1.shape[1]]
|
|
if t0.shape[1] < t1.shape[1]:
|
|
t0 = torch.cat([t0] + [torch.zeros((1, (t1.shape[1] - t0.shape[1]), t1.shape[2]))], dim=1)
|
|
|
|
tw = torch.mul(t1, conditioning_to_strength) + torch.mul(t0, (1.0 - conditioning_to_strength))
|
|
t_to = conditioning_to[i][1].copy()
|
|
if pooled_output_from is not None and pooled_output_to is not None:
|
|
t_to["pooled_output"] = torch.mul(pooled_output_to, conditioning_to_strength) + torch.mul(pooled_output_from, (1.0 - conditioning_to_strength))
|
|
elif pooled_output_from is not None:
|
|
t_to["pooled_output"] = pooled_output_from
|
|
|
|
n = [tw, t_to]
|
|
out.append(n)
|
|
return (out, )
|
|
|
|
class ConditioningConcat:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {
|
|
"conditioning_to": ("CONDITIONING",),
|
|
"conditioning_from": ("CONDITIONING",),
|
|
}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "concat"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def concat(self, conditioning_to, conditioning_from):
|
|
out = []
|
|
|
|
if len(conditioning_from) > 1:
|
|
logging.warning("Warning: ConditioningConcat conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.")
|
|
|
|
cond_from = conditioning_from[0][0]
|
|
|
|
for i in range(len(conditioning_to)):
|
|
t1 = conditioning_to[i][0]
|
|
tw = torch.cat((t1, cond_from),1)
|
|
n = [tw, conditioning_to[i][1].copy()]
|
|
out.append(n)
|
|
|
|
return (out, )
|
|
|
|
class ConditioningSetArea:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING", ),
|
|
"width": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
|
|
"height": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
|
|
"x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
|
|
}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "append"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def append(self, conditioning, width, height, x, y, strength):
|
|
c = node_helpers.conditioning_set_values(conditioning, {"area": (height // 8, width // 8, y // 8, x // 8),
|
|
"strength": strength,
|
|
"set_area_to_bounds": False})
|
|
return (c, )
|
|
|
|
class ConditioningSetAreaPercentage:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING", ),
|
|
"width": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}),
|
|
"height": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}),
|
|
"x": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}),
|
|
"y": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}),
|
|
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
|
|
}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "append"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def append(self, conditioning, width, height, x, y, strength):
|
|
c = node_helpers.conditioning_set_values(conditioning, {"area": ("percentage", height, width, y, x),
|
|
"strength": strength,
|
|
"set_area_to_bounds": False})
|
|
return (c, )
|
|
|
|
class ConditioningSetAreaStrength:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING", ),
|
|
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
|
|
}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "append"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def append(self, conditioning, strength):
|
|
c = node_helpers.conditioning_set_values(conditioning, {"strength": strength})
|
|
return (c, )
|
|
|
|
|
|
class ConditioningSetMask:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING", ),
|
|
"mask": ("MASK", ),
|
|
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
|
|
"set_cond_area": (["default", "mask bounds"],),
|
|
}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "append"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def append(self, conditioning, mask, set_cond_area, strength):
|
|
set_area_to_bounds = False
|
|
if set_cond_area != "default":
|
|
set_area_to_bounds = True
|
|
if len(mask.shape) < 3:
|
|
mask = mask.unsqueeze(0)
|
|
|
|
c = node_helpers.conditioning_set_values(conditioning, {"mask": mask,
|
|
"set_area_to_bounds": set_area_to_bounds,
|
|
"mask_strength": strength})
|
|
return (c, )
|
|
|
|
class ConditioningZeroOut:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING", )}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "zero_out"
|
|
|
|
CATEGORY = "advanced/conditioning"
|
|
|
|
def zero_out(self, conditioning):
|
|
c = []
|
|
for t in conditioning:
|
|
d = t[1].copy()
|
|
pooled_output = d.get("pooled_output", None)
|
|
if pooled_output is not None:
|
|
d["pooled_output"] = torch.zeros_like(pooled_output)
|
|
n = [torch.zeros_like(t[0]), d]
|
|
c.append(n)
|
|
return (c, )
|
|
|
|
class ConditioningSetTimestepRange:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING", ),
|
|
"start": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}),
|
|
"end": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001})
|
|
}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "set_range"
|
|
|
|
CATEGORY = "advanced/conditioning"
|
|
|
|
def set_range(self, conditioning, start, end):
|
|
c = node_helpers.conditioning_set_values(conditioning, {"start_percent": start,
|
|
"end_percent": end})
|
|
return (c, )
|
|
|
|
class VAEDecode:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {
|
|
"required": {
|
|
"samples": ("LATENT", {"tooltip": "The latent to be decoded."}),
|
|
"vae": ("VAE", {"tooltip": "The VAE model used for decoding the latent."})
|
|
}
|
|
}
|
|
RETURN_TYPES = ("IMAGE",)
|
|
OUTPUT_TOOLTIPS = ("The decoded image.",)
|
|
FUNCTION = "decode"
|
|
|
|
CATEGORY = "latent"
|
|
DESCRIPTION = "Decodes latent images back into pixel space images."
|
|
|
|
def decode(self, vae, samples):
|
|
images = vae.decode(samples["samples"])
|
|
if len(images.shape) == 5: #Combine batches
|
|
images = images.reshape(-1, images.shape[-3], images.shape[-2], images.shape[-1])
|
|
return (images, )
|
|
|
|
class VAEDecodeTiled:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"samples": ("LATENT", ), "vae": ("VAE", ),
|
|
"tile_size": ("INT", {"default": 512, "min": 128, "max": 4096, "step": 32}),
|
|
"overlap": ("INT", {"default": 64, "min": 0, "max": 4096, "step": 32}),
|
|
}}
|
|
RETURN_TYPES = ("IMAGE",)
|
|
FUNCTION = "decode"
|
|
|
|
CATEGORY = "_for_testing"
|
|
|
|
def decode(self, vae, samples, tile_size, overlap=64):
|
|
if tile_size < overlap * 4:
|
|
overlap = tile_size // 4
|
|
images = vae.decode_tiled(samples["samples"], tile_x=tile_size // 8, tile_y=tile_size // 8, overlap=overlap // 8)
|
|
if len(images.shape) == 5: #Combine batches
|
|
images = images.reshape(-1, images.shape[-3], images.shape[-2], images.shape[-1])
|
|
return (images, )
|
|
|
|
class VAEEncode:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "pixels": ("IMAGE", ), "vae": ("VAE", )}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "encode"
|
|
|
|
CATEGORY = "latent"
|
|
|
|
def encode(self, vae, pixels):
|
|
t = vae.encode(pixels[:,:,:,:3])
|
|
return ({"samples":t}, )
|
|
|
|
class VAEEncodeTiled:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"pixels": ("IMAGE", ), "vae": ("VAE", ),
|
|
"tile_size": ("INT", {"default": 512, "min": 320, "max": 4096, "step": 64})
|
|
}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "encode"
|
|
|
|
CATEGORY = "_for_testing"
|
|
|
|
def encode(self, vae, pixels, tile_size):
|
|
t = vae.encode_tiled(pixels[:,:,:,:3], tile_x=tile_size, tile_y=tile_size, )
|
|
return ({"samples":t}, )
|
|
|
|
class VAEEncodeForInpaint:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "pixels": ("IMAGE", ), "vae": ("VAE", ), "mask": ("MASK", ), "grow_mask_by": ("INT", {"default": 6, "min": 0, "max": 64, "step": 1}),}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "encode"
|
|
|
|
CATEGORY = "latent/inpaint"
|
|
|
|
def encode(self, vae, pixels, mask, grow_mask_by=6):
|
|
x = (pixels.shape[1] // vae.downscale_ratio) * vae.downscale_ratio
|
|
y = (pixels.shape[2] // vae.downscale_ratio) * vae.downscale_ratio
|
|
mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(pixels.shape[1], pixels.shape[2]), mode="bilinear")
|
|
|
|
pixels = pixels.clone()
|
|
if pixels.shape[1] != x or pixels.shape[2] != y:
|
|
x_offset = (pixels.shape[1] % vae.downscale_ratio) // 2
|
|
y_offset = (pixels.shape[2] % vae.downscale_ratio) // 2
|
|
pixels = pixels[:,x_offset:x + x_offset, y_offset:y + y_offset,:]
|
|
mask = mask[:,:,x_offset:x + x_offset, y_offset:y + y_offset]
|
|
|
|
#grow mask by a few pixels to keep things seamless in latent space
|
|
if grow_mask_by == 0:
|
|
mask_erosion = mask
|
|
else:
|
|
kernel_tensor = torch.ones((1, 1, grow_mask_by, grow_mask_by))
|
|
padding = math.ceil((grow_mask_by - 1) / 2)
|
|
|
|
mask_erosion = torch.clamp(torch.nn.functional.conv2d(mask.round(), kernel_tensor, padding=padding), 0, 1)
|
|
|
|
m = (1.0 - mask.round()).squeeze(1)
|
|
for i in range(3):
|
|
pixels[:,:,:,i] -= 0.5
|
|
pixels[:,:,:,i] *= m
|
|
pixels[:,:,:,i] += 0.5
|
|
t = vae.encode(pixels)
|
|
|
|
return ({"samples":t, "noise_mask": (mask_erosion[:,:,:x,:y].round())}, )
|
|
|
|
|
|
class InpaintModelConditioning:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"positive": ("CONDITIONING", ),
|
|
"negative": ("CONDITIONING", ),
|
|
"vae": ("VAE", ),
|
|
"pixels": ("IMAGE", ),
|
|
"mask": ("MASK", ),
|
|
"noise_mask": ("BOOLEAN", {"default": True, "tooltip": "Add a noise mask to the latent so sampling will only happen within the mask. Might improve results or completely break things depending on the model."}),
|
|
}}
|
|
|
|
RETURN_TYPES = ("CONDITIONING","CONDITIONING","LATENT")
|
|
RETURN_NAMES = ("positive", "negative", "latent")
|
|
FUNCTION = "encode"
|
|
|
|
CATEGORY = "conditioning/inpaint"
|
|
|
|
def encode(self, positive, negative, pixels, vae, mask, noise_mask):
|
|
x = (pixels.shape[1] // 8) * 8
|
|
y = (pixels.shape[2] // 8) * 8
|
|
mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(pixels.shape[1], pixels.shape[2]), mode="bilinear")
|
|
|
|
orig_pixels = pixels
|
|
pixels = orig_pixels.clone()
|
|
if pixels.shape[1] != x or pixels.shape[2] != y:
|
|
x_offset = (pixels.shape[1] % 8) // 2
|
|
y_offset = (pixels.shape[2] % 8) // 2
|
|
pixels = pixels[:,x_offset:x + x_offset, y_offset:y + y_offset,:]
|
|
mask = mask[:,:,x_offset:x + x_offset, y_offset:y + y_offset]
|
|
|
|
m = (1.0 - mask.round()).squeeze(1)
|
|
for i in range(3):
|
|
pixels[:,:,:,i] -= 0.5
|
|
pixels[:,:,:,i] *= m
|
|
pixels[:,:,:,i] += 0.5
|
|
concat_latent = vae.encode(pixels)
|
|
orig_latent = vae.encode(orig_pixels)
|
|
|
|
out_latent = {}
|
|
|
|
out_latent["samples"] = orig_latent
|
|
if noise_mask:
|
|
out_latent["noise_mask"] = mask
|
|
|
|
out = []
|
|
for conditioning in [positive, negative]:
|
|
c = node_helpers.conditioning_set_values(conditioning, {"concat_latent_image": concat_latent,
|
|
"concat_mask": mask})
|
|
out.append(c)
|
|
return (out[0], out[1], out_latent)
|
|
|
|
|
|
class SaveLatent:
|
|
def __init__(self):
|
|
self.output_dir = folder_paths.get_output_directory()
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "samples": ("LATENT", ),
|
|
"filename_prefix": ("STRING", {"default": "latents/ComfyUI"})},
|
|
"hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"},
|
|
}
|
|
RETURN_TYPES = ()
|
|
FUNCTION = "save"
|
|
|
|
OUTPUT_NODE = True
|
|
|
|
CATEGORY = "_for_testing"
|
|
|
|
def save(self, samples, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None):
|
|
full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir)
|
|
|
|
# support save metadata for latent sharing
|
|
prompt_info = ""
|
|
if prompt is not None:
|
|
prompt_info = json.dumps(prompt)
|
|
|
|
metadata = None
|
|
if not args.disable_metadata:
|
|
metadata = {"prompt": prompt_info}
|
|
if extra_pnginfo is not None:
|
|
for x in extra_pnginfo:
|
|
metadata[x] = json.dumps(extra_pnginfo[x])
|
|
|
|
file = f"{filename}_{counter:05}_.latent"
|
|
|
|
results = list()
|
|
results.append({
|
|
"filename": file,
|
|
"subfolder": subfolder,
|
|
"type": "output"
|
|
})
|
|
|
|
file = os.path.join(full_output_folder, file)
|
|
|
|
output = {}
|
|
output["latent_tensor"] = samples["samples"]
|
|
output["latent_format_version_0"] = torch.tensor([])
|
|
|
|
comfy.utils.save_torch_file(output, file, metadata=metadata)
|
|
return { "ui": { "latents": results } }
|
|
|
|
|
|
class LoadLatent:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
input_dir = folder_paths.get_input_directory()
|
|
files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f)) and f.endswith(".latent")]
|
|
return {"required": {"latent": [sorted(files), ]}, }
|
|
|
|
CATEGORY = "_for_testing"
|
|
|
|
RETURN_TYPES = ("LATENT", )
|
|
FUNCTION = "load"
|
|
|
|
def load(self, latent):
|
|
latent_path = folder_paths.get_annotated_filepath(latent)
|
|
latent = safetensors.torch.load_file(latent_path, device="cpu")
|
|
multiplier = 1.0
|
|
if "latent_format_version_0" not in latent:
|
|
multiplier = 1.0 / 0.18215
|
|
samples = {"samples": latent["latent_tensor"].float() * multiplier}
|
|
return (samples, )
|
|
|
|
@classmethod
|
|
def IS_CHANGED(s, latent):
|
|
image_path = folder_paths.get_annotated_filepath(latent)
|
|
m = hashlib.sha256()
|
|
with open(image_path, 'rb') as f:
|
|
m.update(f.read())
|
|
return m.digest().hex()
|
|
|
|
@classmethod
|
|
def VALIDATE_INPUTS(s, latent):
|
|
if not folder_paths.exists_annotated_filepath(latent):
|
|
return "Invalid latent file: {}".format(latent)
|
|
return True
|
|
|
|
|
|
class CheckpointLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "config_name": (folder_paths.get_filename_list("configs"), ),
|
|
"ckpt_name": (folder_paths.get_filename_list("checkpoints"), )}}
|
|
RETURN_TYPES = ("MODEL", "CLIP", "VAE")
|
|
FUNCTION = "load_checkpoint"
|
|
|
|
CATEGORY = "advanced/loaders"
|
|
DEPRECATED = True
|
|
|
|
def load_checkpoint(self, config_name, ckpt_name):
|
|
config_path = folder_paths.get_full_path("configs", config_name)
|
|
ckpt_path = folder_paths.get_full_path_or_raise("checkpoints", ckpt_name)
|
|
return comfy.sd.load_checkpoint(config_path, ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings"))
|
|
|
|
class CheckpointLoaderSimple:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {
|
|
"required": {
|
|
"ckpt_name": (folder_paths.get_filename_list("checkpoints"), {"tooltip": "The name of the checkpoint (model) to load."}),
|
|
}
|
|
}
|
|
RETURN_TYPES = ("MODEL", "CLIP", "VAE")
|
|
OUTPUT_TOOLTIPS = ("The model used for denoising latents.",
|
|
"The CLIP model used for encoding text prompts.",
|
|
"The VAE model used for encoding and decoding images to and from latent space.")
|
|
FUNCTION = "load_checkpoint"
|
|
|
|
CATEGORY = "loaders"
|
|
DESCRIPTION = "Loads a diffusion model checkpoint, diffusion models are used to denoise latents."
|
|
|
|
def load_checkpoint(self, ckpt_name):
|
|
ckpt_path = folder_paths.get_full_path_or_raise("checkpoints", ckpt_name)
|
|
out = comfy.sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings"))
|
|
return out[:3]
|
|
|
|
class DiffusersLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(cls):
|
|
paths = []
|
|
for search_path in folder_paths.get_folder_paths("diffusers"):
|
|
if os.path.exists(search_path):
|
|
for root, subdir, files in os.walk(search_path, followlinks=True):
|
|
if "model_index.json" in files:
|
|
paths.append(os.path.relpath(root, start=search_path))
|
|
|
|
return {"required": {"model_path": (paths,), }}
|
|
RETURN_TYPES = ("MODEL", "CLIP", "VAE")
|
|
FUNCTION = "load_checkpoint"
|
|
|
|
CATEGORY = "advanced/loaders/deprecated"
|
|
|
|
def load_checkpoint(self, model_path, output_vae=True, output_clip=True):
|
|
for search_path in folder_paths.get_folder_paths("diffusers"):
|
|
if os.path.exists(search_path):
|
|
path = os.path.join(search_path, model_path)
|
|
if os.path.exists(path):
|
|
model_path = path
|
|
break
|
|
|
|
return comfy.diffusers_load.load_diffusers(model_path, output_vae=output_vae, output_clip=output_clip, embedding_directory=folder_paths.get_folder_paths("embeddings"))
|
|
|
|
|
|
class unCLIPCheckpointLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "ckpt_name": (folder_paths.get_filename_list("checkpoints"), ),
|
|
}}
|
|
RETURN_TYPES = ("MODEL", "CLIP", "VAE", "CLIP_VISION")
|
|
FUNCTION = "load_checkpoint"
|
|
|
|
CATEGORY = "loaders"
|
|
|
|
def load_checkpoint(self, ckpt_name, output_vae=True, output_clip=True):
|
|
ckpt_path = folder_paths.get_full_path_or_raise("checkpoints", ckpt_name)
|
|
out = comfy.sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, output_clipvision=True, embedding_directory=folder_paths.get_folder_paths("embeddings"))
|
|
return out
|
|
|
|
class CLIPSetLastLayer:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "clip": ("CLIP", ),
|
|
"stop_at_clip_layer": ("INT", {"default": -1, "min": -24, "max": -1, "step": 1}),
|
|
}}
|
|
RETURN_TYPES = ("CLIP",)
|
|
FUNCTION = "set_last_layer"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def set_last_layer(self, clip, stop_at_clip_layer):
|
|
clip = clip.clone()
|
|
clip.clip_layer(stop_at_clip_layer)
|
|
return (clip,)
|
|
|
|
class LoraLoader:
|
|
def __init__(self):
|
|
self.loaded_lora = None
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {
|
|
"required": {
|
|
"model": ("MODEL", {"tooltip": "The diffusion model the LoRA will be applied to."}),
|
|
"clip": ("CLIP", {"tooltip": "The CLIP model the LoRA will be applied to."}),
|
|
"lora_name": (folder_paths.get_filename_list("loras"), {"tooltip": "The name of the LoRA."}),
|
|
"strength_model": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01, "tooltip": "How strongly to modify the diffusion model. This value can be negative."}),
|
|
"strength_clip": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01, "tooltip": "How strongly to modify the CLIP model. This value can be negative."}),
|
|
}
|
|
}
|
|
|
|
RETURN_TYPES = ("MODEL", "CLIP")
|
|
OUTPUT_TOOLTIPS = ("The modified diffusion model.", "The modified CLIP model.")
|
|
FUNCTION = "load_lora"
|
|
|
|
CATEGORY = "loaders"
|
|
DESCRIPTION = "LoRAs are used to modify diffusion and CLIP models, altering the way in which latents are denoised such as applying styles. Multiple LoRA nodes can be linked together."
|
|
|
|
def load_lora(self, model, clip, lora_name, strength_model, strength_clip):
|
|
if strength_model == 0 and strength_clip == 0:
|
|
return (model, clip)
|
|
|
|
lora_path = folder_paths.get_full_path_or_raise("loras", lora_name)
|
|
lora = None
|
|
if self.loaded_lora is not None:
|
|
if self.loaded_lora[0] == lora_path:
|
|
lora = self.loaded_lora[1]
|
|
else:
|
|
temp = self.loaded_lora
|
|
self.loaded_lora = None
|
|
del temp
|
|
|
|
if lora is None:
|
|
lora = comfy.utils.load_torch_file(lora_path, safe_load=True)
|
|
self.loaded_lora = (lora_path, lora)
|
|
|
|
model_lora, clip_lora = comfy.sd.load_lora_for_models(model, clip, lora, strength_model, strength_clip)
|
|
return (model_lora, clip_lora)
|
|
|
|
class LoraLoaderModelOnly(LoraLoader):
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "model": ("MODEL",),
|
|
"lora_name": (folder_paths.get_filename_list("loras"), ),
|
|
"strength_model": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01}),
|
|
}}
|
|
RETURN_TYPES = ("MODEL",)
|
|
FUNCTION = "load_lora_model_only"
|
|
|
|
def load_lora_model_only(self, model, lora_name, strength_model):
|
|
return (self.load_lora(model, None, lora_name, strength_model, 0)[0],)
|
|
|
|
class VAELoader:
|
|
@staticmethod
|
|
def vae_list():
|
|
vaes = folder_paths.get_filename_list("vae")
|
|
approx_vaes = folder_paths.get_filename_list("vae_approx")
|
|
sdxl_taesd_enc = False
|
|
sdxl_taesd_dec = False
|
|
sd1_taesd_enc = False
|
|
sd1_taesd_dec = False
|
|
sd3_taesd_enc = False
|
|
sd3_taesd_dec = False
|
|
f1_taesd_enc = False
|
|
f1_taesd_dec = False
|
|
|
|
for v in approx_vaes:
|
|
if v.startswith("taesd_decoder."):
|
|
sd1_taesd_dec = True
|
|
elif v.startswith("taesd_encoder."):
|
|
sd1_taesd_enc = True
|
|
elif v.startswith("taesdxl_decoder."):
|
|
sdxl_taesd_dec = True
|
|
elif v.startswith("taesdxl_encoder."):
|
|
sdxl_taesd_enc = True
|
|
elif v.startswith("taesd3_decoder."):
|
|
sd3_taesd_dec = True
|
|
elif v.startswith("taesd3_encoder."):
|
|
sd3_taesd_enc = True
|
|
elif v.startswith("taef1_encoder."):
|
|
f1_taesd_dec = True
|
|
elif v.startswith("taef1_decoder."):
|
|
f1_taesd_enc = True
|
|
if sd1_taesd_dec and sd1_taesd_enc:
|
|
vaes.append("taesd")
|
|
if sdxl_taesd_dec and sdxl_taesd_enc:
|
|
vaes.append("taesdxl")
|
|
if sd3_taesd_dec and sd3_taesd_enc:
|
|
vaes.append("taesd3")
|
|
if f1_taesd_dec and f1_taesd_enc:
|
|
vaes.append("taef1")
|
|
return vaes
|
|
|
|
@staticmethod
|
|
def load_taesd(name):
|
|
sd = {}
|
|
approx_vaes = folder_paths.get_filename_list("vae_approx")
|
|
|
|
encoder = next(filter(lambda a: a.startswith("{}_encoder.".format(name)), approx_vaes))
|
|
decoder = next(filter(lambda a: a.startswith("{}_decoder.".format(name)), approx_vaes))
|
|
|
|
enc = comfy.utils.load_torch_file(folder_paths.get_full_path_or_raise("vae_approx", encoder))
|
|
for k in enc:
|
|
sd["taesd_encoder.{}".format(k)] = enc[k]
|
|
|
|
dec = comfy.utils.load_torch_file(folder_paths.get_full_path_or_raise("vae_approx", decoder))
|
|
for k in dec:
|
|
sd["taesd_decoder.{}".format(k)] = dec[k]
|
|
|
|
if name == "taesd":
|
|
sd["vae_scale"] = torch.tensor(0.18215)
|
|
sd["vae_shift"] = torch.tensor(0.0)
|
|
elif name == "taesdxl":
|
|
sd["vae_scale"] = torch.tensor(0.13025)
|
|
sd["vae_shift"] = torch.tensor(0.0)
|
|
elif name == "taesd3":
|
|
sd["vae_scale"] = torch.tensor(1.5305)
|
|
sd["vae_shift"] = torch.tensor(0.0609)
|
|
elif name == "taef1":
|
|
sd["vae_scale"] = torch.tensor(0.3611)
|
|
sd["vae_shift"] = torch.tensor(0.1159)
|
|
return sd
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "vae_name": (s.vae_list(), )}}
|
|
RETURN_TYPES = ("VAE",)
|
|
FUNCTION = "load_vae"
|
|
|
|
CATEGORY = "loaders"
|
|
|
|
#TODO: scale factor?
|
|
def load_vae(self, vae_name):
|
|
if vae_name in ["taesd", "taesdxl", "taesd3", "taef1"]:
|
|
sd = self.load_taesd(vae_name)
|
|
else:
|
|
vae_path = folder_paths.get_full_path_or_raise("vae", vae_name)
|
|
sd = comfy.utils.load_torch_file(vae_path)
|
|
vae = comfy.sd.VAE(sd=sd)
|
|
return (vae,)
|
|
|
|
class ControlNetLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "control_net_name": (folder_paths.get_filename_list("controlnet"), )}}
|
|
|
|
RETURN_TYPES = ("CONTROL_NET",)
|
|
FUNCTION = "load_controlnet"
|
|
|
|
CATEGORY = "loaders"
|
|
|
|
def load_controlnet(self, control_net_name):
|
|
controlnet_path = folder_paths.get_full_path_or_raise("controlnet", control_net_name)
|
|
controlnet = comfy.controlnet.load_controlnet(controlnet_path)
|
|
return (controlnet,)
|
|
|
|
class DiffControlNetLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "model": ("MODEL",),
|
|
"control_net_name": (folder_paths.get_filename_list("controlnet"), )}}
|
|
|
|
RETURN_TYPES = ("CONTROL_NET",)
|
|
FUNCTION = "load_controlnet"
|
|
|
|
CATEGORY = "loaders"
|
|
|
|
def load_controlnet(self, model, control_net_name):
|
|
controlnet_path = folder_paths.get_full_path_or_raise("controlnet", control_net_name)
|
|
controlnet = comfy.controlnet.load_controlnet(controlnet_path, model)
|
|
return (controlnet,)
|
|
|
|
|
|
class ControlNetApply:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING", ),
|
|
"control_net": ("CONTROL_NET", ),
|
|
"image": ("IMAGE", ),
|
|
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01})
|
|
}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "apply_controlnet"
|
|
|
|
DEPRECATED = True
|
|
CATEGORY = "conditioning/controlnet"
|
|
|
|
def apply_controlnet(self, conditioning, control_net, image, strength):
|
|
if strength == 0:
|
|
return (conditioning, )
|
|
|
|
c = []
|
|
control_hint = image.movedim(-1,1)
|
|
for t in conditioning:
|
|
n = [t[0], t[1].copy()]
|
|
c_net = control_net.copy().set_cond_hint(control_hint, strength)
|
|
if 'control' in t[1]:
|
|
c_net.set_previous_controlnet(t[1]['control'])
|
|
n[1]['control'] = c_net
|
|
n[1]['control_apply_to_uncond'] = True
|
|
c.append(n)
|
|
return (c, )
|
|
|
|
|
|
class ControlNetApplyAdvanced:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"positive": ("CONDITIONING", ),
|
|
"negative": ("CONDITIONING", ),
|
|
"control_net": ("CONTROL_NET", ),
|
|
"image": ("IMAGE", ),
|
|
"strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
|
|
"start_percent": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}),
|
|
"end_percent": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001})
|
|
},
|
|
"optional": {"vae": ("VAE", ),
|
|
}
|
|
}
|
|
|
|
RETURN_TYPES = ("CONDITIONING","CONDITIONING")
|
|
RETURN_NAMES = ("positive", "negative")
|
|
FUNCTION = "apply_controlnet"
|
|
|
|
CATEGORY = "conditioning/controlnet"
|
|
|
|
def apply_controlnet(self, positive, negative, control_net, image, strength, start_percent, end_percent, vae=None, extra_concat=[]):
|
|
if strength == 0:
|
|
return (positive, negative)
|
|
|
|
control_hint = image.movedim(-1,1)
|
|
cnets = {}
|
|
|
|
out = []
|
|
for conditioning in [positive, negative]:
|
|
c = []
|
|
for t in conditioning:
|
|
d = t[1].copy()
|
|
|
|
prev_cnet = d.get('control', None)
|
|
if prev_cnet in cnets:
|
|
c_net = cnets[prev_cnet]
|
|
else:
|
|
c_net = control_net.copy().set_cond_hint(control_hint, strength, (start_percent, end_percent), vae=vae, extra_concat=extra_concat)
|
|
c_net.set_previous_controlnet(prev_cnet)
|
|
cnets[prev_cnet] = c_net
|
|
|
|
d['control'] = c_net
|
|
d['control_apply_to_uncond'] = False
|
|
n = [t[0], d]
|
|
c.append(n)
|
|
out.append(c)
|
|
return (out[0], out[1])
|
|
|
|
|
|
class UNETLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "unet_name": (folder_paths.get_filename_list("diffusion_models"), ),
|
|
"weight_dtype": (["default", "fp8_e4m3fn", "fp8_e4m3fn_fast", "fp8_e5m2"],)
|
|
}}
|
|
RETURN_TYPES = ("MODEL",)
|
|
FUNCTION = "load_unet"
|
|
|
|
CATEGORY = "advanced/loaders"
|
|
|
|
def load_unet(self, unet_name, weight_dtype):
|
|
model_options = {}
|
|
if weight_dtype == "fp8_e4m3fn":
|
|
model_options["dtype"] = torch.float8_e4m3fn
|
|
elif weight_dtype == "fp8_e4m3fn_fast":
|
|
model_options["dtype"] = torch.float8_e4m3fn
|
|
model_options["fp8_optimizations"] = True
|
|
elif weight_dtype == "fp8_e5m2":
|
|
model_options["dtype"] = torch.float8_e5m2
|
|
|
|
unet_path = folder_paths.get_full_path_or_raise("diffusion_models", unet_name)
|
|
model = comfy.sd.load_diffusion_model(unet_path, model_options=model_options)
|
|
return (model,)
|
|
|
|
class CLIPLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "clip_name": (folder_paths.get_filename_list("text_encoders"), ),
|
|
"type": (["stable_diffusion", "stable_cascade", "sd3", "stable_audio", "mochi", "ltxv"], ),
|
|
}}
|
|
RETURN_TYPES = ("CLIP",)
|
|
FUNCTION = "load_clip"
|
|
|
|
CATEGORY = "advanced/loaders"
|
|
|
|
DESCRIPTION = "[Recipes]\n\nstable_diffusion: clip-l\nstable_cascade: clip-g\nsd3: t5 / clip-g / clip-l\nstable_audio: t5\nmochi: t5"
|
|
|
|
def load_clip(self, clip_name, type="stable_diffusion"):
|
|
if type == "stable_cascade":
|
|
clip_type = comfy.sd.CLIPType.STABLE_CASCADE
|
|
elif type == "sd3":
|
|
clip_type = comfy.sd.CLIPType.SD3
|
|
elif type == "stable_audio":
|
|
clip_type = comfy.sd.CLIPType.STABLE_AUDIO
|
|
elif type == "mochi":
|
|
clip_type = comfy.sd.CLIPType.MOCHI
|
|
elif type == "ltxv":
|
|
clip_type = comfy.sd.CLIPType.LTXV
|
|
else:
|
|
clip_type = comfy.sd.CLIPType.STABLE_DIFFUSION
|
|
|
|
clip_path = folder_paths.get_full_path_or_raise("text_encoders", clip_name)
|
|
clip = comfy.sd.load_clip(ckpt_paths=[clip_path], embedding_directory=folder_paths.get_folder_paths("embeddings"), clip_type=clip_type)
|
|
return (clip,)
|
|
|
|
class DualCLIPLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "clip_name1": (folder_paths.get_filename_list("text_encoders"), ),
|
|
"clip_name2": (folder_paths.get_filename_list("text_encoders"), ),
|
|
"type": (["sdxl", "sd3", "flux"], ),
|
|
}}
|
|
RETURN_TYPES = ("CLIP",)
|
|
FUNCTION = "load_clip"
|
|
|
|
CATEGORY = "advanced/loaders"
|
|
|
|
DESCRIPTION = "[Recipes]\n\nsdxl: clip-l, clip-g\nsd3: clip-l, clip-g / clip-l, t5 / clip-g, t5\nflux: clip-l, t5"
|
|
|
|
def load_clip(self, clip_name1, clip_name2, type):
|
|
clip_path1 = folder_paths.get_full_path_or_raise("text_encoders", clip_name1)
|
|
clip_path2 = folder_paths.get_full_path_or_raise("text_encoders", clip_name2)
|
|
if type == "sdxl":
|
|
clip_type = comfy.sd.CLIPType.STABLE_DIFFUSION
|
|
elif type == "sd3":
|
|
clip_type = comfy.sd.CLIPType.SD3
|
|
elif type == "flux":
|
|
clip_type = comfy.sd.CLIPType.FLUX
|
|
|
|
clip = comfy.sd.load_clip(ckpt_paths=[clip_path1, clip_path2], embedding_directory=folder_paths.get_folder_paths("embeddings"), clip_type=clip_type)
|
|
return (clip,)
|
|
|
|
class CLIPVisionLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "clip_name": (folder_paths.get_filename_list("clip_vision"), ),
|
|
}}
|
|
RETURN_TYPES = ("CLIP_VISION",)
|
|
FUNCTION = "load_clip"
|
|
|
|
CATEGORY = "loaders"
|
|
|
|
def load_clip(self, clip_name):
|
|
clip_path = folder_paths.get_full_path_or_raise("clip_vision", clip_name)
|
|
clip_vision = comfy.clip_vision.load(clip_path)
|
|
return (clip_vision,)
|
|
|
|
class CLIPVisionEncode:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "clip_vision": ("CLIP_VISION",),
|
|
"image": ("IMAGE",)
|
|
}}
|
|
RETURN_TYPES = ("CLIP_VISION_OUTPUT",)
|
|
FUNCTION = "encode"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def encode(self, clip_vision, image):
|
|
output = clip_vision.encode_image(image)
|
|
return (output,)
|
|
|
|
class StyleModelLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "style_model_name": (folder_paths.get_filename_list("style_models"), )}}
|
|
|
|
RETURN_TYPES = ("STYLE_MODEL",)
|
|
FUNCTION = "load_style_model"
|
|
|
|
CATEGORY = "loaders"
|
|
|
|
def load_style_model(self, style_model_name):
|
|
style_model_path = folder_paths.get_full_path_or_raise("style_models", style_model_name)
|
|
style_model = comfy.sd.load_style_model(style_model_path)
|
|
return (style_model,)
|
|
|
|
|
|
class StyleModelApply:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING", ),
|
|
"style_model": ("STYLE_MODEL", ),
|
|
"clip_vision_output": ("CLIP_VISION_OUTPUT", ),
|
|
}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "apply_stylemodel"
|
|
|
|
CATEGORY = "conditioning/style_model"
|
|
|
|
def apply_stylemodel(self, clip_vision_output, style_model, conditioning):
|
|
cond = style_model.get_cond(clip_vision_output).flatten(start_dim=0, end_dim=1).unsqueeze(dim=0)
|
|
c = []
|
|
for t in conditioning:
|
|
n = [torch.cat((t[0], cond), dim=1), t[1].copy()]
|
|
c.append(n)
|
|
return (c, )
|
|
|
|
class unCLIPConditioning:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning": ("CONDITIONING", ),
|
|
"clip_vision_output": ("CLIP_VISION_OUTPUT", ),
|
|
"strength": ("FLOAT", {"default": 1.0, "min": -10.0, "max": 10.0, "step": 0.01}),
|
|
"noise_augmentation": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.01}),
|
|
}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "apply_adm"
|
|
|
|
CATEGORY = "conditioning"
|
|
|
|
def apply_adm(self, conditioning, clip_vision_output, strength, noise_augmentation):
|
|
if strength == 0:
|
|
return (conditioning, )
|
|
|
|
c = []
|
|
for t in conditioning:
|
|
o = t[1].copy()
|
|
x = {"clip_vision_output": clip_vision_output, "strength": strength, "noise_augmentation": noise_augmentation}
|
|
if "unclip_conditioning" in o:
|
|
o["unclip_conditioning"] = o["unclip_conditioning"][:] + [x]
|
|
else:
|
|
o["unclip_conditioning"] = [x]
|
|
n = [t[0], o]
|
|
c.append(n)
|
|
return (c, )
|
|
|
|
class GLIGENLoader:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "gligen_name": (folder_paths.get_filename_list("gligen"), )}}
|
|
|
|
RETURN_TYPES = ("GLIGEN",)
|
|
FUNCTION = "load_gligen"
|
|
|
|
CATEGORY = "loaders"
|
|
|
|
def load_gligen(self, gligen_name):
|
|
gligen_path = folder_paths.get_full_path_or_raise("gligen", gligen_name)
|
|
gligen = comfy.sd.load_gligen(gligen_path)
|
|
return (gligen,)
|
|
|
|
class GLIGENTextBoxApply:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"conditioning_to": ("CONDITIONING", ),
|
|
"clip": ("CLIP", ),
|
|
"gligen_textbox_model": ("GLIGEN", ),
|
|
"text": ("STRING", {"multiline": True, "dynamicPrompts": True}),
|
|
"width": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}),
|
|
"height": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}),
|
|
"x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "append"
|
|
|
|
CATEGORY = "conditioning/gligen"
|
|
|
|
def append(self, conditioning_to, clip, gligen_textbox_model, text, width, height, x, y):
|
|
c = []
|
|
cond, cond_pooled = clip.encode_from_tokens(clip.tokenize(text), return_pooled="unprojected")
|
|
for t in conditioning_to:
|
|
n = [t[0], t[1].copy()]
|
|
position_params = [(cond_pooled, height // 8, width // 8, y // 8, x // 8)]
|
|
prev = []
|
|
if "gligen" in n[1]:
|
|
prev = n[1]['gligen'][2]
|
|
|
|
n[1]['gligen'] = ("position", gligen_textbox_model, prev + position_params)
|
|
c.append(n)
|
|
return (c, )
|
|
|
|
class EmptyLatentImage:
|
|
def __init__(self):
|
|
self.device = comfy.model_management.intermediate_device()
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {
|
|
"required": {
|
|
"width": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8, "tooltip": "The width of the latent images in pixels."}),
|
|
"height": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8, "tooltip": "The height of the latent images in pixels."}),
|
|
"batch_size": ("INT", {"default": 1, "min": 1, "max": 4096, "tooltip": "The number of latent images in the batch."})
|
|
}
|
|
}
|
|
RETURN_TYPES = ("LATENT",)
|
|
OUTPUT_TOOLTIPS = ("The empty latent image batch.",)
|
|
FUNCTION = "generate"
|
|
|
|
CATEGORY = "latent"
|
|
DESCRIPTION = "Create a new batch of empty latent images to be denoised via sampling."
|
|
|
|
def generate(self, width, height, batch_size=1):
|
|
latent = torch.zeros([batch_size, 4, height // 8, width // 8], device=self.device)
|
|
return ({"samples":latent}, )
|
|
|
|
|
|
class LatentFromBatch:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "samples": ("LATENT",),
|
|
"batch_index": ("INT", {"default": 0, "min": 0, "max": 63}),
|
|
"length": ("INT", {"default": 1, "min": 1, "max": 64}),
|
|
}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "frombatch"
|
|
|
|
CATEGORY = "latent/batch"
|
|
|
|
def frombatch(self, samples, batch_index, length):
|
|
s = samples.copy()
|
|
s_in = samples["samples"]
|
|
batch_index = min(s_in.shape[0] - 1, batch_index)
|
|
length = min(s_in.shape[0] - batch_index, length)
|
|
s["samples"] = s_in[batch_index:batch_index + length].clone()
|
|
if "noise_mask" in samples:
|
|
masks = samples["noise_mask"]
|
|
if masks.shape[0] == 1:
|
|
s["noise_mask"] = masks.clone()
|
|
else:
|
|
if masks.shape[0] < s_in.shape[0]:
|
|
masks = masks.repeat(math.ceil(s_in.shape[0] / masks.shape[0]), 1, 1, 1)[:s_in.shape[0]]
|
|
s["noise_mask"] = masks[batch_index:batch_index + length].clone()
|
|
if "batch_index" not in s:
|
|
s["batch_index"] = [x for x in range(batch_index, batch_index+length)]
|
|
else:
|
|
s["batch_index"] = samples["batch_index"][batch_index:batch_index + length]
|
|
return (s,)
|
|
|
|
class RepeatLatentBatch:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "samples": ("LATENT",),
|
|
"amount": ("INT", {"default": 1, "min": 1, "max": 64}),
|
|
}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "repeat"
|
|
|
|
CATEGORY = "latent/batch"
|
|
|
|
def repeat(self, samples, amount):
|
|
s = samples.copy()
|
|
s_in = samples["samples"]
|
|
|
|
s["samples"] = s_in.repeat((amount, 1,1,1))
|
|
if "noise_mask" in samples and samples["noise_mask"].shape[0] > 1:
|
|
masks = samples["noise_mask"]
|
|
if masks.shape[0] < s_in.shape[0]:
|
|
masks = masks.repeat(math.ceil(s_in.shape[0] / masks.shape[0]), 1, 1, 1)[:s_in.shape[0]]
|
|
s["noise_mask"] = samples["noise_mask"].repeat((amount, 1,1,1))
|
|
if "batch_index" in s:
|
|
offset = max(s["batch_index"]) - min(s["batch_index"]) + 1
|
|
s["batch_index"] = s["batch_index"] + [x + (i * offset) for i in range(1, amount) for x in s["batch_index"]]
|
|
return (s,)
|
|
|
|
class LatentUpscale:
|
|
upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"]
|
|
crop_methods = ["disabled", "center"]
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "samples": ("LATENT",), "upscale_method": (s.upscale_methods,),
|
|
"width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"crop": (s.crop_methods,)}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "upscale"
|
|
|
|
CATEGORY = "latent"
|
|
|
|
def upscale(self, samples, upscale_method, width, height, crop):
|
|
if width == 0 and height == 0:
|
|
s = samples
|
|
else:
|
|
s = samples.copy()
|
|
|
|
if width == 0:
|
|
height = max(64, height)
|
|
width = max(64, round(samples["samples"].shape[-1] * height / samples["samples"].shape[-2]))
|
|
elif height == 0:
|
|
width = max(64, width)
|
|
height = max(64, round(samples["samples"].shape[-2] * width / samples["samples"].shape[-1]))
|
|
else:
|
|
width = max(64, width)
|
|
height = max(64, height)
|
|
|
|
s["samples"] = comfy.utils.common_upscale(samples["samples"], width // 8, height // 8, upscale_method, crop)
|
|
return (s,)
|
|
|
|
class LatentUpscaleBy:
|
|
upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"]
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "samples": ("LATENT",), "upscale_method": (s.upscale_methods,),
|
|
"scale_by": ("FLOAT", {"default": 1.5, "min": 0.01, "max": 8.0, "step": 0.01}),}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "upscale"
|
|
|
|
CATEGORY = "latent"
|
|
|
|
def upscale(self, samples, upscale_method, scale_by):
|
|
s = samples.copy()
|
|
width = round(samples["samples"].shape[-1] * scale_by)
|
|
height = round(samples["samples"].shape[-2] * scale_by)
|
|
s["samples"] = comfy.utils.common_upscale(samples["samples"], width, height, upscale_method, "disabled")
|
|
return (s,)
|
|
|
|
class LatentRotate:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "samples": ("LATENT",),
|
|
"rotation": (["none", "90 degrees", "180 degrees", "270 degrees"],),
|
|
}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "rotate"
|
|
|
|
CATEGORY = "latent/transform"
|
|
|
|
def rotate(self, samples, rotation):
|
|
s = samples.copy()
|
|
rotate_by = 0
|
|
if rotation.startswith("90"):
|
|
rotate_by = 1
|
|
elif rotation.startswith("180"):
|
|
rotate_by = 2
|
|
elif rotation.startswith("270"):
|
|
rotate_by = 3
|
|
|
|
s["samples"] = torch.rot90(samples["samples"], k=rotate_by, dims=[3, 2])
|
|
return (s,)
|
|
|
|
class LatentFlip:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "samples": ("LATENT",),
|
|
"flip_method": (["x-axis: vertically", "y-axis: horizontally"],),
|
|
}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "flip"
|
|
|
|
CATEGORY = "latent/transform"
|
|
|
|
def flip(self, samples, flip_method):
|
|
s = samples.copy()
|
|
if flip_method.startswith("x"):
|
|
s["samples"] = torch.flip(samples["samples"], dims=[2])
|
|
elif flip_method.startswith("y"):
|
|
s["samples"] = torch.flip(samples["samples"], dims=[3])
|
|
|
|
return (s,)
|
|
|
|
class LatentComposite:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "samples_to": ("LATENT",),
|
|
"samples_from": ("LATENT",),
|
|
"x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"feather": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "composite"
|
|
|
|
CATEGORY = "latent"
|
|
|
|
def composite(self, samples_to, samples_from, x, y, composite_method="normal", feather=0):
|
|
x = x // 8
|
|
y = y // 8
|
|
feather = feather // 8
|
|
samples_out = samples_to.copy()
|
|
s = samples_to["samples"].clone()
|
|
samples_to = samples_to["samples"]
|
|
samples_from = samples_from["samples"]
|
|
if feather == 0:
|
|
s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x]
|
|
else:
|
|
samples_from = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x]
|
|
mask = torch.ones_like(samples_from)
|
|
for t in range(feather):
|
|
if y != 0:
|
|
mask[:,:,t:1+t,:] *= ((1.0/feather) * (t + 1))
|
|
|
|
if y + samples_from.shape[2] < samples_to.shape[2]:
|
|
mask[:,:,mask.shape[2] -1 -t: mask.shape[2]-t,:] *= ((1.0/feather) * (t + 1))
|
|
if x != 0:
|
|
mask[:,:,:,t:1+t] *= ((1.0/feather) * (t + 1))
|
|
if x + samples_from.shape[3] < samples_to.shape[3]:
|
|
mask[:,:,:,mask.shape[3]- 1 - t: mask.shape[3]- t] *= ((1.0/feather) * (t + 1))
|
|
rev_mask = torch.ones_like(mask) - mask
|
|
s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x] * mask + s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] * rev_mask
|
|
samples_out["samples"] = s
|
|
return (samples_out,)
|
|
|
|
class LatentBlend:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {
|
|
"samples1": ("LATENT",),
|
|
"samples2": ("LATENT",),
|
|
"blend_factor": ("FLOAT", {
|
|
"default": 0.5,
|
|
"min": 0,
|
|
"max": 1,
|
|
"step": 0.01
|
|
}),
|
|
}}
|
|
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "blend"
|
|
|
|
CATEGORY = "_for_testing"
|
|
|
|
def blend(self, samples1, samples2, blend_factor:float, blend_mode: str="normal"):
|
|
|
|
samples_out = samples1.copy()
|
|
samples1 = samples1["samples"]
|
|
samples2 = samples2["samples"]
|
|
|
|
if samples1.shape != samples2.shape:
|
|
samples2.permute(0, 3, 1, 2)
|
|
samples2 = comfy.utils.common_upscale(samples2, samples1.shape[3], samples1.shape[2], 'bicubic', crop='center')
|
|
samples2.permute(0, 2, 3, 1)
|
|
|
|
samples_blended = self.blend_mode(samples1, samples2, blend_mode)
|
|
samples_blended = samples1 * blend_factor + samples_blended * (1 - blend_factor)
|
|
samples_out["samples"] = samples_blended
|
|
return (samples_out,)
|
|
|
|
def blend_mode(self, img1, img2, mode):
|
|
if mode == "normal":
|
|
return img2
|
|
else:
|
|
raise ValueError(f"Unsupported blend mode: {mode}")
|
|
|
|
class LatentCrop:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "samples": ("LATENT",),
|
|
"width": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
|
|
"height": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
|
|
"x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "crop"
|
|
|
|
CATEGORY = "latent/transform"
|
|
|
|
def crop(self, samples, width, height, x, y):
|
|
s = samples.copy()
|
|
samples = samples['samples']
|
|
x = x // 8
|
|
y = y // 8
|
|
|
|
#enfonce minimum size of 64
|
|
if x > (samples.shape[3] - 8):
|
|
x = samples.shape[3] - 8
|
|
if y > (samples.shape[2] - 8):
|
|
y = samples.shape[2] - 8
|
|
|
|
new_height = height // 8
|
|
new_width = width // 8
|
|
to_x = new_width + x
|
|
to_y = new_height + y
|
|
s['samples'] = samples[:,:,y:to_y, x:to_x]
|
|
return (s,)
|
|
|
|
class SetLatentNoiseMask:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "samples": ("LATENT",),
|
|
"mask": ("MASK",),
|
|
}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "set_mask"
|
|
|
|
CATEGORY = "latent/inpaint"
|
|
|
|
def set_mask(self, samples, mask):
|
|
s = samples.copy()
|
|
s["noise_mask"] = mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1]))
|
|
return (s,)
|
|
|
|
def common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent, denoise=1.0, disable_noise=False, start_step=None, last_step=None, force_full_denoise=False):
|
|
latent_image = latent["samples"]
|
|
latent_image = comfy.sample.fix_empty_latent_channels(model, latent_image)
|
|
|
|
if disable_noise:
|
|
noise = torch.zeros(latent_image.size(), dtype=latent_image.dtype, layout=latent_image.layout, device="cpu")
|
|
else:
|
|
batch_inds = latent["batch_index"] if "batch_index" in latent else None
|
|
noise = comfy.sample.prepare_noise(latent_image, seed, batch_inds)
|
|
|
|
noise_mask = None
|
|
if "noise_mask" in latent:
|
|
noise_mask = latent["noise_mask"]
|
|
|
|
callback = latent_preview.prepare_callback(model, steps)
|
|
disable_pbar = not comfy.utils.PROGRESS_BAR_ENABLED
|
|
samples = comfy.sample.sample(model, noise, steps, cfg, sampler_name, scheduler, positive, negative, latent_image,
|
|
denoise=denoise, disable_noise=disable_noise, start_step=start_step, last_step=last_step,
|
|
force_full_denoise=force_full_denoise, noise_mask=noise_mask, callback=callback, disable_pbar=disable_pbar, seed=seed)
|
|
out = latent.copy()
|
|
out["samples"] = samples
|
|
return (out, )
|
|
|
|
class KSampler:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {
|
|
"required": {
|
|
"model": ("MODEL", {"tooltip": "The model used for denoising the input latent."}),
|
|
"seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "tooltip": "The random seed used for creating the noise."}),
|
|
"steps": ("INT", {"default": 20, "min": 1, "max": 10000, "tooltip": "The number of steps used in the denoising process."}),
|
|
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.1, "round": 0.01, "tooltip": "The Classifier-Free Guidance scale balances creativity and adherence to the prompt. Higher values result in images more closely matching the prompt however too high values will negatively impact quality."}),
|
|
"sampler_name": (comfy.samplers.KSampler.SAMPLERS, {"tooltip": "The algorithm used when sampling, this can affect the quality, speed, and style of the generated output."}),
|
|
"scheduler": (comfy.samplers.KSampler.SCHEDULERS, {"tooltip": "The scheduler controls how noise is gradually removed to form the image."}),
|
|
"positive": ("CONDITIONING", {"tooltip": "The conditioning describing the attributes you want to include in the image."}),
|
|
"negative": ("CONDITIONING", {"tooltip": "The conditioning describing the attributes you want to exclude from the image."}),
|
|
"latent_image": ("LATENT", {"tooltip": "The latent image to denoise."}),
|
|
"denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01, "tooltip": "The amount of denoising applied, lower values will maintain the structure of the initial image allowing for image to image sampling."}),
|
|
}
|
|
}
|
|
|
|
RETURN_TYPES = ("LATENT",)
|
|
OUTPUT_TOOLTIPS = ("The denoised latent.",)
|
|
FUNCTION = "sample"
|
|
|
|
CATEGORY = "sampling"
|
|
DESCRIPTION = "Uses the provided model, positive and negative conditioning to denoise the latent image."
|
|
|
|
def sample(self, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=1.0):
|
|
return common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise)
|
|
|
|
class KSamplerAdvanced:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required":
|
|
{"model": ("MODEL",),
|
|
"add_noise": (["enable", "disable"], ),
|
|
"noise_seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
|
|
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
|
|
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.1, "round": 0.01}),
|
|
"sampler_name": (comfy.samplers.KSampler.SAMPLERS, ),
|
|
"scheduler": (comfy.samplers.KSampler.SCHEDULERS, ),
|
|
"positive": ("CONDITIONING", ),
|
|
"negative": ("CONDITIONING", ),
|
|
"latent_image": ("LATENT", ),
|
|
"start_at_step": ("INT", {"default": 0, "min": 0, "max": 10000}),
|
|
"end_at_step": ("INT", {"default": 10000, "min": 0, "max": 10000}),
|
|
"return_with_leftover_noise": (["disable", "enable"], ),
|
|
}
|
|
}
|
|
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "sample"
|
|
|
|
CATEGORY = "sampling"
|
|
|
|
def sample(self, model, add_noise, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, start_at_step, end_at_step, return_with_leftover_noise, denoise=1.0):
|
|
force_full_denoise = True
|
|
if return_with_leftover_noise == "enable":
|
|
force_full_denoise = False
|
|
disable_noise = False
|
|
if add_noise == "disable":
|
|
disable_noise = True
|
|
return common_ksampler(model, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise, disable_noise=disable_noise, start_step=start_at_step, last_step=end_at_step, force_full_denoise=force_full_denoise)
|
|
|
|
class SaveImage:
|
|
def __init__(self):
|
|
self.output_dir = folder_paths.get_output_directory()
|
|
self.type = "output"
|
|
self.prefix_append = ""
|
|
self.compress_level = 4
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {
|
|
"required": {
|
|
"images": ("IMAGE", {"tooltip": "The images to save."}),
|
|
"filename_prefix": ("STRING", {"default": "ComfyUI", "tooltip": "The prefix for the file to save. This may include formatting information such as %date:yyyy-MM-dd% or %Empty Latent Image.width% to include values from nodes."})
|
|
},
|
|
"hidden": {
|
|
"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"
|
|
},
|
|
}
|
|
|
|
RETURN_TYPES = ()
|
|
FUNCTION = "save_images"
|
|
|
|
OUTPUT_NODE = True
|
|
|
|
CATEGORY = "image"
|
|
DESCRIPTION = "Saves the input images to your ComfyUI output directory."
|
|
|
|
def save_images(self, images, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None):
|
|
filename_prefix += self.prefix_append
|
|
full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir, images[0].shape[1], images[0].shape[0])
|
|
results = list()
|
|
for (batch_number, image) in enumerate(images):
|
|
i = 255. * image.cpu().numpy()
|
|
img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8))
|
|
metadata = None
|
|
if not args.disable_metadata:
|
|
metadata = PngInfo()
|
|
if prompt is not None:
|
|
metadata.add_text("prompt", json.dumps(prompt))
|
|
if extra_pnginfo is not None:
|
|
for x in extra_pnginfo:
|
|
metadata.add_text(x, json.dumps(extra_pnginfo[x]))
|
|
|
|
filename_with_batch_num = filename.replace("%batch_num%", str(batch_number))
|
|
file = f"{filename_with_batch_num}_{counter:05}_.png"
|
|
img.save(os.path.join(full_output_folder, file), pnginfo=metadata, compress_level=self.compress_level)
|
|
results.append({
|
|
"filename": file,
|
|
"subfolder": subfolder,
|
|
"type": self.type
|
|
})
|
|
counter += 1
|
|
|
|
return { "ui": { "images": results } }
|
|
|
|
class PreviewImage(SaveImage):
|
|
def __init__(self):
|
|
self.output_dir = folder_paths.get_temp_directory()
|
|
self.type = "temp"
|
|
self.prefix_append = "_temp_" + ''.join(random.choice("abcdefghijklmnopqrstupvxyz") for x in range(5))
|
|
self.compress_level = 1
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required":
|
|
{"images": ("IMAGE", ), },
|
|
"hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"},
|
|
}
|
|
|
|
class LoadImage:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
input_dir = folder_paths.get_input_directory()
|
|
files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))]
|
|
return {"required":
|
|
{"image": (sorted(files), {"image_upload": True})},
|
|
}
|
|
|
|
CATEGORY = "image"
|
|
|
|
RETURN_TYPES = ("IMAGE", "MASK")
|
|
FUNCTION = "load_image"
|
|
def load_image(self, image):
|
|
image_path = folder_paths.get_annotated_filepath(image)
|
|
|
|
img = node_helpers.pillow(Image.open, image_path)
|
|
|
|
output_images = []
|
|
output_masks = []
|
|
w, h = None, None
|
|
|
|
excluded_formats = ['MPO']
|
|
|
|
for i in ImageSequence.Iterator(img):
|
|
i = node_helpers.pillow(ImageOps.exif_transpose, i)
|
|
|
|
if i.mode == 'I':
|
|
i = i.point(lambda i: i * (1 / 255))
|
|
image = i.convert("RGB")
|
|
|
|
if len(output_images) == 0:
|
|
w = image.size[0]
|
|
h = image.size[1]
|
|
|
|
if image.size[0] != w or image.size[1] != h:
|
|
continue
|
|
|
|
image = np.array(image).astype(np.float32) / 255.0
|
|
image = torch.from_numpy(image)[None,]
|
|
if 'A' in i.getbands():
|
|
mask = np.array(i.getchannel('A')).astype(np.float32) / 255.0
|
|
mask = 1. - torch.from_numpy(mask)
|
|
else:
|
|
mask = torch.zeros((64,64), dtype=torch.float32, device="cpu")
|
|
output_images.append(image)
|
|
output_masks.append(mask.unsqueeze(0))
|
|
|
|
if len(output_images) > 1 and img.format not in excluded_formats:
|
|
output_image = torch.cat(output_images, dim=0)
|
|
output_mask = torch.cat(output_masks, dim=0)
|
|
else:
|
|
output_image = output_images[0]
|
|
output_mask = output_masks[0]
|
|
|
|
return (output_image, output_mask)
|
|
|
|
@classmethod
|
|
def IS_CHANGED(s, image):
|
|
image_path = folder_paths.get_annotated_filepath(image)
|
|
m = hashlib.sha256()
|
|
with open(image_path, 'rb') as f:
|
|
m.update(f.read())
|
|
return m.digest().hex()
|
|
|
|
@classmethod
|
|
def VALIDATE_INPUTS(s, image):
|
|
if not folder_paths.exists_annotated_filepath(image):
|
|
return "Invalid image file: {}".format(image)
|
|
|
|
return True
|
|
|
|
class LoadImageMask:
|
|
_color_channels = ["alpha", "red", "green", "blue"]
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
input_dir = folder_paths.get_input_directory()
|
|
files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))]
|
|
return {"required":
|
|
{"image": (sorted(files), {"image_upload": True}),
|
|
"channel": (s._color_channels, ), }
|
|
}
|
|
|
|
CATEGORY = "mask"
|
|
|
|
RETURN_TYPES = ("MASK",)
|
|
FUNCTION = "load_image"
|
|
def load_image(self, image, channel):
|
|
image_path = folder_paths.get_annotated_filepath(image)
|
|
i = node_helpers.pillow(Image.open, image_path)
|
|
i = node_helpers.pillow(ImageOps.exif_transpose, i)
|
|
if i.getbands() != ("R", "G", "B", "A"):
|
|
if i.mode == 'I':
|
|
i = i.point(lambda i: i * (1 / 255))
|
|
i = i.convert("RGBA")
|
|
mask = None
|
|
c = channel[0].upper()
|
|
if c in i.getbands():
|
|
mask = np.array(i.getchannel(c)).astype(np.float32) / 255.0
|
|
mask = torch.from_numpy(mask)
|
|
if c == 'A':
|
|
mask = 1. - mask
|
|
else:
|
|
mask = torch.zeros((64,64), dtype=torch.float32, device="cpu")
|
|
return (mask.unsqueeze(0),)
|
|
|
|
@classmethod
|
|
def IS_CHANGED(s, image, channel):
|
|
image_path = folder_paths.get_annotated_filepath(image)
|
|
m = hashlib.sha256()
|
|
with open(image_path, 'rb') as f:
|
|
m.update(f.read())
|
|
return m.digest().hex()
|
|
|
|
@classmethod
|
|
def VALIDATE_INPUTS(s, image):
|
|
if not folder_paths.exists_annotated_filepath(image):
|
|
return "Invalid image file: {}".format(image)
|
|
|
|
return True
|
|
|
|
class ImageScale:
|
|
upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"]
|
|
crop_methods = ["disabled", "center"]
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "image": ("IMAGE",), "upscale_method": (s.upscale_methods,),
|
|
"width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}),
|
|
"height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}),
|
|
"crop": (s.crop_methods,)}}
|
|
RETURN_TYPES = ("IMAGE",)
|
|
FUNCTION = "upscale"
|
|
|
|
CATEGORY = "image/upscaling"
|
|
|
|
def upscale(self, image, upscale_method, width, height, crop):
|
|
if width == 0 and height == 0:
|
|
s = image
|
|
else:
|
|
samples = image.movedim(-1,1)
|
|
|
|
if width == 0:
|
|
width = max(1, round(samples.shape[3] * height / samples.shape[2]))
|
|
elif height == 0:
|
|
height = max(1, round(samples.shape[2] * width / samples.shape[3]))
|
|
|
|
s = comfy.utils.common_upscale(samples, width, height, upscale_method, crop)
|
|
s = s.movedim(1,-1)
|
|
return (s,)
|
|
|
|
class ImageScaleBy:
|
|
upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"]
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "image": ("IMAGE",), "upscale_method": (s.upscale_methods,),
|
|
"scale_by": ("FLOAT", {"default": 1.0, "min": 0.01, "max": 8.0, "step": 0.01}),}}
|
|
RETURN_TYPES = ("IMAGE",)
|
|
FUNCTION = "upscale"
|
|
|
|
CATEGORY = "image/upscaling"
|
|
|
|
def upscale(self, image, upscale_method, scale_by):
|
|
samples = image.movedim(-1,1)
|
|
width = round(samples.shape[3] * scale_by)
|
|
height = round(samples.shape[2] * scale_by)
|
|
s = comfy.utils.common_upscale(samples, width, height, upscale_method, "disabled")
|
|
s = s.movedim(1,-1)
|
|
return (s,)
|
|
|
|
class ImageInvert:
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "image": ("IMAGE",)}}
|
|
|
|
RETURN_TYPES = ("IMAGE",)
|
|
FUNCTION = "invert"
|
|
|
|
CATEGORY = "image"
|
|
|
|
def invert(self, image):
|
|
s = 1.0 - image
|
|
return (s,)
|
|
|
|
class ImageBatch:
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "image1": ("IMAGE",), "image2": ("IMAGE",)}}
|
|
|
|
RETURN_TYPES = ("IMAGE",)
|
|
FUNCTION = "batch"
|
|
|
|
CATEGORY = "image"
|
|
|
|
def batch(self, image1, image2):
|
|
if image1.shape[1:] != image2.shape[1:]:
|
|
image2 = comfy.utils.common_upscale(image2.movedim(-1,1), image1.shape[2], image1.shape[1], "bilinear", "center").movedim(1,-1)
|
|
s = torch.cat((image1, image2), dim=0)
|
|
return (s,)
|
|
|
|
class EmptyImage:
|
|
def __init__(self, device="cpu"):
|
|
self.device = device
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "width": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}),
|
|
"height": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}),
|
|
"batch_size": ("INT", {"default": 1, "min": 1, "max": 4096}),
|
|
"color": ("INT", {"default": 0, "min": 0, "max": 0xFFFFFF, "step": 1, "display": "color"}),
|
|
}}
|
|
RETURN_TYPES = ("IMAGE",)
|
|
FUNCTION = "generate"
|
|
|
|
CATEGORY = "image"
|
|
|
|
def generate(self, width, height, batch_size=1, color=0):
|
|
r = torch.full([batch_size, height, width, 1], ((color >> 16) & 0xFF) / 0xFF)
|
|
g = torch.full([batch_size, height, width, 1], ((color >> 8) & 0xFF) / 0xFF)
|
|
b = torch.full([batch_size, height, width, 1], ((color) & 0xFF) / 0xFF)
|
|
return (torch.cat((r, g, b), dim=-1), )
|
|
|
|
class ImagePadForOutpaint:
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {
|
|
"required": {
|
|
"image": ("IMAGE",),
|
|
"left": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"top": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"right": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"bottom": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
|
|
"feathering": ("INT", {"default": 40, "min": 0, "max": MAX_RESOLUTION, "step": 1}),
|
|
}
|
|
}
|
|
|
|
RETURN_TYPES = ("IMAGE", "MASK")
|
|
FUNCTION = "expand_image"
|
|
|
|
CATEGORY = "image"
|
|
|
|
def expand_image(self, image, left, top, right, bottom, feathering):
|
|
d1, d2, d3, d4 = image.size()
|
|
|
|
new_image = torch.ones(
|
|
(d1, d2 + top + bottom, d3 + left + right, d4),
|
|
dtype=torch.float32,
|
|
) * 0.5
|
|
|
|
new_image[:, top:top + d2, left:left + d3, :] = image
|
|
|
|
mask = torch.ones(
|
|
(d2 + top + bottom, d3 + left + right),
|
|
dtype=torch.float32,
|
|
)
|
|
|
|
t = torch.zeros(
|
|
(d2, d3),
|
|
dtype=torch.float32
|
|
)
|
|
|
|
if feathering > 0 and feathering * 2 < d2 and feathering * 2 < d3:
|
|
|
|
for i in range(d2):
|
|
for j in range(d3):
|
|
dt = i if top != 0 else d2
|
|
db = d2 - i if bottom != 0 else d2
|
|
|
|
dl = j if left != 0 else d3
|
|
dr = d3 - j if right != 0 else d3
|
|
|
|
d = min(dt, db, dl, dr)
|
|
|
|
if d >= feathering:
|
|
continue
|
|
|
|
v = (feathering - d) / feathering
|
|
|
|
t[i, j] = v * v
|
|
|
|
mask[top:top + d2, left:left + d3] = t
|
|
|
|
return (new_image, mask)
|
|
|
|
|
|
NODE_CLASS_MAPPINGS = {
|
|
"KSampler": KSampler,
|
|
"CheckpointLoaderSimple": CheckpointLoaderSimple,
|
|
"CLIPTextEncode": CLIPTextEncode,
|
|
"CLIPSetLastLayer": CLIPSetLastLayer,
|
|
"VAEDecode": VAEDecode,
|
|
"VAEEncode": VAEEncode,
|
|
"VAEEncodeForInpaint": VAEEncodeForInpaint,
|
|
"VAELoader": VAELoader,
|
|
"EmptyLatentImage": EmptyLatentImage,
|
|
"LatentUpscale": LatentUpscale,
|
|
"LatentUpscaleBy": LatentUpscaleBy,
|
|
"LatentFromBatch": LatentFromBatch,
|
|
"RepeatLatentBatch": RepeatLatentBatch,
|
|
"SaveImage": SaveImage,
|
|
"PreviewImage": PreviewImage,
|
|
"LoadImage": LoadImage,
|
|
"LoadImageMask": LoadImageMask,
|
|
"ImageScale": ImageScale,
|
|
"ImageScaleBy": ImageScaleBy,
|
|
"ImageInvert": ImageInvert,
|
|
"ImageBatch": ImageBatch,
|
|
"ImagePadForOutpaint": ImagePadForOutpaint,
|
|
"EmptyImage": EmptyImage,
|
|
"ConditioningAverage": ConditioningAverage ,
|
|
"ConditioningCombine": ConditioningCombine,
|
|
"ConditioningConcat": ConditioningConcat,
|
|
"ConditioningSetArea": ConditioningSetArea,
|
|
"ConditioningSetAreaPercentage": ConditioningSetAreaPercentage,
|
|
"ConditioningSetAreaStrength": ConditioningSetAreaStrength,
|
|
"ConditioningSetMask": ConditioningSetMask,
|
|
"KSamplerAdvanced": KSamplerAdvanced,
|
|
"SetLatentNoiseMask": SetLatentNoiseMask,
|
|
"LatentComposite": LatentComposite,
|
|
"LatentBlend": LatentBlend,
|
|
"LatentRotate": LatentRotate,
|
|
"LatentFlip": LatentFlip,
|
|
"LatentCrop": LatentCrop,
|
|
"LoraLoader": LoraLoader,
|
|
"CLIPLoader": CLIPLoader,
|
|
"UNETLoader": UNETLoader,
|
|
"DualCLIPLoader": DualCLIPLoader,
|
|
"CLIPVisionEncode": CLIPVisionEncode,
|
|
"StyleModelApply": StyleModelApply,
|
|
"unCLIPConditioning": unCLIPConditioning,
|
|
"ControlNetApply": ControlNetApply,
|
|
"ControlNetApplyAdvanced": ControlNetApplyAdvanced,
|
|
"ControlNetLoader": ControlNetLoader,
|
|
"DiffControlNetLoader": DiffControlNetLoader,
|
|
"StyleModelLoader": StyleModelLoader,
|
|
"CLIPVisionLoader": CLIPVisionLoader,
|
|
"VAEDecodeTiled": VAEDecodeTiled,
|
|
"VAEEncodeTiled": VAEEncodeTiled,
|
|
"unCLIPCheckpointLoader": unCLIPCheckpointLoader,
|
|
"GLIGENLoader": GLIGENLoader,
|
|
"GLIGENTextBoxApply": GLIGENTextBoxApply,
|
|
"InpaintModelConditioning": InpaintModelConditioning,
|
|
|
|
"CheckpointLoader": CheckpointLoader,
|
|
"DiffusersLoader": DiffusersLoader,
|
|
|
|
"LoadLatent": LoadLatent,
|
|
"SaveLatent": SaveLatent,
|
|
|
|
"ConditioningZeroOut": ConditioningZeroOut,
|
|
"ConditioningSetTimestepRange": ConditioningSetTimestepRange,
|
|
"LoraLoaderModelOnly": LoraLoaderModelOnly,
|
|
}
|
|
|
|
NODE_DISPLAY_NAME_MAPPINGS = {
|
|
# Sampling
|
|
"KSampler": "KSampler",
|
|
"KSamplerAdvanced": "KSampler (Advanced)",
|
|
# Loaders
|
|
"CheckpointLoader": "Load Checkpoint With Config (DEPRECATED)",
|
|
"CheckpointLoaderSimple": "Load Checkpoint",
|
|
"VAELoader": "Load VAE",
|
|
"LoraLoader": "Load LoRA",
|
|
"CLIPLoader": "Load CLIP",
|
|
"ControlNetLoader": "Load ControlNet Model",
|
|
"DiffControlNetLoader": "Load ControlNet Model (diff)",
|
|
"StyleModelLoader": "Load Style Model",
|
|
"CLIPVisionLoader": "Load CLIP Vision",
|
|
"UpscaleModelLoader": "Load Upscale Model",
|
|
"UNETLoader": "Load Diffusion Model",
|
|
# Conditioning
|
|
"CLIPVisionEncode": "CLIP Vision Encode",
|
|
"StyleModelApply": "Apply Style Model",
|
|
"CLIPTextEncode": "CLIP Text Encode (Prompt)",
|
|
"CLIPSetLastLayer": "CLIP Set Last Layer",
|
|
"ConditioningCombine": "Conditioning (Combine)",
|
|
"ConditioningAverage ": "Conditioning (Average)",
|
|
"ConditioningConcat": "Conditioning (Concat)",
|
|
"ConditioningSetArea": "Conditioning (Set Area)",
|
|
"ConditioningSetAreaPercentage": "Conditioning (Set Area with Percentage)",
|
|
"ConditioningSetMask": "Conditioning (Set Mask)",
|
|
"ControlNetApply": "Apply ControlNet (OLD)",
|
|
"ControlNetApplyAdvanced": "Apply ControlNet",
|
|
# Latent
|
|
"VAEEncodeForInpaint": "VAE Encode (for Inpainting)",
|
|
"SetLatentNoiseMask": "Set Latent Noise Mask",
|
|
"VAEDecode": "VAE Decode",
|
|
"VAEEncode": "VAE Encode",
|
|
"LatentRotate": "Rotate Latent",
|
|
"LatentFlip": "Flip Latent",
|
|
"LatentCrop": "Crop Latent",
|
|
"EmptyLatentImage": "Empty Latent Image",
|
|
"LatentUpscale": "Upscale Latent",
|
|
"LatentUpscaleBy": "Upscale Latent By",
|
|
"LatentComposite": "Latent Composite",
|
|
"LatentBlend": "Latent Blend",
|
|
"LatentFromBatch" : "Latent From Batch",
|
|
"RepeatLatentBatch": "Repeat Latent Batch",
|
|
# Image
|
|
"SaveImage": "Save Image",
|
|
"PreviewImage": "Preview Image",
|
|
"LoadImage": "Load Image",
|
|
"LoadImageMask": "Load Image (as Mask)",
|
|
"ImageScale": "Upscale Image",
|
|
"ImageScaleBy": "Upscale Image By",
|
|
"ImageUpscaleWithModel": "Upscale Image (using Model)",
|
|
"ImageInvert": "Invert Image",
|
|
"ImagePadForOutpaint": "Pad Image for Outpainting",
|
|
"ImageBatch": "Batch Images",
|
|
"ImageCrop": "Image Crop",
|
|
"ImageBlend": "Image Blend",
|
|
"ImageBlur": "Image Blur",
|
|
"ImageQuantize": "Image Quantize",
|
|
"ImageSharpen": "Image Sharpen",
|
|
"ImageScaleToTotalPixels": "Scale Image to Total Pixels",
|
|
# _for_testing
|
|
"VAEDecodeTiled": "VAE Decode (Tiled)",
|
|
"VAEEncodeTiled": "VAE Encode (Tiled)",
|
|
}
|
|
|
|
EXTENSION_WEB_DIRS = {}
|
|
|
|
|
|
def get_module_name(module_path: str) -> str:
|
|
"""
|
|
Returns the module name based on the given module path.
|
|
Examples:
|
|
get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node.py") -> "my_custom_node"
|
|
get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node") -> "my_custom_node"
|
|
get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node/") -> "my_custom_node"
|
|
get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node/__init__.py") -> "my_custom_node"
|
|
get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node/__init__") -> "my_custom_node"
|
|
get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node/__init__/") -> "my_custom_node"
|
|
get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node.disabled") -> "custom_nodes
|
|
Args:
|
|
module_path (str): The path of the module.
|
|
Returns:
|
|
str: The module name.
|
|
"""
|
|
base_path = os.path.basename(module_path)
|
|
if os.path.isfile(module_path):
|
|
base_path = os.path.splitext(base_path)[0]
|
|
return base_path
|
|
|
|
|
|
def load_custom_node(module_path: str, ignore=set(), module_parent="custom_nodes") -> bool:
|
|
module_name = os.path.basename(module_path)
|
|
if os.path.isfile(module_path):
|
|
sp = os.path.splitext(module_path)
|
|
module_name = sp[0]
|
|
try:
|
|
logging.debug("Trying to load custom node {}".format(module_path))
|
|
if os.path.isfile(module_path):
|
|
module_spec = importlib.util.spec_from_file_location(module_name, module_path)
|
|
module_dir = os.path.split(module_path)[0]
|
|
else:
|
|
module_spec = importlib.util.spec_from_file_location(module_name, os.path.join(module_path, "__init__.py"))
|
|
module_dir = module_path
|
|
|
|
module = importlib.util.module_from_spec(module_spec)
|
|
sys.modules[module_name] = module
|
|
module_spec.loader.exec_module(module)
|
|
|
|
if hasattr(module, "WEB_DIRECTORY") and getattr(module, "WEB_DIRECTORY") is not None:
|
|
web_dir = os.path.abspath(os.path.join(module_dir, getattr(module, "WEB_DIRECTORY")))
|
|
if os.path.isdir(web_dir):
|
|
EXTENSION_WEB_DIRS[module_name] = web_dir
|
|
|
|
if hasattr(module, "NODE_CLASS_MAPPINGS") and getattr(module, "NODE_CLASS_MAPPINGS") is not None:
|
|
for name, node_cls in module.NODE_CLASS_MAPPINGS.items():
|
|
if name not in ignore:
|
|
NODE_CLASS_MAPPINGS[name] = node_cls
|
|
node_cls.RELATIVE_PYTHON_MODULE = "{}.{}".format(module_parent, get_module_name(module_path))
|
|
if hasattr(module, "NODE_DISPLAY_NAME_MAPPINGS") and getattr(module, "NODE_DISPLAY_NAME_MAPPINGS") is not None:
|
|
NODE_DISPLAY_NAME_MAPPINGS.update(module.NODE_DISPLAY_NAME_MAPPINGS)
|
|
return True
|
|
else:
|
|
logging.warning(f"Skip {module_path} module for custom nodes due to the lack of NODE_CLASS_MAPPINGS.")
|
|
return False
|
|
except Exception as e:
|
|
logging.warning(traceback.format_exc())
|
|
logging.warning(f"Cannot import {module_path} module for custom nodes: {e}")
|
|
return False
|
|
|
|
def init_external_custom_nodes():
|
|
"""
|
|
Initializes the external custom nodes.
|
|
|
|
This function loads custom nodes from the specified folder paths and imports them into the application.
|
|
It measures the import times for each custom node and logs the results.
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
base_node_names = set(NODE_CLASS_MAPPINGS.keys())
|
|
node_paths = folder_paths.get_folder_paths("custom_nodes")
|
|
node_import_times = []
|
|
for custom_node_path in node_paths:
|
|
possible_modules = os.listdir(os.path.realpath(custom_node_path))
|
|
if "__pycache__" in possible_modules:
|
|
possible_modules.remove("__pycache__")
|
|
|
|
for possible_module in possible_modules:
|
|
module_path = os.path.join(custom_node_path, possible_module)
|
|
if os.path.isfile(module_path) and os.path.splitext(module_path)[1] != ".py": continue
|
|
if module_path.endswith(".disabled"): continue
|
|
time_before = time.perf_counter()
|
|
success = load_custom_node(module_path, base_node_names, module_parent="custom_nodes")
|
|
node_import_times.append((time.perf_counter() - time_before, module_path, success))
|
|
|
|
if len(node_import_times) > 0:
|
|
logging.info("\nImport times for custom nodes:")
|
|
for n in sorted(node_import_times):
|
|
if n[2]:
|
|
import_message = ""
|
|
else:
|
|
import_message = " (IMPORT FAILED)"
|
|
logging.info("{:6.1f} seconds{}: {}".format(n[0], import_message, n[1]))
|
|
logging.info("")
|
|
|
|
def init_builtin_extra_nodes():
|
|
"""
|
|
Initializes the built-in extra nodes in ComfyUI.
|
|
|
|
This function loads the extra node files located in the "comfy_extras" directory and imports them into ComfyUI.
|
|
If any of the extra node files fail to import, a warning message is logged.
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
extras_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "comfy_extras")
|
|
extras_files = [
|
|
"nodes_latent.py",
|
|
"nodes_hypernetwork.py",
|
|
"nodes_upscale_model.py",
|
|
"nodes_post_processing.py",
|
|
"nodes_mask.py",
|
|
"nodes_compositing.py",
|
|
"nodes_rebatch.py",
|
|
"nodes_model_merging.py",
|
|
"nodes_tomesd.py",
|
|
"nodes_clip_sdxl.py",
|
|
"nodes_canny.py",
|
|
"nodes_freelunch.py",
|
|
"nodes_custom_sampler.py",
|
|
"nodes_hypertile.py",
|
|
"nodes_model_advanced.py",
|
|
"nodes_model_downscale.py",
|
|
"nodes_images.py",
|
|
"nodes_video_model.py",
|
|
"nodes_sag.py",
|
|
"nodes_perpneg.py",
|
|
"nodes_stable3d.py",
|
|
"nodes_sdupscale.py",
|
|
"nodes_photomaker.py",
|
|
"nodes_cond.py",
|
|
"nodes_morphology.py",
|
|
"nodes_stable_cascade.py",
|
|
"nodes_differential_diffusion.py",
|
|
"nodes_ip2p.py",
|
|
"nodes_model_merging_model_specific.py",
|
|
"nodes_pag.py",
|
|
"nodes_align_your_steps.py",
|
|
"nodes_attention_multiply.py",
|
|
"nodes_advanced_samplers.py",
|
|
"nodes_webcam.py",
|
|
"nodes_audio.py",
|
|
"nodes_sd3.py",
|
|
"nodes_gits.py",
|
|
"nodes_controlnet.py",
|
|
"nodes_hunyuan.py",
|
|
"nodes_flux.py",
|
|
"nodes_lora_extract.py",
|
|
"nodes_torch_compile.py",
|
|
"nodes_mochi.py",
|
|
"nodes_slg.py",
|
|
"nodes_lt.py",
|
|
]
|
|
|
|
import_failed = []
|
|
for node_file in extras_files:
|
|
if not load_custom_node(os.path.join(extras_dir, node_file), module_parent="comfy_extras"):
|
|
import_failed.append(node_file)
|
|
|
|
return import_failed
|
|
|
|
|
|
def init_extra_nodes(init_custom_nodes=True):
|
|
import_failed = init_builtin_extra_nodes()
|
|
|
|
if init_custom_nodes:
|
|
init_external_custom_nodes()
|
|
else:
|
|
logging.info("Skipping loading of custom nodes")
|
|
|
|
if len(import_failed) > 0:
|
|
logging.warning("WARNING: some comfy_extras/ nodes did not import correctly. This may be because they are missing some dependencies.\n")
|
|
for node in import_failed:
|
|
logging.warning("IMPORT FAILED: {}".format(node))
|
|
logging.warning("\nThis issue might be caused by new missing dependencies added the last time you updated ComfyUI.")
|
|
if args.windows_standalone_build:
|
|
logging.warning("Please run the update script: update/update_comfyui.bat")
|
|
else:
|
|
logging.warning("Please do a: pip install -r requirements.txt")
|
|
logging.warning("")
|
|
|
|
return import_failed
|