diff --git a/src/pylon/deploy/pylon-config/location.conf.template b/src/pylon/deploy/pylon-config/location.conf.template index 51f1ac81..3853ccaa 100644 --- a/src/pylon/deploy/pylon-config/location.conf.template +++ b/src/pylon/deploy/pylon-config/location.conf.template @@ -46,10 +46,19 @@ location ~ ^/log-manager/([^/]+):(\d+)/(.*)$ { # job server location ~ ^/job-server/([^/]+):(\d+)/(.*)$ { proxy_pass http://$1:$2/$3$is_args$args; + proxy_http_version 1.1; + proxy_set_header Connection ""; proxy_connect_timeout 180m; proxy_read_timeout 180m; proxy_send_timeout 180m; + # Before nginx commits any response bytes to the client, retry once on + # connection-level errors (TCP RST / ETIMEDOUT during connect). + # non_idempotent is required because inference endpoints are POST. + proxy_next_upstream error timeout non_idempotent; + proxy_next_upstream_tries 2; + proxy_next_upstream_timeout 5s; + # Disable buffering based on content type proxy_buffering off; proxy_cache off; @@ -65,7 +74,9 @@ location ~ ^/copilot/api/operation(.*)$ { # Model proxy backend location ~ ^/model-proxy/(.*)$ { - proxy_pass {{MODEL_PROXY_URI}}/$1$is_args$args; + proxy_pass http://model_proxy_upstream/$1$is_args$args; + proxy_http_version 1.1; + proxy_set_header Connection ""; proxy_connect_timeout 60m; proxy_read_timeout 60m; proxy_send_timeout 60m; diff --git a/src/pylon/deploy/pylon-config/nginx.conf.template b/src/pylon/deploy/pylon-config/nginx.conf.template index e47cfae3..3ca316e1 100644 --- a/src/pylon/deploy/pylon-config/nginx.conf.template +++ b/src/pylon/deploy/pylon-config/nginx.conf.template @@ -15,10 +15,10 @@ # DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
worker_processes 1;

# worker_connections counts every connection a worker holds, including the
# upstream side of proxied requests, and it cannot exceed the worker's
# open-file limit. Without this directive the worker keeps the limit it
# inherited from its environment, and the 65535 below cannot be reached.
# NOTE(review): setrlimit can only raise the soft limit up to the container's
# hard limit -- confirm the pod's nofile hard limit is at least this value.
worker_rlimit_nofile 131072;

events {
    worker_connections 65535;
}
# ====== global configuration ======
# Runtime knobs for the mock server; main() overwrites them from CLI flags.
CONFIG = {
    "models": ["mock-gpt-4"],
    "min_delay": 0.1,
    "max_delay": 1.0,
    "min_tokens": 5,
    "max_tokens": 50,
}


# ====== utils ======
def random_text(n: int) -> str:
    """Return ``n`` random ASCII letters/digits (not cryptographically secure)."""
    return "".join(random.choices(string.ascii_letters + string.digits, k=n))


def random_delay() -> float:
    """Return a random latency in seconds drawn from the configured delay range."""
    return random.uniform(CONFIG["min_delay"], CONFIG["max_delay"])


def validate_model(model: str) -> str:
    """Return ``model`` if it is configured, otherwise the first configured model."""
    if model in CONFIG["models"]:
        return model
    return CONFIG["models"][0]  # unknown names silently fall back


# ====== response ======
def build_chat_response(model: str, content: str) -> dict:
    """Build a non-streaming ``chat.completion`` payload.

    The usage figures are synthetic: the completion size is the character
    count of ``content`` and the prompt size is random. They are computed once
    so that ``total_tokens == prompt_tokens + completion_tokens`` always holds.

    Args:
        model: Model name echoed back in the payload.
        content: Assistant message text.

    Returns:
        A JSON-serialisable dict shaped like a chat completion response.
    """
    prompt_tokens = random.randint(5, 20)
    completion_tokens = len(content)
    return {
        "id": f"chatcmpl-{random_text(12)}",
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model,
        "choices": [
            {
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": content,
                },
                "finish_reason": "stop",
            }
        ],
        "usage": {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_tokens + completion_tokens,
        },
    }


async def stream_response(
    model: str,
    full_text: str,
    *,
    chunk_size: int = 5,
    chunk_delay: float = 0.05,
):
    """Yield ``full_text`` as server-sent ``chat.completion.chunk`` events.

    Every chunk of one completion carries the same ``id`` and ``created``
    value, so a client can correlate the chunks of a single response.

    Args:
        model: Model name echoed back in each chunk.
        full_text: Text to split into chunks.
        chunk_size: Characters per chunk; must be at least 1.
        chunk_delay: Seconds to sleep after each chunk.

    Yields:
        ``data: <json>`` event strings followed by a final ``data: [DONE]``.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be >= 1")

    # Generated once per completion, not once per chunk.
    completion_id = f"chatcmpl-{random_text(12)}"
    created = int(time.time())

    for start in range(0, len(full_text), chunk_size):
        data = {
            "id": completion_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": model,
            "choices": [
                {
                    "delta": {"content": full_text[start : start + chunk_size]},
                    "index": 0,
                    "finish_reason": None,
                }
            ],
        }

        yield f"data: {json.dumps(data)}\n\n"
        await asyncio.sleep(chunk_delay)

    yield "data: [DONE]\n\n"
# Counters shared by every worker coroutine; updated while holding LOCK.
FAILED_COUNT = 0
TOTAL_COUNT = 0
LOCK = asyncio.Lock()


async def log_failure(file, record):
    """Count one failed request and append ``record`` to the failure log.

    Increments both FAILED_COUNT and TOTAL_COUNT, adds the running totals and
    failure percentage to ``record`` (mutating it), and writes it as one JSON
    line, flushing immediately.

    Args:
        file: Writable text file object.
        record: JSON-serialisable dict describing the failure.
    """
    global FAILED_COUNT, TOTAL_COUNT
    async with LOCK:
        FAILED_COUNT += 1
        TOTAL_COUNT += 1
        record["failure_pct"] = round(FAILED_COUNT / TOTAL_COUNT * 100, 2)
        record["failed_count"] = FAILED_COUNT
        record["total_count"] = TOTAL_COUNT
        file.write(json.dumps(record) + "\n")
        file.flush()


async def worker(session, url, model, file):
    """Send one chat-completion request and record the outcome.

    A non-200 status or any exception is logged through ``log_failure``;
    a 200 response only bumps TOTAL_COUNT.

    Args:
        session: Client session exposing ``post(url, json=...)`` as an async
            context manager.
        url: Endpoint to POST to.
        model: Model name placed in the request payload.
        file: Writable text file object for failure records.
    """
    global TOTAL_COUNT

    payload = {
        "model": model,
        "messages": [{"role": "user", "content": "hello"}],
    }

    try:
        async with session.post(url, json=payload) as resp:
            status = resp.status

            if status != 200:
                text = await resp.text()
                record = {
                    "ts": time.time(),
                    "model": model,
                    "status": status,
                    "body": text[:200],  # keep the log line bounded
                }
                await log_failure(file, record)  # increments FAILED_COUNT + TOTAL_COUNT
            else:
                await resp.text()  # drain the body before the response closes
                async with LOCK:
                    TOTAL_COUNT += 1

    except Exception as e:  # boundary of one request: record it and keep the run going
        record = {
            "ts": time.time(),
            "model": model,
            "status": "EXCEPTION",
            # Some exceptions (e.g. asyncio.TimeoutError) have an empty str();
            # fall back to repr and always record the type so timeouts are
            # distinguishable in the log.
            "error": str(e) or repr(e),
            "error_type": type(e).__name__,
        }
        await log_failure(file, record)  # increments FAILED_COUNT + TOTAL_COUNT


async def run_for_model(model, args, file, headers):
    """Issue ``args.requests`` requests for one model in batches.

    Requests are sent in batches of ``args.concurrency``; each batch is awaited
    in full before the next one starts.

    Args:
        model: Model name to send in every request.
        args: Parsed CLI namespace (uses ``concurrency``, ``timeout``,
            ``requests`` and ``url``).
        file: Writable text file object for failure records.
        headers: Default headers for the client session.

    Returns:
        A dict with ``model``, ``requests``, ``start`` and ``duration``.
    """
    connector = aiohttp.TCPConnector(
        limit=args.concurrency,
        force_close=True,
    )
    timeout = aiohttp.ClientTimeout(total=args.timeout)

    stats = {
        "model": model,
        "requests": 0,
        "start": time.time(),
    }

    # A non-positive concurrency degrades to one request per batch.
    batch_size = max(1, args.concurrency)

    async with aiohttp.ClientSession(
        connector=connector,
        timeout=timeout,
        headers=headers,
    ) as session:
        remaining = args.requests
        while remaining > 0:
            count = min(batch_size, remaining)
            await asyncio.gather(
                *(worker(session, args.url, model, file) for _ in range(count))
            )
            stats["requests"] += count
            remaining -= count

    stats["duration"] = time.time() - stats["start"]
    return stats


def _rate(count, seconds):
    """Return ``count / seconds``, or 0.0 when no measurable time elapsed."""
    return count / seconds if seconds > 0 else 0.0


async def main_async(args):
    """Run the load test for every model concurrently and print statistics.

    Args:
        args: Parsed CLI namespace produced by ``main``.
    """
    # ====== API key ======
    api_key = args.api_key or os.getenv("OPENAI_API_KEY")
    headers = {}

    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"

    models = args.models

    # ====== open the failure log ======
    with open(args.failed_log, "w", encoding="utf-8") as f:
        # Each model runs its own batches concurrently with the others.
        results = await asyncio.gather(
            *[run_for_model(m, args, f, headers) for m in models]
        )

    # ====== print statistics ======
    print("\n=== PER MODEL STATS ===")
    total_requests = 0
    total_time = 0

    for r in results:
        rps = _rate(r["requests"], r["duration"])
        print(f"Model: {r['model']}")
        print(f" Requests: {r['requests']}")
        print(f" Time: {r['duration']:.2f}s")
        print(f" RPS: {rps:.2f}")
        print()

        total_requests += r["requests"]
        # Models run in parallel, so wall time is the slowest model's duration.
        total_time = max(total_time, r["duration"])

    print("=== OVERALL ===")
    print(f"Total Requests: {total_requests}")
    print(f"Total Time: {total_time:.2f}s")
    print(f"Effective RPS: {_rate(total_requests, total_time):.2f}")
    print(f"Failures: {FAILED_COUNT}")
    print(f"Failed log file: {args.failed_log}")


def main():
    """Parse command-line options and run the load test."""
    parser = argparse.ArgumentParser()

    # NOTE(review): the default URL has an empty host; it looks like a
    # placeholder, so --url presumably must always be supplied -- confirm.
    parser.add_argument(
        "--url",
        default="https:///model-proxy/v1/chat/completions",
    )
    parser.add_argument("--requests", type=int, default=600)
    parser.add_argument("--concurrency", type=int, default=50)
    parser.add_argument("--timeout", type=float, default=500)

    # argparse runs string defaults through `type`, so the default below is
    # split into a list exactly like a value given on the command line.
    parser.add_argument(
        "--models",
        type=lambda s: s.split(","),
        default="gpt-3,gpt-4,gpt-5,gpt-6,gpt-7,gpt-8,gpt-9",
    )

    parser.add_argument(
        "--failed-log",
        type=str,
        default="failed_requests.jsonl",
    )

    parser.add_argument(
        "--api-key",
        type=str,
        default=None,
        help="API key or use OPENAI_API_KEY env",
    )

    args = parser.parse_args()

    asyncio.run(main_async(args))


if __name__ == "__main__":
    main()