Some checks failed
Python Linting / Run Ruff (push) Has been cancelled
Python Linting / Run Pylint (push) Has been cancelled
Full Comfy CI Workflow Runs / test-stable (12.1, , linux, 3.10, [self-hosted Linux], stable) (push) Has been cancelled
Full Comfy CI Workflow Runs / test-stable (12.1, , linux, 3.11, [self-hosted Linux], stable) (push) Has been cancelled
Full Comfy CI Workflow Runs / test-stable (12.1, , linux, 3.12, [self-hosted Linux], stable) (push) Has been cancelled
Full Comfy CI Workflow Runs / test-unix-nightly (12.1, , linux, 3.11, [self-hosted Linux], nightly) (push) Has been cancelled
Execution Tests / test (macos-latest) (push) Has been cancelled
Execution Tests / test (ubuntu-latest) (push) Has been cancelled
Execution Tests / test (windows-latest) (push) Has been cancelled
Test server launches without errors / test (push) Has been cancelled
Unit Tests / test (macos-latest) (push) Has been cancelled
Unit Tests / test (ubuntu-latest) (push) Has been cancelled
Unit Tests / test (windows-2022) (push) Has been cancelled
Includes 30 custom nodes committed directly, 7 Civitai-exclusive loras stored via Git LFS, and a setup script that installs all dependencies and downloads HuggingFace-hosted models on vast.ai. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
301 lines
11 KiB
Python
301 lines
11 KiB
Python
import os
|
|
import hashlib
|
|
import sys
|
|
import json
|
|
import shutil
|
|
import folder_paths
|
|
from aiohttp import web
|
|
from server import PromptServer
|
|
from .config import RESOURCES_DIR, FOOOCUS_STYLES_DIR, FOOOCUS_STYLES_SAMPLES
|
|
from .libs.model import easyModelManager
|
|
from .libs.utils import getMetadata, cleanGPUUsedForce, get_local_filepath
|
|
from .libs.cache import remove_cache
|
|
from .libs.translate import has_chinese, zh_to_en
|
|
|
|
@PromptServer.instance.routes.get('/easyuse/version')
def get_version(request):
    """Report the package version as JSON.

    Responds with ``{"version": <__version__>}``, where ``__version__`` is
    imported from the parent package at request time. If the import or the
    response construction raises, the exception is printed and an empty
    500 response is returned instead.
    """
    try:
        from .. import __version__ as package_version
        payload = {"version": package_version}
        return web.json_response(payload)
    except Exception as err:
        print(err)
        return web.Response(status=500)
|
|
|
|
@PromptServer.instance.routes.post("/easyuse/cleangpu")
def cleanGPU(request):
    """Run ``cleanGPUUsedForce()`` and then ``remove_cache('*')``.

    NOTE(review): ``'*'`` presumably selects every cache entry; confirm
    against ``libs.cache.remove_cache``.

    Returns:
        An empty 200 response when both calls succeed, or an empty 500
        response when either of them raises.
    """
    try:
        cleanGPUUsedForce()
        remove_cache('*')
    except Exception as e:
        # Surface the failure on the console (as get_version does) rather
        # than discarding it; the client still only sees the 500 status.
        print(e)
        return web.Response(status=500)
    return web.Response(status=200)
|
|
|
|
@PromptServer.instance.routes.post("/easyuse/removecache")
async def removecache(request):
    """Call ``remove_cache`` with the ``key`` form field of the request.

    The value is forwarded as-is; it is ``None`` when the form has no
    ``key`` field.

    Returns:
        An empty 200 response on success, or an empty 500 response when
        ``remove_cache`` raises.
    """
    post = await request.post()
    key = post.get("key")
    try:
        remove_cache(key)
    except Exception as e:
        # Surface the failure on the console rather than discarding it;
        # the client still only sees the 500 status.
        print(e)
        return web.Response(status=500)
    return web.Response(status=200)
|
|
|
|
@PromptServer.instance.routes.post("/easyuse/translate")
async def translate(request):
    """Return the posted ``text`` field, translated when it contains Chinese.

    When ``has_chinese(text)`` is true the text is passed to ``zh_to_en`` as
    a one-element list and the first result is returned; otherwise the text
    is echoed back unchanged. The response is JSON of the form
    ``{"text": ...}``.
    """
    form = await request.post()
    source_text = form.get("text")
    result_text = zh_to_en([source_text])[0] if has_chinese(source_text) else source_text
    return web.json_response({"text": result_text})
|
|
|
|
@PromptServer.instance.routes.get("/easyuse/reboot")
def reboot(request):
    """Re-execute the current interpreter with the original command line.

    ``os.execv`` replaces the running process, so on success this handler
    never returns a response.
    """
    # ``close_log`` is not a standard stream method; presumably stdout has
    # been swapped for a logging wrapper (confirm). Any failure is ignored.
    try:
        sys.stdout.close_log()
    except Exception:
        pass

    restart_argv = [sys.executable, *sys.argv]
    return os.execv(sys.executable, restart_argv)
|
|
|
|
# parse csv
|
|
@PromptServer.instance.routes.post("/easyuse/upload/csv")
async def parse_csv(request):
    """Flatten an uploaded CSV file into one ``'; \\n'``-terminated string.

    Reads the ``csv`` file field of the posted form, strips surrounding
    whitespace from every line, and appends ``'; \\n'`` to each one.

    Lines are decoded as UTF-8. The previous implementation took
    ``str()`` of each bytes line and then removed the characters ``b`` and
    ``'`` to undo the bytes repr, which also deleted every "b" and every
    apostrophe in the data and left non-ASCII text as escape sequences.

    Returns:
        A JSON response whose body is the joined string, or an empty 400
        response when the form has no ``csv`` file upload.
    """
    post = await request.post()
    upload = post.get("csv")
    # A missing field is None and a plain text field has no ``file``
    # attribute; both mean there is nothing to parse.
    file = getattr(upload, "file", None)
    if file is None:
        return web.Response(status=400)

    rows = []
    for raw_line in file.readlines():
        line = raw_line.strip()
        if isinstance(line, bytes):
            line = line.decode("utf-8", errors="replace")
        rows.append(line + '; \n')
    return web.json_response("".join(rows))
|
|
|
|
#get style list
|
|
def _easyuse_styles_list_path_within(base_dir, target):
    """Return True when ``target`` resolves to a location inside ``base_dir``."""
    base = os.path.abspath(base_dir)
    try:
        return os.path.commonpath((base, os.path.abspath(target))) == base
    except ValueError:
        # commonpath raises when the paths cannot be compared
        # (e.g. different drives on Windows).
        return False


def _easyuse_styles_list_entry(d, cn_data, style_name):
    """Build the response entry for one raw style record ``d``."""
    name = d['name'].replace('-', ' ')
    nd = {}

    if "name_cn" in d:
        nd['name_cn'] = d['name_cn']
    elif cn_data:
        # Translation keys are the title-cased style name, with a few
        # acronyms kept fully upper-case.
        key = ' '.join(
            word.upper() if word.lower() in ['mre', 'sai', '3d'] else word.capitalize()
            for word in name.split(' '))
        nd['name_cn'] = cn_data[key] if key in cn_data else key
    nd["name"] = d['name']

    if "thumbnail" in d:
        thumbnail = d['thumbnail']
        # Values containing "http" are passed through; anything else is
        # routed through the style image endpoint.
        if isinstance(thumbnail, str):
            nd['thumbnail'] = thumbnail if "http" in thumbnail else f'/easyuse/prompt/styles/image?path={thumbnail}'
        elif isinstance(thumbnail, list):
            nd['thumbnail'] = [thumb if "http" in thumb else f'/easyuse/prompt/styles/image?path={thumb}' for thumb in thumbnail]
    else:
        nd['thumbnail'] = f'/easyuse/prompt/styles/image?name={name}&styles_name={style_name}'

    # Optional fields, copied under the key names used in the response.
    for source_key, target_key in (
        ("thumbnail_variant", "thumbnailVariant"),
        ("media_type", "mediaType"),
        ("media_subtype", "mediaSubtype"),
        ("prompt", "prompt"),
        ("negative_prompt", "negative_prompt"),
    ):
        if source_key in d:
            nd[target_key] = d[source_key]
    return nd


@PromptServer.instance.routes.get("/easyuse/prompt/styles")
async def getStylesList(request):
    """Return the entries of the style file named by the ``name`` query value.

    ``<name>.json`` is read from ``FOOOCUS_STYLES_DIR``, except that
    ``fooocus_styles`` falls back to ``RESOURCES_DIR`` when the styles
    directory has no ``fooocus_styles.json``. An optional ``<name>_cn.json``
    next to it supplies ``name_cn`` values for records that lack one.

    Returns:
        A JSON list with one entry per style record, or an empty 400
        response when ``name`` is missing, points outside the chosen
        directory, or the style file is absent or empty.
    """
    query = request.rel_url.query
    if "name" not in query:
        return web.Response(status=400)

    style_name = query["name"]
    fooocus_custom_file = os.path.join(FOOOCUS_STYLES_DIR, 'fooocus_styles.json')
    if style_name == 'fooocus_styles' and not os.path.exists(fooocus_custom_file):
        base_dir = RESOURCES_DIR
    else:
        base_dir = FOOOCUS_STYLES_DIR
    file = os.path.join(base_dir, style_name + '.json')
    cn_file = os.path.join(base_dir, style_name + '_cn.json')

    # ``name`` is client-supplied: refuse values such as "../x" or an
    # absolute path. ``cn_file`` shares the directory of ``file``, so one
    # check covers both.
    if not _easyuse_styles_list_path_within(base_dir, file):
        return web.Response(status=400)

    cn_data = None
    if os.path.isfile(cn_file):
        with open(cn_file, 'r', encoding='utf-8') as f:
            cn_data = json.load(f)

    if not os.path.isfile(file):
        return web.Response(status=400)
    with open(file, 'r', encoding='utf-8') as f:
        data = json.load(f)
    if not data:
        return web.Response(status=400)

    return web.json_response(
        [_easyuse_styles_list_entry(d, cn_data, style_name) for d in data])
|
|
|
|
# get style preview image
|
|
def _easyuse_styles_image_path_within(base_dir, target):
    """Return True when ``target`` resolves to a location inside ``base_dir``."""
    base = os.path.abspath(base_dir)
    try:
        return os.path.commonpath((base, os.path.abspath(target))) == base
    except ValueError:
        # commonpath raises when the paths cannot be compared
        # (e.g. different drives on Windows).
        return False


@PromptServer.instance.routes.get("/easyuse/prompt/styles/image")
async def getStylesImage(request):
    """Serve a style preview image selected by ``path`` or ``name``.

    Query parameters:
        path: file path looked up first under ``FOOOCUS_STYLES_DIR/samples``
            and then directly under ``FOOOCUS_STYLES_DIR``.
        name: used only when ``path`` is absent; ``<name>.jpg`` is looked up
            under ``FOOOCUS_STYLES_DIR/samples``.
        styles_name: when equal to ``fooocus_styles`` and no local file was
            found for ``name``, the response body is the text
            ``FOOOCUS_STYLES_SAMPLES + name + '.jpg'``.

    Both ``path`` and ``name`` come from the client, so a candidate file is
    only served when it resolves inside ``FOOOCUS_STYLES_DIR``; this blocks
    ``../`` sequences and absolute paths.

    Returns:
        A file response, a text response as described above, or an empty
        400 response when nothing matched.
    """
    query = request.rel_url.query
    styles_name = query["styles_name"] if "styles_name" in query else None

    if "path" in query:
        path = query["path"]
        candidates = (
            os.path.join(FOOOCUS_STYLES_DIR, 'samples', path),
            os.path.join(FOOOCUS_STYLES_DIR, path),
        )
        for candidate in candidates:
            if _easyuse_styles_image_path_within(FOOOCUS_STYLES_DIR, candidate) and os.path.isfile(candidate):
                return web.FileResponse(candidate)
    elif "name" in query:
        name = query["name"]
        file = os.path.join(FOOOCUS_STYLES_DIR, 'samples', name + '.jpg')
        if _easyuse_styles_image_path_within(FOOOCUS_STYLES_DIR, file) and os.path.isfile(file):
            return web.FileResponse(file)
        # Same fallback whether the samples directory is missing or simply
        # lacks this image.
        if styles_name == 'fooocus_styles':
            return web.Response(text=FOOOCUS_STYLES_SAMPLES + name + '.jpg')

    return web.Response(status=400)
|
|
|
|
# get models lists
|
|
@PromptServer.instance.routes.get("/easyuse/models/list")
async def getModelsList(request):
    """Return the model list for the category given by the ``type`` query value.

    Only ``checkpoints`` and ``loras`` are accepted. The list comes from
    ``easyModelManager().get_model_lists(<type>)`` and is sent as JSON. A
    missing or unsupported ``type`` yields an empty 400 response.
    """
    query = request.rel_url.query
    if "type" not in query:
        return web.Response(status=400)

    model_type = query["type"]
    if model_type not in ['checkpoints', 'loras']:
        return web.Response(status=400)

    model_lists = easyModelManager().get_model_lists(model_type)
    return web.json_response(model_lists)
|
|
|
|
@PromptServer.instance.routes.post("/easyuse/metadata/notes/{name}")
async def save_notes(request):
    """Store the request body as a ``.txt`` notes file next to a model file.

    The ``name`` path parameter has the form ``<type>/<model name>``. For
    ``embeddings`` and ``loras`` the model is matched case-insensitively,
    with or without its file extension, against
    ``folder_paths.get_filename_list``; other types are resolved directly
    with ``folder_paths.get_full_path``.

    Returns:
        An empty 200 response once the notes are written, 400 when ``name``
        has no ``/`` separator, or 404 when the model file cannot be
        resolved.
    """
    model_type, separator, name = request.match_info["name"].partition("/")
    if not separator:
        # No "<type>/" prefix, so there is nothing to resolve.
        return web.Response(status=400)

    file_path = None
    if model_type in ("embeddings", "loras"):
        wanted = name.lower()
        for candidate in folder_paths.get_filename_list(model_type):
            if candidate.lower() == wanted or os.path.splitext(candidate)[0].lower() == wanted:
                file_path = folder_paths.get_full_path(model_type, candidate)
                if file_path is not None:
                    break
    else:
        file_path = folder_paths.get_full_path(model_type, name)

    if not file_path:
        return web.Response(status=404)

    # Read the body before opening the file: opening with "w" truncates it,
    # so a failed read must not wipe the existing notes.
    notes = await request.text()

    info_file = os.path.splitext(file_path)[0] + ".txt"
    with open(info_file, "w") as f:
        f.write(notes)

    return web.Response(status=200)
|
|
|
|
@PromptServer.instance.routes.get("/easyuse/metadata/{name}")
async def load_metadata(request):
    """Return metadata, saved notes and the SHA-256 digest of a model file.

    The ``name`` path parameter has the form ``<type>/<model name>``. For
    ``embeddings`` the model is matched case-insensitively, with or without
    its file extension; other types are resolved directly with
    ``folder_paths.get_full_path``.

    The response starts from the ``__metadata__`` entry of the header
    returned by ``getMetadata`` (an empty dict when it is missing or cannot
    be parsed) and adds:

    * ``easyuse.notes``: content of the sibling ``.txt`` file, if present.
    * ``easyuse.sha256``: content of the sibling ``.sha256`` file, or a
      freshly computed digest, which is then written to that file.

    Returns:
        A JSON response with the metadata, 400 when ``name`` has no ``/``
        separator, or 404 when the model file cannot be resolved.
    """
    model_type, separator, name = request.match_info["name"].partition("/")
    if not separator:
        # No "<type>/" prefix, so there is nothing to resolve.
        return web.Response(status=400)

    file_path = None
    if model_type == "embeddings":
        wanted = name.lower()
        for candidate in folder_paths.get_filename_list(model_type):
            if candidate.lower() == wanted or os.path.splitext(candidate)[0].lower() == wanted:
                file_path = folder_paths.get_full_path(model_type, candidate)
                if file_path is not None:
                    break
    else:
        file_path = folder_paths.get_full_path(model_type, name)

    if not file_path:
        return web.Response(status=404)

    try:
        header = getMetadata(file_path)
        header_json = json.loads(header)
        meta = header_json["__metadata__"] if "__metadata__" in header_json else None
    except Exception:
        # Unreadable or header-less files simply have no embedded metadata.
        meta = None

    if meta is None:
        meta = {}

    file_no_ext = os.path.splitext(file_path)[0]

    info_file = file_no_ext + ".txt"
    if os.path.isfile(info_file):
        with open(info_file, "r") as f:
            meta["easyuse.notes"] = f.read()

    hash_file = file_no_ext + ".sha256"
    if os.path.isfile(hash_file):
        with open(hash_file, "rt") as f:
            meta["easyuse.sha256"] = f.read()
    else:
        # Hash in fixed-size chunks so the whole file is never held in
        # memory at once.
        digest = hashlib.sha256()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(1024 * 1024), b""):
                digest.update(chunk)
        meta["easyuse.sha256"] = digest.hexdigest()
        with open(hash_file, "wt") as f:
            f.write(meta["easyuse.sha256"])

    return web.json_response(meta)
|
|
|
|
@PromptServer.instance.routes.post("/easyuse/save/{name}")
async def save_preview(request):
    """Copy an existing image next to a model file to serve as its preview.

    The ``name`` path parameter has the form ``<type>/<model name>``. The
    JSON body selects the source image:

    * ``type``: passed to ``folder_paths.get_directory_by_type``
      (default ``"output"``).
    * ``subfolder``: folder below that directory (default ``""``).
    * ``filename``: file inside that folder.

    The image is copied to the model's path with the model extension
    replaced by the image's extension.

    Returns:
        JSON ``{"image": "<type>/<copied file name>"}`` on success; 400
        when ``name`` has no ``/`` separator, the directory type yields no
        directory, or the source path leaves that directory; 404 when the
        source image or the model file cannot be found.
    """
    model_type, separator, name = request.match_info["name"].partition("/")
    if not separator:
        return web.Response(status=400)

    body = await request.json()

    base_dir = folder_paths.get_directory_by_type(body.get("type", "output"))
    if not base_dir:
        # Guard against a lookup that yields no directory for this type.
        return web.Response(status=400)
    base_dir = os.path.abspath(base_dir)

    subfolder = body.get("subfolder", "")
    full_output_folder = os.path.abspath(
        os.path.join(base_dir, os.path.normpath(subfolder)))
    filepath = os.path.abspath(
        os.path.join(full_output_folder, body.get("filename", "")))

    # ``subfolder`` and ``filename`` are both client-supplied, so each
    # resolved path has to stay inside the base directory.
    try:
        contained = all(
            os.path.commonpath((base_dir, candidate)) == base_dir
            for candidate in (full_output_folder, filepath))
    except ValueError:
        # Raised when the paths cannot be compared (e.g. different drives).
        contained = False
    if not contained:
        return web.Response(status=400)

    if not os.path.isfile(filepath):
        return web.Response(status=404)

    image_path = folder_paths.get_full_path(model_type, name)
    if not image_path:
        return web.Response(status=404)
    image_path = os.path.splitext(image_path)[0] + os.path.splitext(filepath)[1]

    shutil.copyfile(filepath, image_path)

    return web.json_response({
        "image": model_type + "/" + os.path.basename(image_path)
    })
|
|
|
|
@PromptServer.instance.routes.post("/easyuse/model/download")
async def download_model(request):
    """Fetch a model from ``url`` into an allow-listed models sub-directory.

    Form fields:
        url: passed to ``get_local_filepath``.
        local_dir: sub-directory name below ``folder_paths.models_dir``;
            must be one of the names in the allow-list below.

    Returns:
        An empty 200 response on success, 400 when ``local_dir`` is not
        allowed or ``url`` is missing, or 500 when ``get_local_filepath``
        raises.
    """
    post = await request.post()
    url = post.get("url")
    local_dir = post.get("local_dir")
    if local_dir not in ['checkpoints', 'loras', 'controlnet', 'onnx', 'instantid', 'ipadapter', 'dynamicrafter_models', 'mediapipe', 'rembg', 'layer_model']:
        return web.Response(status=400)
    if not url:
        return web.Response(status=400)

    local_path = os.path.join(folder_paths.models_dir, local_dir)
    try:
        get_local_filepath(url, local_path)
    except Exception as e:
        # ``Exception`` rather than a bare ``except`` so that task
        # cancellation and interpreter shutdown still propagate.
        print(e)
        return web.Response(status=500)
    return web.Response(status=200)
|