Some checks failed
Python Linting / Run Ruff (push) Has been cancelled
Python Linting / Run Pylint (push) Has been cancelled
Full Comfy CI Workflow Runs / test-stable (12.1, , linux, 3.10, [self-hosted Linux], stable) (push) Has been cancelled
Full Comfy CI Workflow Runs / test-stable (12.1, , linux, 3.11, [self-hosted Linux], stable) (push) Has been cancelled
Full Comfy CI Workflow Runs / test-stable (12.1, , linux, 3.12, [self-hosted Linux], stable) (push) Has been cancelled
Full Comfy CI Workflow Runs / test-unix-nightly (12.1, , linux, 3.11, [self-hosted Linux], nightly) (push) Has been cancelled
Execution Tests / test (macos-latest) (push) Has been cancelled
Execution Tests / test (ubuntu-latest) (push) Has been cancelled
Execution Tests / test (windows-latest) (push) Has been cancelled
Test server launches without errors / test (push) Has been cancelled
Unit Tests / test (macos-latest) (push) Has been cancelled
Unit Tests / test (ubuntu-latest) (push) Has been cancelled
Unit Tests / test (windows-2022) (push) Has been cancelled
Includes 30 custom nodes committed directly, 7 Civitai-exclusive loras stored via Git LFS, and a setup script that installs all dependencies and downloads HuggingFace-hosted models on vast.ai. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
585 lines
33 KiB
Python
585 lines
33 KiB
Python
import os
|
|
from datetime import datetime
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
import json
|
|
import numpy as np
|
|
import re
|
|
|
|
from PIL import Image
|
|
import torch
|
|
|
|
import folder_paths
|
|
from nodes import MAX_RESOLUTION
|
|
|
|
from .saver.saver import save_image
|
|
from .utils import sanitize_filename, get_sha256, full_checkpoint_path_for
|
|
from .utils_civitai import get_civitai_sampler_name, get_civitai_metadata, MAX_HASH_LENGTH
|
|
from .prompt_metadata_extractor import PromptMetadataExtractor
|
|
|
|
def parse_checkpoint_name(ckpt_name: str) -> str:
|
|
return os.path.basename(ckpt_name)
|
|
|
|
def parse_checkpoint_name_without_extension(ckpt_name: str) -> str:
|
|
filename = parse_checkpoint_name(ckpt_name)
|
|
name_without_ext, ext = os.path.splitext(filename)
|
|
supported_extensions = folder_paths.supported_pt_extensions | {".gguf"}
|
|
|
|
# Only remove extension if it's a known model file extension
|
|
if ext.lower() in supported_extensions:
|
|
return name_without_ext
|
|
else:
|
|
return filename # Keep full name if extension isn't recognized
|
|
|
|
def get_timestamp(time_format: str) -> str:
|
|
now = datetime.now()
|
|
try:
|
|
timestamp = now.strftime(time_format)
|
|
except:
|
|
timestamp = now.strftime("%Y-%m-%d-%H%M%S")
|
|
|
|
return timestamp
|
|
|
|
def apply_custom_time_format(filename: str) -> str:
|
|
"""
|
|
Replace %time_format<strftime_format> patterns with formatted datetime.
|
|
Example: %time_format<%Y-%m-%d> becomes 2026-01-17
|
|
"""
|
|
now = datetime.now()
|
|
# Pattern to match %time_format<XXX> where XXX is any strftime format string
|
|
# Use negative lookahead to exclude %time_format itself from variable delimiters
|
|
pattern = r'%time_format<([^>]*)>'
|
|
def replace_format(match):
|
|
format_str = match.group(1)
|
|
try:
|
|
return now.strftime(format_str)
|
|
except:
|
|
# If format is invalid, return original
|
|
return match.group(0)
|
|
|
|
return re.sub(pattern, replace_format, filename)
|
|
|
|
def save_json(image_info: dict[str, Any] | None, filename: str) -> None:
|
|
try:
|
|
workflow = (image_info or {}).get('workflow')
|
|
if workflow is None:
|
|
print('No image info found, skipping saving of JSON')
|
|
with open(f'{filename}.json', 'w') as workflow_file:
|
|
json.dump(workflow, workflow_file)
|
|
print(f'Saved workflow to {filename}.json')
|
|
except Exception as e:
|
|
print(f'Failed to save workflow as json due to: {e}, proceeding with the remainder of saving execution')
|
|
|
|
def make_pathname(filename: str, width: int, height: int, seed: int, modelname: str, counter: int, time_format: str, sampler_name: str, steps: int, cfg: float, scheduler_name: str, denoise: float, clip_skip: int, custom: str) -> str:
|
|
# Process custom time_format patterns first
|
|
filename = apply_custom_time_format(filename)
|
|
filename = filename.replace("%date", get_timestamp("%Y-%m-%d"))
|
|
filename = filename.replace("%time", get_timestamp(time_format))
|
|
filename = filename.replace("%model", parse_checkpoint_name(modelname))
|
|
filename = filename.replace("%width", str(width))
|
|
filename = filename.replace("%height", str(height))
|
|
filename = filename.replace("%seed", str(seed))
|
|
filename = filename.replace("%counter", str(counter))
|
|
filename = filename.replace("%sampler_name", sampler_name)
|
|
filename = filename.replace("%steps", str(steps))
|
|
filename = filename.replace("%cfg", str(cfg))
|
|
filename = filename.replace("%scheduler_name", scheduler_name)
|
|
filename = filename.replace("%basemodelname", parse_checkpoint_name_without_extension(modelname))
|
|
filename = filename.replace("%denoise", str(denoise))
|
|
filename = filename.replace("%clip_skip", str(clip_skip))
|
|
filename = filename.replace("%custom", custom)
|
|
|
|
directory, basename = os.path.split(filename)
|
|
sanitized_basename = sanitize_filename(basename)
|
|
return os.path.join(directory, sanitized_basename)
|
|
|
|
def make_filename(filename: str, width: int, height: int, seed: int, modelname: str, counter: int, time_format: str, sampler_name: str, steps: int, cfg: float, scheduler_name: str, denoise: float, clip_skip: int, custom: str) -> str:
|
|
filename = make_pathname(filename, width, height, seed, modelname, counter, time_format, sampler_name, steps, cfg, scheduler_name, denoise, clip_skip, custom)
|
|
return get_timestamp(time_format) if filename == "" else filename
|
|
|
|
@dataclass
|
|
class Metadata:
|
|
modelname: str
|
|
positive: str
|
|
negative: str
|
|
width: int
|
|
height: int
|
|
seed: int
|
|
steps: int
|
|
cfg: float
|
|
sampler_name: str
|
|
scheduler_name: str
|
|
denoise: float
|
|
clip_skip: int
|
|
custom: str
|
|
additional_hashes: str
|
|
ckpt_path: str
|
|
a111_params: str
|
|
final_hashes: str
|
|
|
|
class ImageSaverMetadata:
|
|
@classmethod
|
|
def INPUT_TYPES(cls) -> dict[str, Any]:
|
|
return {
|
|
"optional": {
|
|
"modelname": ("STRING", {"default": '', "multiline": False, "tooltip": "model name (can be multiple, separated by commas)"}),
|
|
"positive": ("STRING", {"default": 'unknown', "multiline": True, "tooltip": "positive prompt"}),
|
|
"negative": ("STRING", {"default": 'unknown', "multiline": True, "tooltip": "negative prompt"}),
|
|
"width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8, "tooltip": "image width"}),
|
|
"height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8, "tooltip": "image height"}),
|
|
"seed_value": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "tooltip": "seed"}),
|
|
"steps": ("INT", {"default": 20, "min": 1, "max": 10000, "tooltip": "number of steps"}),
|
|
"cfg": ("FLOAT", {"default": 7.0, "min": 0.0, "max": 100.0, "tooltip": "CFG value"}),
|
|
"sampler_name": ("STRING", {"default": '', "multiline": False, "tooltip": "sampler name (as string)"}),
|
|
"scheduler_name": ("STRING", {"default": 'normal', "multiline": False, "tooltip": "scheduler name (as string)"}),
|
|
"denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "tooltip": "denoise value"}),
|
|
"clip_skip": ("INT", {"default": 0, "min": -24, "max": 24, "tooltip": "skip last CLIP layers (positive or negative value, 0 for no skip)"}),
|
|
"additional_hashes": ("STRING", {"default": "", "multiline": False, "tooltip": "hashes separated by commas, optionally with names. 'Name:HASH' (e.g., 'MyLoRA:FF735FF83F98')\nWith download_civitai_data set to true, weights can be added as well. (e.g., 'HASH:Weight', 'Name:HASH:Weight')"}),
|
|
"download_civitai_data": ("BOOLEAN", {"default": True, "tooltip": "Download and cache data from civitai.com to save correct metadata. Allows LoRA weights to be saved to the metadata."}),
|
|
"easy_remix": ("BOOLEAN", {"default": True, "tooltip": "Strip LoRAs and simplify 'embedding:path' from the prompt to make the Remix option on civitai.com more seamless."}),
|
|
"custom": ("STRING", {"default": "", "multiline": False, "tooltip": "custom string to add to the metadata, inserted into the a111 string between clip skip and model hash"}),
|
|
},
|
|
}
|
|
|
|
RETURN_TYPES = ("METADATA","STRING","STRING")
|
|
RETURN_NAMES = ("metadata","hashes","a1111_params")
|
|
OUTPUT_TOOLTIPS = ("metadata for Image Saver Simple","Comma-separated list of the hashes to chain with other Image Saver additional_hashes","Written parameters to the image metadata")
|
|
FUNCTION = "get_metadata"
|
|
CATEGORY = "ImageSaver"
|
|
DESCRIPTION = "Prepare metadata for Image Saver Simple"
|
|
|
|
def get_metadata(
|
|
self,
|
|
modelname: str = "",
|
|
positive: str = "unknown",
|
|
negative: str = "unknown",
|
|
width: int = 512,
|
|
height: int = 512,
|
|
seed_value: int = 0,
|
|
steps: int = 20,
|
|
cfg: float = 7.0,
|
|
sampler_name: str = "",
|
|
scheduler_name: str = "normal",
|
|
denoise: float = 1.0,
|
|
clip_skip: int = 0,
|
|
custom: str = "",
|
|
additional_hashes: str = "",
|
|
download_civitai_data: bool = True,
|
|
easy_remix: bool = True,
|
|
) -> tuple[Metadata, str, str]:
|
|
metadata = ImageSaverMetadata.make_metadata(modelname, positive, negative, width, height, seed_value, steps, cfg, sampler_name, scheduler_name, denoise, clip_skip, custom, additional_hashes, download_civitai_data, easy_remix)
|
|
return (metadata, metadata.final_hashes, metadata.a111_params)
|
|
|
|
@staticmethod
|
|
def make_metadata(modelname: str, positive: str, negative: str, width: int, height: int, seed_value: int, steps: int, cfg: float, sampler_name: str, scheduler_name: str, denoise: float, clip_skip: int, custom: str, additional_hashes: str, download_civitai_data: bool, easy_remix: bool) -> Metadata:
|
|
modelname, additional_hashes = ImageSaver.get_multiple_models(modelname, additional_hashes)
|
|
|
|
ckpt_path = full_checkpoint_path_for(modelname)
|
|
if ckpt_path:
|
|
modelhash = get_sha256(ckpt_path)[:10]
|
|
else:
|
|
modelhash = ""
|
|
|
|
metadata_extractor = PromptMetadataExtractor([positive, negative])
|
|
embeddings = metadata_extractor.get_embeddings()
|
|
loras = metadata_extractor.get_loras()
|
|
civitai_sampler_name = get_civitai_sampler_name(sampler_name.replace('_gpu', ''), scheduler_name)
|
|
basemodelname = parse_checkpoint_name_without_extension(modelname)
|
|
|
|
# Get existing hashes from model, loras, and embeddings
|
|
existing_hashes = {modelhash.lower()} | {t[2].lower() for t in loras.values()} | {t[2].lower() for t in embeddings.values()}
|
|
# Parse manual hashes
|
|
manual_entries = ImageSaver.parse_manual_hashes(additional_hashes, existing_hashes, download_civitai_data)
|
|
# Get Civitai metadata
|
|
civitai_resources, hashes, add_model_hash = get_civitai_metadata(modelname, ckpt_path, modelhash, loras, embeddings, manual_entries, download_civitai_data)
|
|
|
|
if easy_remix:
|
|
positive = ImageSaver.clean_prompt(positive, metadata_extractor)
|
|
negative = ImageSaver.clean_prompt(negative, metadata_extractor)
|
|
|
|
positive_a111_params = positive.strip()
|
|
negative_a111_params = f"\nNegative prompt: {negative.strip()}"
|
|
clip_skip_str = f", Clip skip: {abs(clip_skip)}" if clip_skip != 0 else ""
|
|
custom_str = f", {custom}" if custom else ""
|
|
model_hash_str = f", Model hash: {add_model_hash}" if add_model_hash else ""
|
|
hashes_str = f", Hashes: {json.dumps(hashes, separators=(',', ':'))}" if hashes else ""
|
|
|
|
a111_params = (
|
|
f"{positive_a111_params}{negative_a111_params}\n"
|
|
f"Steps: {steps}, Sampler: {civitai_sampler_name}, CFG scale: {cfg}, Seed: {seed_value}, "
|
|
f"Size: {width}x{height}{clip_skip_str}{custom_str}{model_hash_str}, Model: {basemodelname}{hashes_str}, Version: ComfyUI"
|
|
)
|
|
|
|
# Add Civitai resource listing
|
|
if download_civitai_data and civitai_resources:
|
|
a111_params += f", Civitai resources: {json.dumps(civitai_resources, separators=(',', ':'))}"
|
|
|
|
# Combine all resources (model, loras, embeddings, manual entries) for final hash string
|
|
all_resources = { modelname: ( ckpt_path, None, modelhash ) } | loras | embeddings | manual_entries
|
|
|
|
hash_parts = []
|
|
for name, (_, weight, hash_value) in (all_resources.items() if isinstance(all_resources, dict) else all_resources):
|
|
# Format: "name:hash" or "name:hash:weight" depending on download_civitai_data
|
|
if name:
|
|
# Extract clean name (only remove actual model file extensions, preserve dots in model names)
|
|
filename = name.split(':')[-1]
|
|
name_without_ext, ext = os.path.splitext(filename)
|
|
supported_extensions = folder_paths.supported_pt_extensions | {".gguf"}
|
|
|
|
# Only remove extension if it's a known model file extension
|
|
if ext.lower() in supported_extensions:
|
|
clean_name = name_without_ext
|
|
else:
|
|
clean_name = filename # Keep full name if extension isn't recognized
|
|
|
|
name_part = f"{clean_name}:"
|
|
else:
|
|
name_part = ""
|
|
|
|
# Skip entries without a valid hash
|
|
if not hash_value:
|
|
continue
|
|
|
|
weight_part = f":{weight}" if weight is not None and download_civitai_data else ""
|
|
hash_parts.append(f"{name_part}{hash_value}{weight_part}")
|
|
|
|
final_hashes = ",".join(hash_parts)
|
|
|
|
metadata = Metadata(modelname, positive, negative, width, height, seed_value, steps, cfg, sampler_name, scheduler_name, denoise, clip_skip, custom, additional_hashes, ckpt_path, a111_params, final_hashes)
|
|
return metadata
|
|
|
|
class ImageSaverSimple:
|
|
@classmethod
|
|
def INPUT_TYPES(cls) -> dict[str, Any]:
|
|
return {
|
|
"required": {
|
|
"images": ("IMAGE", { "tooltip": "image(s) to save"}),
|
|
"filename": ("STRING", {"default": '%time_%basemodelname_%seed', "multiline": False, "tooltip": "filename (available variables: %date, %time, %time_format<format>, %model, %width, %height, %seed, %counter, %sampler_name, %steps, %cfg, %scheduler_name, %basemodelname, %denoise, %clip_skip)"}),
|
|
"path": ("STRING", {"default": '', "multiline": False, "tooltip": "path to save the images (under Comfy's save directory)"}),
|
|
"extension": (['png', 'jpeg', 'jpg', 'webp'], { "tooltip": "file extension/type to save image as"}),
|
|
"lossless_webp": ("BOOLEAN", {"default": True, "tooltip": "if True, saved WEBP files will be lossless"}),
|
|
"quality_jpeg_or_webp": ("INT", {"default": 100, "min": 1, "max": 100, "tooltip": "quality setting of JPEG/WEBP"}),
|
|
"optimize_png": ("BOOLEAN", {"default": False, "tooltip": "if True, saved PNG files will be optimized (can reduce file size but is slower)"}),
|
|
"embed_workflow": ("BOOLEAN", {"default": True, "tooltip": "if True, embeds the workflow in the saved image files.\nStable for PNG, experimental for WEBP.\nJPEG experimental and only if metadata size is below 65535 bytes"}),
|
|
"save_workflow_as_json": ("BOOLEAN", {"default": False, "tooltip": "if True, also saves the workflow as a separate JSON file"}),
|
|
},
|
|
"optional": {
|
|
"metadata": ("METADATA", {"default": None, "tooltip": "metadata to embed in the image"}),
|
|
"counter": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "tooltip": "counter"}),
|
|
"time_format": ("STRING", {"default": "%Y-%m-%d-%H%M%S", "multiline": False, "tooltip": "timestamp format"}),
|
|
"show_preview": ("BOOLEAN", {"default": True, "tooltip": "if True, displays saved images in the UI preview"}),
|
|
},
|
|
"hidden": {
|
|
"prompt": "PROMPT",
|
|
"extra_pnginfo": "EXTRA_PNGINFO",
|
|
},
|
|
}
|
|
|
|
RETURN_TYPES = ("STRING","STRING")
|
|
RETURN_NAMES = ("hashes","a1111_params")
|
|
OUTPUT_TOOLTIPS = ("Comma-separated list of the hashes to chain with other Image Saver additional_hashes","Written parameters to the image metadata")
|
|
FUNCTION = "save_images"
|
|
|
|
OUTPUT_NODE = True
|
|
|
|
CATEGORY = "ImageSaver"
|
|
DESCRIPTION = "Save images with civitai-compatible generation metadata"
|
|
|
|
def save_images(self,
|
|
images: list[torch.Tensor],
|
|
filename: str,
|
|
path: str,
|
|
extension: str,
|
|
lossless_webp: bool,
|
|
quality_jpeg_or_webp: int,
|
|
optimize_png: bool,
|
|
embed_workflow: bool = True,
|
|
save_workflow_as_json: bool = False,
|
|
show_preview: bool = True,
|
|
metadata: Metadata | None = None,
|
|
counter: int = 0,
|
|
time_format: str = "%Y-%m-%d-%H%M%S",
|
|
prompt: dict[str, Any] | None = None,
|
|
extra_pnginfo: dict[str, Any] | None = None,
|
|
) -> dict[str, Any]:
|
|
if metadata is None:
|
|
metadata = Metadata('', '', '', 512, 512, 0, 20, 7.0, '', 'normal', 1.0, 0, '', '', '', '', '')
|
|
|
|
path = make_pathname(path, metadata.width, metadata.height, metadata.seed, metadata.modelname, counter, time_format, metadata.sampler_name, metadata.steps, metadata.cfg, metadata.scheduler_name, metadata.denoise, metadata.clip_skip, metadata.custom)
|
|
|
|
filenames = ImageSaver.save_images(images, filename, extension, path, quality_jpeg_or_webp, lossless_webp, optimize_png, prompt, extra_pnginfo, save_workflow_as_json, embed_workflow, counter, time_format, metadata)
|
|
|
|
subfolder = os.path.normpath(path)
|
|
|
|
result: dict[str, Any] = {
|
|
"result": (metadata.final_hashes, metadata.a111_params),
|
|
}
|
|
|
|
if show_preview:
|
|
result["ui"] = {"images": [{"filename": filename, "subfolder": subfolder if subfolder != '.' else '', "type": 'output'} for filename in filenames]}
|
|
|
|
return result
|
|
|
|
class ImageSaver:
|
|
@classmethod
|
|
def INPUT_TYPES(cls) -> dict[str, Any]:
|
|
return {
|
|
"required": {
|
|
"images": ("IMAGE", { "tooltip": "image(s) to save"}),
|
|
"filename": ("STRING", {"default": '%time_%basemodelname_%seed', "multiline": False, "tooltip": "filename (available variables: %date, %time, %time_format<format>, %model, %width, %height, %seed, %counter, %sampler_name, %steps, %cfg, %scheduler_name, %basemodelname, %denoise, %clip_skip)"}),
|
|
"path": ("STRING", {"default": '', "multiline": False, "tooltip": "path to save the images (under Comfy's save directory)"}),
|
|
"extension": (['png', 'jpeg', 'jpg', 'webp'], { "tooltip": "file extension/type to save image as"}),
|
|
},
|
|
"optional": {
|
|
"steps": ("INT", {"default": 20, "min": 1, "max": 10000, "tooltip": "number of steps"}),
|
|
"cfg": ("FLOAT", {"default": 7.0, "min": 0.0, "max": 100.0, "tooltip": "CFG value"}),
|
|
"modelname": ("STRING", {"default": '', "multiline": False, "tooltip": "model name (can be multiple, separated by commas)"}),
|
|
"sampler_name": ("STRING", {"default": '', "multiline": False, "tooltip": "sampler name (as string)"}),
|
|
"scheduler_name": ("STRING", {"default": 'normal', "multiline": False, "tooltip": "scheduler name (as string)"}),
|
|
"positive": ("STRING", {"default": 'unknown', "multiline": True, "tooltip": "positive prompt"}),
|
|
"negative": ("STRING", {"default": 'unknown', "multiline": True, "tooltip": "negative prompt"}),
|
|
"seed_value": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "tooltip": "seed"}),
|
|
"width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8, "tooltip": "image width"}),
|
|
"height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8, "tooltip": "image height"}),
|
|
"lossless_webp": ("BOOLEAN", {"default": True, "tooltip": "if True, saved WEBP files will be lossless"}),
|
|
"quality_jpeg_or_webp": ("INT", {"default": 100, "min": 1, "max": 100, "tooltip": "quality setting of JPEG/WEBP"}),
|
|
"optimize_png": ("BOOLEAN", {"default": False, "tooltip": "if True, saved PNG files will be optimized (can reduce file size but is slower)"}),
|
|
"counter": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "tooltip": "counter"}),
|
|
"denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "tooltip": "denoise value"}),
|
|
"clip_skip": ("INT", {"default": 0, "min": -24, "max": 24, "tooltip": "skip last CLIP layers (positive or negative value, 0 for no skip)"}),
|
|
"time_format": ("STRING", {"default": "%Y-%m-%d-%H%M%S", "multiline": False, "tooltip": "timestamp format"}),
|
|
"save_workflow_as_json": ("BOOLEAN", {"default": False, "tooltip": "if True, also saves the workflow as a separate JSON file"}),
|
|
"embed_workflow": ("BOOLEAN", {"default": True, "tooltip": "if True, embeds the workflow in the saved image files.\nStable for PNG, experimental for WEBP.\nJPEG experimental and only if metadata size is below 65535 bytes"}),
|
|
"additional_hashes": ("STRING", {"default": "", "multiline": False, "tooltip": "hashes separated by commas, optionally with names. 'Name:HASH' (e.g., 'MyLoRA:FF735FF83F98')\nWith download_civitai_data set to true, weights can be added as well. (e.g., 'HASH:Weight', 'Name:HASH:Weight')"}),
|
|
"download_civitai_data": ("BOOLEAN", {"default": True, "tooltip": "Download and cache data from civitai.com to save correct metadata. Allows LoRA weights to be saved to the metadata."}),
|
|
"easy_remix": ("BOOLEAN", {"default": True, "tooltip": "Strip LoRAs and simplify 'embedding:path' from the prompt to make the Remix option on civitai.com more seamless."}),
|
|
"show_preview": ("BOOLEAN", {"default": True, "tooltip": "if True, displays saved images in the UI preview"}),
|
|
"custom": ("STRING", {"default": "", "multiline": False, "tooltip": "custom string to add to the metadata, inserted into the a111 string between clip skip and model hash"}),
|
|
},
|
|
"hidden": {
|
|
"prompt": "PROMPT",
|
|
"extra_pnginfo": "EXTRA_PNGINFO",
|
|
},
|
|
}
|
|
|
|
RETURN_TYPES = ("STRING","STRING")
|
|
RETURN_NAMES = ("hashes","a1111_params")
|
|
OUTPUT_TOOLTIPS = ("Comma-separated list of the hashes to chain with other Image Saver additional_hashes","Written parameters to the image metadata")
|
|
FUNCTION = "save_files"
|
|
|
|
OUTPUT_NODE = True
|
|
|
|
CATEGORY = "ImageSaver"
|
|
DESCRIPTION = "Save images with civitai-compatible generation metadata"
|
|
|
|
def save_files(
|
|
self,
|
|
images: list[torch.Tensor],
|
|
filename: str,
|
|
path: str,
|
|
extension: str,
|
|
steps: int = 20,
|
|
cfg: float = 7.0,
|
|
modelname: str = "",
|
|
sampler_name: str = "",
|
|
scheduler_name: str = "normal",
|
|
positive: str = "unknown",
|
|
negative: str = "unknown",
|
|
seed_value: int = 0,
|
|
width: int = 512,
|
|
height: int = 512,
|
|
lossless_webp: bool = True,
|
|
quality_jpeg_or_webp: int = 100,
|
|
optimize_png: bool = False,
|
|
counter: int = 0,
|
|
denoise: float = 1.0,
|
|
clip_skip: int = 0,
|
|
time_format: str = "%Y-%m-%d-%H%M%S",
|
|
save_workflow_as_json: bool = False,
|
|
embed_workflow: bool = True,
|
|
additional_hashes: str = "",
|
|
download_civitai_data: bool = True,
|
|
easy_remix: bool = True,
|
|
show_preview: bool = True,
|
|
custom: str = "",
|
|
prompt: dict[str, Any] | None = None,
|
|
extra_pnginfo: dict[str, Any] | None = None,
|
|
) -> dict[str, Any]:
|
|
metadata = ImageSaverMetadata.make_metadata(modelname, positive, negative, width, height, seed_value, steps, cfg, sampler_name, scheduler_name, denoise, clip_skip, custom, additional_hashes, download_civitai_data, easy_remix)
|
|
|
|
path = make_pathname(path, metadata.width, metadata.height, metadata.seed, metadata.modelname, counter, time_format, metadata.sampler_name, metadata.steps, metadata.cfg, metadata.scheduler_name, metadata.denoise, metadata.clip_skip, metadata.custom)
|
|
|
|
filenames = ImageSaver.save_images(images, filename, extension, path, quality_jpeg_or_webp, lossless_webp, optimize_png, prompt, extra_pnginfo, save_workflow_as_json, embed_workflow, counter, time_format, metadata)
|
|
|
|
subfolder = os.path.normpath(path)
|
|
|
|
result: dict[str, Any] = {
|
|
"result": (metadata.final_hashes, metadata.a111_params),
|
|
}
|
|
|
|
if show_preview:
|
|
result["ui"] = {"images": [{"filename": filename, "subfolder": subfolder if subfolder != '.' else '', "type": 'output'} for filename in filenames]}
|
|
|
|
return result
|
|
|
|
@staticmethod
|
|
def save_images(
|
|
images: list[torch.Tensor],
|
|
filename_pattern: str,
|
|
extension: str,
|
|
path: str,
|
|
quality_jpeg_or_webp: int,
|
|
lossless_webp: bool,
|
|
optimize_png: bool,
|
|
prompt: dict[str, Any] | None,
|
|
extra_pnginfo: dict[str, Any] | None,
|
|
save_workflow_as_json: bool,
|
|
embed_workflow: bool,
|
|
counter: int,
|
|
time_format: str,
|
|
metadata: Metadata
|
|
) -> list[str]:
|
|
filename_prefix = make_filename(filename_pattern, metadata.width, metadata.height, metadata.seed, metadata.modelname, counter, time_format, metadata.sampler_name, metadata.steps, metadata.cfg, metadata.scheduler_name, metadata.denoise, metadata.clip_skip, metadata.custom)
|
|
|
|
output_path = os.path.join(folder_paths.output_directory, path)
|
|
|
|
if output_path.strip() != '':
|
|
if not os.path.exists(output_path.strip()):
|
|
print(f'The path `{output_path.strip()}` specified doesn\'t exist! Creating directory.')
|
|
os.makedirs(output_path, exist_ok=True)
|
|
|
|
result_paths: list[str] = list()
|
|
num_images = len(images)
|
|
for idx, image in enumerate(images):
|
|
i = 255. * image.cpu().numpy()
|
|
img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8))
|
|
|
|
current_filename_prefix = ImageSaver.get_unique_filename(output_path, filename_prefix, extension, batch_size=num_images, batch_index=idx)
|
|
final_filename = f"{current_filename_prefix}.{extension}"
|
|
filepath = os.path.join(output_path, final_filename)
|
|
|
|
save_image(img, filepath, extension, quality_jpeg_or_webp, lossless_webp, optimize_png, metadata.a111_params, prompt, extra_pnginfo, embed_workflow)
|
|
|
|
if save_workflow_as_json:
|
|
save_json(extra_pnginfo, os.path.join(output_path, current_filename_prefix))
|
|
|
|
result_paths.append(final_filename)
|
|
return result_paths
|
|
|
|
# Match 'anything' or 'anything:anything' with trimmed white space
|
|
re_manual_hash = re.compile(r'^\s*([^:]+?)(?:\s*:\s*([^\s:][^:]*?))?\s*$')
|
|
# Match 'anything', 'anything:anything' or 'anything:anything:number' with trimmed white space
|
|
re_manual_hash_weights = re.compile(r'^\s*([^:]+?)(?:\s*:\s*([^\s:][^:]*?))?(?:\s*:\s*([-+]?(?:\d+(?:\.\d*)?|\.\d+)))?\s*$')
|
|
|
|
@staticmethod
|
|
def get_multiple_models(modelname: str, additional_hashes: str) -> tuple[str, str]:
|
|
model_names = [m.strip() for m in modelname.split(',')]
|
|
modelname = model_names[0] # Use the first model as the primary one
|
|
|
|
# Process additional model names and add to additional_hashes
|
|
for additional_model in model_names[1:]:
|
|
additional_ckpt_path = full_checkpoint_path_for(additional_model)
|
|
if additional_ckpt_path:
|
|
additional_modelhash = get_sha256(additional_ckpt_path)[:10]
|
|
# Add to additional_hashes in "name:HASH" format
|
|
if additional_hashes:
|
|
additional_hashes += ","
|
|
additional_hashes += f"{additional_model}:{additional_modelhash}"
|
|
return modelname, additional_hashes
|
|
|
|
@staticmethod
|
|
def parse_manual_hashes(additional_hashes: str, existing_hashes: set[str], download_civitai_data: bool) -> dict[str, tuple[str | None, float | None, str]]:
|
|
"""Process additional_hashes input (a string) by normalizing, removing extra spaces/newlines, and splitting by comma"""
|
|
manual_entries: dict[str, tuple[str | None, float | None, str]] = {}
|
|
unnamed_count = 0
|
|
|
|
additional_hash_split = additional_hashes.replace("\n", ",").split(",") if additional_hashes else []
|
|
for entry in additional_hash_split:
|
|
match = (ImageSaver.re_manual_hash_weights if download_civitai_data else ImageSaver.re_manual_hash).search(entry)
|
|
if match is None:
|
|
print(f"ComfyUI-Image-Saver: Invalid additional hash string: '{entry}'")
|
|
continue
|
|
|
|
groups = tuple(group for group in match.groups() if group)
|
|
|
|
# Read weight and remove from groups, if needed
|
|
weight = None
|
|
if download_civitai_data and len(groups) > 1:
|
|
try:
|
|
weight = float(groups[-1])
|
|
groups = groups[:-1]
|
|
except (ValueError, TypeError):
|
|
pass
|
|
|
|
# Read hash, optionally preceded by name
|
|
name, hash = groups if len(groups) > 1 else (None, groups[0])
|
|
|
|
if len(hash) > MAX_HASH_LENGTH:
|
|
print(f"ComfyUI-Image-Saver: Skipping hash. Length exceeds maximum of {MAX_HASH_LENGTH} characters: {hash}")
|
|
continue
|
|
|
|
if any(hash.lower() == existing_hash.lower() for _, _, existing_hash in manual_entries.values()):
|
|
print(f"ComfyUI-Image-Saver: Skipping duplicate hash: {hash}")
|
|
continue # Skip duplicates
|
|
|
|
if hash.lower() in existing_hashes:
|
|
print(f"ComfyUI-Image-Saver: Skipping manual hash already present in resources: {hash}")
|
|
continue
|
|
|
|
if name is None:
|
|
unnamed_count += 1
|
|
name = f"manual{unnamed_count}"
|
|
elif name in manual_entries:
|
|
print(f"ComfyUI-Image-Saver: Duplicate manual hash name '{name}' is being overwritten.")
|
|
|
|
manual_entries[name] = (None, weight, hash)
|
|
|
|
if len(manual_entries) > 29:
|
|
print("ComfyUI-Image-Saver: Reached maximum limit of 30 manual hashes. Skipping the rest.")
|
|
break
|
|
|
|
return manual_entries
|
|
|
|
@staticmethod
|
|
def clean_prompt(prompt: str, metadata_extractor: PromptMetadataExtractor) -> str:
|
|
"""Clean prompts for easier remixing by removing LoRAs and simplifying embeddings."""
|
|
# Strip loras
|
|
prompt = re.sub(metadata_extractor.LORA, "", prompt)
|
|
# Shorten 'embedding:path/to/my_embedding' -> 'my_embedding'
|
|
# Note: Possible inaccurate embedding name if the filename has been renamed from the default
|
|
prompt = re.sub(metadata_extractor.EMBEDDING, lambda match: Path(match.group(1)).stem, prompt)
|
|
# Remove prompt control edits. e.g., 'STYLE(A1111, mean)', 'SHIFT(1)`, etc.`
|
|
prompt = re.sub(r'\b[A-Z]+\([^)]*\)', "", prompt)
|
|
return prompt
|
|
|
|
@staticmethod
|
|
def get_unique_filename(output_path: str, filename_prefix: str, extension: str, batch_size: int = 1, batch_index: int = 0) -> str:
|
|
existing_files = [f for f in os.listdir(output_path) if f.startswith(filename_prefix) and f.endswith(extension)]
|
|
|
|
# For single images with no existing files, return plain filename
|
|
if batch_size == 1 and not existing_files:
|
|
return f"{filename_prefix}"
|
|
|
|
# For batches or when files exist, always use numbered suffix
|
|
suffixes: list[int] = []
|
|
for f in existing_files:
|
|
name, _ = os.path.splitext(f)
|
|
parts = name.split('_')
|
|
if parts[-1].isdigit():
|
|
suffixes.append(int(parts[-1]))
|
|
|
|
if suffixes:
|
|
# Start numbering after the highest existing suffix
|
|
base_suffix = max(suffixes) + 1
|
|
else:
|
|
# No numbered files exist yet
|
|
if existing_files:
|
|
# Plain file exists, start at 1 (the plain file is effectively 0)
|
|
base_suffix = 1
|
|
else:
|
|
# No files at all, start at 1
|
|
base_suffix = 1
|
|
|
|
return f"{filename_prefix}_{base_suffix + batch_index:02d}"
|