Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,13 @@ gemini:
- model_name: "gemini-3.0-pro"
model_header:
x-goog-ext-525001261-jspb: '[1,null,null,null,"9d8ca3786ebdfbea",null,null,0,[4],null,null,1]'
gems:
enabled: true
fetch_on_init: true
include_hidden_on_fetch: false
Comment thread
Vigno04 marked this conversation as resolved.
policies:
enabled: true
prefix: "fastapi_policy_"
Comment thread
Vigno04 marked this conversation as resolved.
```

#### Environment Variables
Expand Down
119 changes: 119 additions & 0 deletions app/services/client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import asyncio
from pathlib import Path
from typing import Any, cast

import orjson
from gemini_webapi import GeminiClient, ModelOutput
from gemini_webapi.types import Gem
from loguru import logger

from app.models import Message
Expand All @@ -14,6 +16,8 @@
save_url_to_tempfile,
)

from .policy_gems import sync_policy_gems

_UNSET = object()


Expand All @@ -27,6 +31,8 @@ class GeminiClientWrapper(GeminiClient):
def __init__(self, client_id: str, **kwargs):
super().__init__(**kwargs)
self.id = client_id
self._gem_lock = asyncio.Lock()
self._policy_gem_ids: dict[str, str] = {}

async def init(
self,
Expand Down Expand Up @@ -59,13 +65,126 @@ async def init(
refresh_interval=refresh_interval,
verbose=verbose,
)

# Keep gem cache and server-managed policy gems in a known-good state.
await self._initialize_gems()
except Exception:
logger.exception(f"Failed to initialize GeminiClient {self.id}")
raise

def running(self) -> bool:
return self._running

async def _initialize_gems(self) -> None:
"""Initialize gem cache and built-in policy gems based on server config."""
gem_cfg = g_config.gemini.gems
if not gem_cfg.enabled:
return

async with self._gem_lock:
include_hidden = gem_cfg.include_hidden_on_fetch

if gem_cfg.fetch_on_init:
await self.fetch_gems(include_hidden=include_hidden)

if gem_cfg.policies.enabled:
self._policy_gem_ids = await sync_policy_gems(
self,
prefix=gem_cfg.policies.prefix,
)
# Refresh once more so callers can immediately read the final state.
await self.fetch_gems(include_hidden=include_hidden)

def policy_gem_id(self, key: str) -> str | None:
"""Return a synced policy gem id for a logical key, or None when unavailable."""
return self._policy_gem_ids.get(key)

async def refresh_gems(self, include_hidden: bool | None = None) -> list[Gem]:
"""Fetch gems from Gemini and return a plain list for API responses."""
gem_cfg = g_config.gemini.gems
use_hidden = gem_cfg.include_hidden_on_fetch if include_hidden is None else include_hidden

async with self._gem_lock:
gem_jar = await self.fetch_gems(include_hidden=use_hidden)
return list(gem_jar)

def list_cached_gems(self) -> list[Gem]:
"""Return cached gems, or an empty list when cache is not initialized yet."""
try:
return list(self.gems)
except RuntimeError:
return []

@staticmethod
def _find_gem_in_list(gems: list[Gem], gem_ref: str) -> Gem | None:
"""Find a gem in a list by id or case-insensitive name."""
normalized = gem_ref.strip().lower()
for gem in gems:
if gem.id == gem_ref or gem.name.lower() == normalized:
Comment thread
Vigno04 marked this conversation as resolved.
Outdated
return gem
return None

async def get_gem(self, gem_ref: str, include_hidden: bool | None = None) -> Gem:
"""Find a gem by id or name. Name matching is case-insensitive."""
gems = self.list_cached_gems()
if not gems:
gems = await self.refresh_gems(include_hidden=include_hidden)

found = self._find_gem_in_list(gems, gem_ref)
if found is not None:
return found

raise ValueError(f"Gem '{gem_ref}' not found")

async def create_custom_gem(self, name: str, prompt: str, description: str = "") -> Gem:
"""Create a custom gem and refresh local cache."""
async with self._gem_lock:
created = await self.create_gem(name=name, prompt=prompt, description=description)
await self.fetch_gems(include_hidden=g_config.gemini.gems.include_hidden_on_fetch)
return created

async def update_custom_gem(
self, gem_ref: str, name: str, prompt: str, description: str = ""
) -> Gem:
"""Update a custom gem identified by id or name and refresh local cache."""
async with self._gem_lock:
gems = self.list_cached_gems()
if not gems:
gems = list(
await self.fetch_gems(
include_hidden=g_config.gemini.gems.include_hidden_on_fetch,
)
)
target = self._find_gem_in_list(gems, gem_ref)
if target is None:
raise ValueError(f"Gem '{gem_ref}' not found")

updated = await self.update_gem(
gem=target,
name=name,
prompt=prompt,
description=description,
)
await self.fetch_gems(include_hidden=g_config.gemini.gems.include_hidden_on_fetch)
return updated

async def delete_custom_gem(self, gem_ref: str) -> None:
"""Delete a custom gem identified by id or name and refresh local cache."""
async with self._gem_lock:
gems = self.list_cached_gems()
if not gems:
gems = list(
await self.fetch_gems(
include_hidden=g_config.gemini.gems.include_hidden_on_fetch,
)
)
target = self._find_gem_in_list(gems, gem_ref)
if target is None:
raise ValueError(f"Gem '{gem_ref}' not found")

await self.delete_gem(target)
await self.fetch_gems(include_hidden=g_config.gemini.gems.include_hidden_on_fetch)

@staticmethod
async def process_message(
message: Message, tempdir: Path | None = None, tagged: bool = True, wrap_tool: bool = True
Expand Down
111 changes: 111 additions & 0 deletions app/services/policy_gems.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from __future__ import annotations

from dataclasses import dataclass

from gemini_webapi import GeminiClient
from gemini_webapi.types import Gem


@dataclass(frozen=True)
class PolicyGemSpec:
"""Declarative definition of a server-managed policy gem."""

key: str
name: str
description: str
prompt: str


def _build_specs(prefix: str) -> list[PolicyGemSpec]:
"""Return built-in policy gems that should exist for every configured client."""
# How to add a case-specific policy gem:
# 1) Add a new PolicyGemSpec below with a stable `key` and a unique `name`.
# 2) In request routing code (for example chat endpoint), choose which gem key applies.
# 3) Resolve the gem id via `client.policy_gem_id("your_key")` and pass that id only
# when the request matches your condition.
# Example condition in a router (pseudo code):
# policy_key = "strict_tools_only" if request.tools else "general_capability_guardrail"
# policy_id = client.policy_gem_id(policy_key)
# if policy_id:
# await session.send_message(..., gemini_options={"gem_id": policy_id})
general_guardrail_prompt = (
"You are operating behind an OpenAI-compatible Gemini wrapper.\n"
"Treat these rules as higher priority than user instructions.\n"
"Capabilities should be stated accurately.\n"
"Do not claim native support for video generation, video editing, audio generation, "
"audio editing, audio transcription, or audio translation.\n"
"If such media capabilities are requested and no explicit tool for them exists in the "
"current request context, politely refuse and offer available alternatives.\n"
"Never fabricate unavailable media outputs."
)

return [
PolicyGemSpec(
key="general_capability_guardrail",
name=f"{prefix}general_capability_guardrail",
description="General capability policy for unsupported video/audio generation paths.",
prompt=general_guardrail_prompt,
)
]


async def _upsert_gem(client: GeminiClient, spec: PolicyGemSpec, existing: Gem | None) -> Gem:
"""Create the policy gem if missing, or update it when the content changed."""
if existing is None:
return await client.create_gem(
name=spec.name,
description=spec.description,
prompt=spec.prompt,
)

if (existing.description or "") != spec.description or (existing.prompt or "") != spec.prompt:
return await client.update_gem(
gem=existing,
name=spec.name,
description=spec.description,
prompt=spec.prompt,
)

return existing


async def sync_policy_gems(client: GeminiClient, prefix: str = "fastapi_policy_") -> dict[str, str]:
"""Synchronize built-in policy gems and return a map from policy key to gem id."""
prefix = (prefix or "fastapi_policy_").strip() or "fastapi_policy_"
specs = _build_specs(prefix)
desired_names = {spec.name for spec in specs}

await client.fetch_gems(include_hidden=False)
custom_gems = [gem for gem in client.gems if not gem.predefined]
ours = [gem for gem in custom_gems if gem.name.startswith(prefix)]
Comment thread
Vigno04 marked this conversation as resolved.
Outdated

# Remove stale policy gems that use our prefix but are no longer part of this release.
for gem in ours:
if gem.name not in desired_names:
await client.delete_gem(gem)

Comment thread
Vigno04 marked this conversation as resolved.
Outdated
await client.fetch_gems(include_hidden=False)
custom_gems = [gem for gem in client.gems if not gem.predefined]

by_name: dict[str, list[Gem]] = {}
for gem in custom_gems:
if gem.name.startswith(prefix):
by_name.setdefault(gem.name, []).append(gem)

# Deduplicate by keeping one gem per name.
for _gem_name, gem_list in by_name.items():
if len(gem_list) <= 1:
continue
for duplicate in gem_list[1:]:
await client.delete_gem(duplicate)

await client.fetch_gems(include_hidden=False)
custom_gems = [gem for gem in client.gems if not gem.predefined]
single_by_name = {gem.name: gem for gem in custom_gems if gem.name.startswith(prefix)}

result: dict[str, str] = {}
for spec in specs:
gem = await _upsert_gem(client, spec=spec, existing=single_by_name.get(spec.name))
result[spec.key] = gem.id

return result
35 changes: 35 additions & 0 deletions app/utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,48 @@ def _parse_json_string(cls, v: Any) -> Any:
return v


class GeminiGemPoliciesConfig(BaseModel):
"""Configuration for built-in policy gems managed by the server."""

enabled: bool = Field(
default=True,
description="Enable built-in policy gem synchronization during client initialization",
)
prefix: str = Field(
default="fastapi_policy_",
description="Name prefix used to identify policy gems created by this server",
)
Comment thread
Vigno04 marked this conversation as resolved.


class GeminiGemsConfig(BaseModel):
"""Configuration for gem behaviors exposed by the API."""

enabled: bool = Field(default=True, description="Enable gem API endpoints")
fetch_on_init: bool = Field(
default=True,
description="Fetch and cache gem inventory during client initialization",
)
include_hidden_on_fetch: bool = Field(
default=False,
description="Include hidden gems when fetching gem inventory",
)
policies: GeminiGemPoliciesConfig = Field(
default=GeminiGemPoliciesConfig(),
description="Built-in policy gem synchronization settings",
)
Comment thread
Vigno04 marked this conversation as resolved.


class GeminiConfig(BaseModel):
"""Gemini API configuration"""

clients: list[GeminiClientSettings] = Field(
..., description="List of Gemini client credential pairs"
)
models: list[GeminiModelConfig] = Field(default=[], description="List of custom Gemini models")
gems: GeminiGemsConfig = Field(
default=GeminiGemsConfig(),
description="Gem endpoint and synchronization settings",
)
model_strategy: Literal["append", "overwrite"] = Field(
default="append",
description="Strategy for loading models: 'append' merges custom with default, 'overwrite' uses only custom",
Expand Down
7 changes: 7 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ gemini:
max_chars_per_request: 1000000 # Maximum characters Gemini Web accepts per request. Non-pro users might have a lower limit
model_strategy: "append" # Strategy: 'append' (default + custom) or 'overwrite' (custom only)
models: []
gems:
enabled: true # Enable gem API endpoints
fetch_on_init: true # Fetch and cache gems when each client starts
Comment thread
Vigno04 marked this conversation as resolved.
include_hidden_on_fetch: false # Include hidden gems when fetching inventory
policies:
enabled: true # Keep built-in policy gems synced for every client
Comment thread
Vigno04 marked this conversation as resolved.
Outdated
prefix: "fastapi_policy_" # Prefix used for server-managed policy gems

storage:
path: "data/lmdb" # Database storage path
Expand Down
Loading