Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 49 additions & 42 deletions fastchat/serve/launch_all_serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@

sys.path.append(os.path.dirname(os.path.dirname(__file__)))

import subprocess
import re
import runpy
import shlex
import time
import argparse

LOGDIR = "./logs/"
Expand Down Expand Up @@ -190,20 +192,41 @@
)
os.environ["CUDA_VISIBLE_DEVICES"] = args.gpus

# Shell command template that starts a fastchat.serve module in the
# background under nohup, sending stdout and stderr to a log file.
# Placeholders:
#   {0}: module name (controller, model_worker, openai_api_server)
#   {1}: command-line options
#   {2}: log directory (LOGDIR)
#   {3}: log file name, without the ".log" extension
base_launch_sh = "nohup python3 -m fastchat.serve.{0} {1} >{2}/{3}.log 2>&1 &"
def launch_process(module, str_args, log_dir, log_name):
    """Run ``fastchat.serve.<module>`` as a detached background process.

    The module is executed with ``runpy`` inside a double-forked grandchild
    whose stdout and stderr are redirected to ``<log_dir>/<log_name>.log``.

    Args:
        module: Name of the module under ``fastchat.serve`` to run.
        str_args: Command-line options as one shell-style string; it is
            split with ``shlex.split`` and becomes ``sys.argv[1:]`` of the
            launched module.
        log_dir: Directory that receives the log file.
        log_name: Log file name without the ``.log`` extension.

    Raises:
        OSError: If the log file cannot be opened or the first fork fails.
        RuntimeError: If the intermediate child exits with a non-zero
            status (for example because the second fork failed).
    """
    import sys
    import traceback

    module_name = f"fastchat.serve.{module}"
    argv = [module_name] + shlex.split(str_args)
    log_path = os.path.join(log_dir, f"{log_name}.log")

    # Flush before forking so the children do not inherit (and later replay
    # into the log) output that the launcher has buffered but not written.
    sys.stdout.flush()
    sys.stderr.flush()

    # Open the log in the parent and truncate it: a stale "Uvicorn running
    # on" line from a previous run must never satisfy a readiness check, and
    # truncating here (not in the child) removes the race with the first
    # poll. O_APPEND keeps every write at the end of the file.
    log_fd = os.open(
        log_path,
        os.O_WRONLY | os.O_CREAT | os.O_TRUNC | os.O_APPEND,
        0o644,
    )
    try:
        pid = os.fork()
        if pid == 0:
            # Child side. Nothing may escape this block: a propagating
            # exception would otherwise keep running a copy of the
            # launcher's own call stack.
            exit_code = 1
            try:
                # Double-fork: the intermediate child exits right away so
                # the grandchild is re-parented and never becomes a zombie
                # of the launcher.
                if os.fork() != 0:
                    os._exit(0)
                # Detach stdin from the launching terminal.
                null_fd = os.open(os.devnull, os.O_RDONLY)
                os.dup2(null_fd, 0)
                if null_fd > 2:
                    os.close(null_fd)
                os.dup2(log_fd, 1)
                os.dup2(log_fd, 2)
                if log_fd > 2:
                    os.close(log_fd)
                os.setsid()
                sys.argv = argv
                runpy.run_module(module_name, run_name="__main__", alter_sys=True)
                exit_code = 0
            except SystemExit as exc:
                # Honour the exit status requested by the module itself.
                if exc.code is None:
                    exit_code = 0
                elif isinstance(exc.code, int):
                    exit_code = exc.code
                else:
                    print(exc.code, file=sys.stderr)
            except BaseException:
                # Record the failure in the log instead of dying silently.
                traceback.print_exc()
            finally:
                try:
                    sys.stdout.flush()
                    sys.stderr.flush()
                except Exception:
                    # Best effort only: the process is exiting regardless.
                    pass
                os._exit(exit_code)
    finally:
        # Only the parent reaches this point; the children leave via
        # os._exit, which skips ``finally`` blocks.
        os.close(log_fd)

    # Reap the intermediate child immediately and surface a failed launch.
    _, wait_status = os.waitpid(pid, 0)
    if wait_status != 0:
        raise RuntimeError(
            f"failed to launch {module_name}: "
            f"intermediate process exited with wait status {wait_status}"
        )

# Shell snippet that polls a log file once per second until it contains
# "Uvicorn running on", printing a progress line on every iteration.
# Placeholders:
#   {0}: log directory (LOGDIR)
#   {1}: log file name, without the ".log" extension
#   {2}: service name shown in the progress messages
#        (controller, worker, openai_api_server)
base_check_sh = """while [ `grep -c "Uvicorn running on" {0}/{1}.log` -eq '0' ];do
sleep 1s;
echo "wait {2} running"
done
echo '{2} running' """

def wait_for_service(log_dir, log_name, service_name, *, timeout=None, poll_interval=1):
    """Block until the service's log reports that Uvicorn is running.

    The file ``<log_dir>/<log_name>.log`` is re-read on every poll until it
    contains the text ``Uvicorn running on``. A log file that does not exist
    yet is treated as "not ready".

    Args:
        log_dir: Directory containing the log file.
        log_name: Log file name without the ``.log`` extension.
        service_name: Name used in the progress messages.
        timeout: Maximum number of seconds to wait, or ``None`` (default)
            to wait indefinitely.
        poll_interval: Seconds to sleep between polls.

    Raises:
        TimeoutError: If ``timeout`` elapses before the marker appears.
    """
    # Scan raw bytes: logs may contain output that is not valid text in the
    # locale encoding, and that must not crash the launcher.
    marker = b"Uvicorn running on"
    log_path = os.path.join(log_dir, f"{log_name}.log")
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        try:
            with open(log_path, "rb") as f:
                if marker in f.read():
                    break
        except FileNotFoundError:
            # The service has not created its log yet.
            pass
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"{service_name} did not report startup in {log_path} "
                f"within {timeout} seconds"
            )
        time.sleep(poll_interval)
        print(f"wait {service_name} running")
    print(f"{service_name} running")


def string_args(args, args_list):
Expand All @@ -224,43 +247,33 @@ def string_args(args, args_list):
or isinstance(value, tuple)
or isinstance(value, set)
):
value = " ".join(value)
value = " ".join(shlex.quote(str(v)) for v in value)
args_str += f" --{key} {value} "
else:
args_str += f" --{key} {value} "
args_str += f" --{key} {shlex.quote(str(value))} "

return args_str


def launch_worker(item):
    """Launch one model worker and wait until it reports that it is running.

    Args:
        item: Worker spec of the form ``"<model_path>@<host>@<port>"``.

    Raises:
        ValueError: If ``item`` does not consist of exactly three
            ``@``-separated parts.
    """
    parts = item.split("@")
    if len(parts) != 3:
        raise ValueError(
            f"expected '<model_path>@<host>@<port>', got {item!r}"
        )

    # Derive a log-file-safe name from the last path component of the spec:
    # every character outside [a-zA-Z0-9_] becomes "_".
    log_name = re.sub(
        r"[^a-zA-Z0-9_]",
        "_",
        item.split("/")[-1].split("\\")[-1],
    )

    # string_args reads these values back from the shared ``args`` namespace.
    args.model_path, args.worker_host, args.worker_port = parts
    print("*" * 80)
    worker_str_args = string_args(args, worker_args)
    print(worker_str_args)

    # Start the worker once, without going through a shell, then block until
    # its log shows that it is serving.
    launch_process("model_worker", worker_str_args, LOGDIR, f"worker_{log_name}")
    wait_for_service(LOGDIR, f"worker_{log_name}", "model_worker")


def launch_all():
controller_str_args = string_args(args, controller_args)
controller_sh = base_launch_sh.format(
"controller", controller_str_args, LOGDIR, "controller"
)
controller_check_sh = base_check_sh.format(LOGDIR, "controller", "controller")
subprocess.run(controller_sh, shell=True, check=True)
subprocess.run(controller_check_sh, shell=True, check=True)
launch_process("controller", controller_str_args, LOGDIR, "controller")
wait_for_service(LOGDIR, "controller", "controller")

if isinstance(args.model_path_address, str):
launch_worker(args.model_path_address)
Expand All @@ -270,14 +283,8 @@ def launch_all():
launch_worker(item)

server_str_args = string_args(args, server_args)
server_sh = base_launch_sh.format(
"openai_api_server", server_str_args, LOGDIR, "openai_api_server"
)
server_check_sh = base_check_sh.format(
LOGDIR, "openai_api_server", "openai_api_server"
)
subprocess.run(server_sh, shell=True, check=True)
subprocess.run(server_check_sh, shell=True, check=True)
launch_process("openai_api_server", server_str_args, LOGDIR, "openai_api_server")
wait_for_service(LOGDIR, "openai_api_server", "openai_api_server")


if __name__ == "__main__":
Expand Down
Loading