diff --git a/fastchat/serve/launch_all_serve.py b/fastchat/serve/launch_all_serve.py index 2f4ad7b0b..f0a35d8ed 100644 --- a/fastchat/serve/launch_all_serve.py +++ b/fastchat/serve/launch_all_serve.py @@ -13,8 +13,10 @@ sys.path.append(os.path.dirname(os.path.dirname(__file__))) -import subprocess import re +import runpy +import shlex +import time import argparse LOGDIR = "./logs/" @@ -190,20 +192,41 @@ ) os.environ["CUDA_VISIBLE_DEVICES"] = args.gpus -# 0,controller, model_worker, openai_api_server -# 1, cmd options -# 2,LOGDIR -# 3, log file name -base_launch_sh = "nohup python3 -m fastchat.serve.{0} {1} >{2}/{3}.log 2>&1 &" +def launch_process(module, str_args, log_dir, log_name): + import sys + module_name = f"fastchat.serve.{module}" + argv = [module_name] + shlex.split(str_args) + log_path = os.path.join(log_dir, f"{log_name}.log") + # Double-fork: grandchild is fully detached (adopted by init) + pid = os.fork() + if pid == 0: + if os.fork() != 0: + os._exit(0) # intermediate child exits; grandchild continues + log_fd = os.open(log_path, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o644) + os.dup2(log_fd, 1) + os.dup2(log_fd, 2) + os.close(log_fd) + os.setsid() + sys.argv = argv + try: + runpy.run_module(module_name, run_name="__main__", alter_sys=True) + finally: + os._exit(0) + os.waitpid(pid, 0) # reap intermediate child immediately -# 0 LOGDIR -#! 1 log file name -# 2 controller, worker, openai_api_server -base_check_sh = """while [ `grep -c "Uvicorn running on" {0}/{1}.log` -eq '0' ];do - sleep 1s; - echo "wait {2} running" - done - echo '{2} running' """ + +def wait_for_service(log_dir, log_name, service_name): + log_path = os.path.join(log_dir, f"{log_name}.log") + while True: + try: + with open(log_path, "r") as f: + if "Uvicorn running on" in f.read(): + break + except FileNotFoundError: + pass + time.sleep(1) + print(f"wait {service_name} running") + print(f"{service_name} running") def string_args(args, args_list): @@ -224,43 +247,33 @@ def string_args(args, args_list): or isinstance(value, tuple) or isinstance(value, set) ): - value = " ".join(value) + value = " ".join(shlex.quote(str(v)) for v in value) args_str += f" --{key} {value} " else: - args_str += f" --{key} {value} " + args_str += f" --{key} {shlex.quote(str(value))} " return args_str def launch_worker(item): - log_name = ( - item.split("/")[-1] - .split("\\")[-1] - .replace("-", "_") - .replace("@", "_") - .replace(".", "_") + log_name = re.sub( + r"[^a-zA-Z0-9_]", + "_", + item.split("/")[-1].split("\\")[-1], ) args.model_path, args.worker_host, args.worker_port = item.split("@") print("*" * 80) worker_str_args = string_args(args, worker_args) print(worker_str_args) - worker_sh = base_launch_sh.format( - "model_worker", worker_str_args, LOGDIR, f"worker_{log_name}" - ) - worker_check_sh = base_check_sh.format(LOGDIR, f"worker_{log_name}", "model_worker") - subprocess.run(worker_sh, shell=True, check=True) - subprocess.run(worker_check_sh, shell=True, check=True) + launch_process("model_worker", worker_str_args, LOGDIR, f"worker_{log_name}") + wait_for_service(LOGDIR, f"worker_{log_name}", "model_worker") def launch_all(): controller_str_args = string_args(args, controller_args) - controller_sh = base_launch_sh.format( - "controller", controller_str_args, LOGDIR, "controller" - ) - controller_check_sh = base_check_sh.format(LOGDIR, "controller", "controller") - subprocess.run(controller_sh, shell=True, check=True) - subprocess.run(controller_check_sh, shell=True, check=True) + launch_process("controller", controller_str_args, LOGDIR, "controller") + wait_for_service(LOGDIR, "controller", "controller") if isinstance(args.model_path_address, str): launch_worker(args.model_path_address) @@ -270,14 +283,8 @@ def launch_all(): launch_worker(item) server_str_args = string_args(args, server_args) - server_sh = base_launch_sh.format( - "openai_api_server", server_str_args, LOGDIR, "openai_api_server" - ) - server_check_sh = base_check_sh.format( - LOGDIR, "openai_api_server", "openai_api_server" - ) - subprocess.run(server_sh, shell=True, check=True) - subprocess.run(server_check_sh, shell=True, check=True) + launch_process("openai_api_server", server_str_args, LOGDIR, "openai_api_server") + wait_for_service(LOGDIR, "openai_api_server", "openai_api_server") if __name__ == "__main__":