diff --git a/collectivus-plugin-kernel-types.d.ts b/collectivus-plugin-kernel-types.d.ts index 02a2c77..69aebad 100644 --- a/collectivus-plugin-kernel-types.d.ts +++ b/collectivus-plugin-kernel-types.d.ts @@ -44,6 +44,13 @@ export type PluginName = string * The capability VALUE is a `TableFormatProvider`. * - `hypaware.http-endpoint` — request destination capability, provided * by request sinks (`@hypaware/central`, future `@hypaware/webhook`). + * - `hypaware.embedder` — text embedding production, provided by + * embedder plugins (`@hypaware/embedder-openai`, future local + * embedders). The capability VALUE is an `EmbedderCapability`. + * Consumed by `@hypaware/vector-search`. + * - `hypaware.vector-search` — vector similarity search over cached + * datasets, provided by `@hypaware/vector-search`. The capability + * VALUE is a `VectorSearchCapability`. * * Plugins are free to define new capability names; the kernel does not * gate registration on an enum. @@ -1364,6 +1371,124 @@ export interface AiGatewayProjectedMessage { stop_reason?: string } +// ============================================================================= +// Embedder capability (`hypaware.embedder@1.0.0`) +// ============================================================================= + +/** + * Text-embedding production, provided by embedder plugins (the first is + * `@hypaware/embedder-openai`; a local embedder is the intended + * follow-up). Consumers (`@hypaware/vector-search`) require this + * capability rather than binding to a specific provider, so swapping + * embedders is a config change, not a refactor. + * + * The same `model` MUST be used at index-build and query time; index + * consumers store `model` and `dimension` alongside their artifacts and + * treat a mismatch as staleness (rebuild) or a hard error, never a + * silent degraded result. + */ +export interface EmbedderCapability { + /** Stable provider identifier (e.g. `"openai-compatible"`). */ + provider: string + /** Model identifier sent with every request (e.g. `"text-embedding-3-small"`). */ + model: string + /** + * Requested output dimension, when the provider is configured to + * shorten vectors (e.g. OpenAI v3 models' `dimensions` parameter). + * Index consumers treat drift between this and a stored artifact's + * dimension as staleness, exactly like a model change. + */ + dimensions?: number + /** + * Embed a batch of texts. Returns one vector per input text, in input + * order. Implementations chunk into provider-sized requests + * internally; callers hand over whatever batch they have. + */ + embed(texts: string[], opts?: EmbedOptions): Promise +} + +export interface EmbedOptions { + signal?: AbortSignal +} + +export interface EmbedResult { + /** One vector per input text, aligned with the input order. */ + vectors: Float32Array[] + /** Vector length; identical across the batch. */ + dimension: number + /** Model that actually produced the vectors. */ + model: string + usage?: EmbedUsage +} + +export interface EmbedUsage { + prompt_tokens?: number + total_tokens?: number +} + +// ============================================================================= +// Vector search capability (`hypaware.vector-search@1.0.0`) +// ============================================================================= + +/** + * Vector similarity search over cached datasets, provided by + * `@hypaware/vector-search`. Indexes are declared in config; artifacts + * are per-host plugin state sharded one file per cache partition. + */ +export interface VectorSearchCapability { + /** Embed the query text and return the merged top-K across shards. */ + search(opts: VectorSearchOptions): Promise + /** Per-index, per-partition shard coverage and staleness. */ + status(): Promise +} + +export interface VectorSearchOptions { + query: string + /** Restrict to one configured index (default: all indexes). */ + index?: string + /** Restrict to indexes over one dataset (default: all datasets). */ + dataset?: string + topK?: number + /** + * `auto` (default) refreshes missing/stale shards before searching + * (declaration, model, and dimension drift all classify stale); + * `never` searches existing shards only and hard-errors on an + * embedder model or dimension mismatch. + */ + refresh?: 'auto' | 'never' + signal?: AbortSignal +} + +export interface VectorSearchHit { + index: string + dataset: string + partition: Record + /** Shard row id (content hash by default, or the index's `id_column` value). */ + id: string + /** Similarity score; higher is better (cosine over normalized vectors). */ + score: number + /** Source column text for the hit, when resolvable from the cache. */ + text?: string +} + +export interface VectorIndexStatus { + index: string + dataset: string + column: string + model: string + shards: VectorShardStatus[] +} + +export interface VectorShardStatus { + partition: Record + state: 'fresh' | 'stale_rows' | 'stale_model' | 'stale_dimension' | 'stale_config' | 'missing' | 'orphan' + /** Embedded (deduplicated) vector count in the shard, when built. */ + rows?: number + model?: string + dimension?: number + built_at?: string +} + // ============================================================================= // Skills and init presets // ============================================================================= diff --git a/hypaware-core/plugins-workspace/embedder-openai/hypaware.plugin.json b/hypaware-core/plugins-workspace/embedder-openai/hypaware.plugin.json new file mode 100644 index 0000000..df78d84 --- /dev/null +++ b/hypaware-core/plugins-workspace/embedder-openai/hypaware.plugin.json @@ -0,0 +1,19 @@ +{ + "schema_version": 1, + "name": "@hypaware/embedder-openai", + "version": "1.0.0", + "hypaware_api": "^1.0.0", + "runtime": "node", + "node_engine": ">=20", + "entrypoint": "./src/index.js", + "description": "OpenAI-compatible text embedder for HypAware. Speaks POST /v1/embeddings against a configurable base_url, so one plugin covers OpenAI, compatible proxies, and local servers (Ollama, LM Studio). Provides hypaware.embedder for @hypaware/vector-search.", + "permissions": ["network", "read_env"], + "provides": { + "capabilities": { + "hypaware.embedder": "1.0.0" + } + }, + "contributes": { + "config_sections": [{ "section": "embedder-openai" }] + } +} diff --git a/hypaware-core/plugins-workspace/embedder-openai/src/client.js b/hypaware-core/plugins-workspace/embedder-openai/src/client.js new file mode 100644 index 0000000..e4c9e5d --- /dev/null +++ b/hypaware-core/plugins-workspace/embedder-openai/src/client.js @@ -0,0 +1,244 @@ +// @ts-check + +import { Attr, withSpan } from '../../../../src/core/observability/index.js' +import { embeddingsEndpoint } from './config.js' + +/** + * @import { EmbedderCapability, EmbedResult, HypError } from '../../../../collectivus-plugin-kernel-types.d.ts' + * @import { CreateEmbedderOptions, EmbedderOpenAiConfig, FetchLike } from './types.d.ts' + */ + +const PLUGIN_NAME = '@hypaware/embedder-openai' + +/** + * Build the `hypaware.embedder` capability value: an HTTP client for + * the OpenAI-compatible `POST /v1/embeddings` shape. + * + * Batches larger than `max_batch` are chunked into sequential requests; + * the returned vectors are always aligned with the input order. There + * is deliberately no retry layer — callers (the vector-search refresh + * tick, an interactive search) already re-run on their own cadence, so + * a failed request surfaces immediately instead of stalling a tick. + * + * @param {CreateEmbedderOptions} opts + * @returns {EmbedderCapability} + */ +export function createOpenAiEmbedder(opts) { + const { config, env, log } = opts + const fetchImpl = opts.fetchImpl ?? /** @type {FetchLike} */ (/** @type {unknown} */ (globalThis.fetch)) + const endpoint = embeddingsEndpoint(config.base_url) + + return { + provider: 'openai-compatible', + model: config.model, + ...(config.dimensions !== undefined ? { dimensions: config.dimensions } : {}), + + async embed(texts, embedOpts) { + if (!Array.isArray(texts) || texts.length === 0) { + throw newEmbedderError('embedder_empty_input', 'embed() requires a non-empty array of texts') + } + return withSpan( + 'embedder.embed', + { + [Attr.COMPONENT]: 'embedder', + [Attr.OPERATION]: 'embedder.embed', + [Attr.PLUGIN]: PLUGIN_NAME, + embed_model: config.model, + text_count: texts.length, + status: 'ok', + }, + async (span) => { + /** @type {Float32Array[]} */ + const vectors = [] + let dimension = 0 + let promptTokens = 0 + let totalTokens = 0 + let requestCount = 0 + + for (let i = 0; i < texts.length; i += config.max_batch) { + const batch = texts.slice(i, i + config.max_batch) + const result = await requestEmbeddings({ + batch, + config, + env, + endpoint, + fetchImpl, + signal: embedOpts?.signal, + }) + requestCount++ + for (const v of result.vectors) { + if (dimension === 0) dimension = v.length + else if (v.length !== dimension) { + throw newEmbedderError( + 'embedder_dimension_mismatch', + `embedding dimension changed mid-batch (${dimension} -> ${v.length}) for model '${config.model}'` + ) + } + vectors.push(v) + } + promptTokens += result.usage?.prompt_tokens ?? 0 + totalTokens += result.usage?.total_tokens ?? 0 + } + + span.setAttribute('request_count', requestCount) + span.setAttribute('dimension', dimension) + span.setAttribute('prompt_tokens', promptTokens) + + /** @type {EmbedResult} */ + const out = { + vectors, + dimension, + model: config.model, + usage: { prompt_tokens: promptTokens, total_tokens: totalTokens }, + } + return out + }, + { component: 'embedder' } + ).catch((/** @type {unknown} */ err) => { + const errorKind = /** @type {HypError} */ (err)?.hypErrorKind ?? 'embedder_failed' + // Text content and key material never reach logs — counts only. + log.error('embedder.embed_failed', { + [Attr.ERROR_KIND]: errorKind, + embed_model: config.model, + text_count: texts.length, + message: err instanceof Error ? err.message : String(err), + }) + throw err + }) + }, + } +} + +/** + * One `POST /v1/embeddings` request. The API key resolves from the + * environment at call time and is used only for the Authorization + * header — it is never logged, never thrown in a message, and never + * stored on the embedder. When the configured env var is unset the + * request goes out without an Authorization header so localhost + * servers (Ollama, LM Studio) work with zero credential config; a 401 + * from a real provider then names the env var to set. + * + * @param {{ + * batch: string[], + * config: EmbedderOpenAiConfig, + * env: NodeJS.ProcessEnv, + * endpoint: string, + * fetchImpl: FetchLike, + * signal: AbortSignal | undefined, + * }} args + * @returns {Promise<{ vectors: Float32Array[], usage?: { prompt_tokens?: number, total_tokens?: number } }>} + * @ref LLP 0024#embedder-speaks-openai-compatible-base_url-configurable [implements] — config names the env var; the key resolves at call time and never reaches logs + */ +async function requestEmbeddings({ batch, config, env, endpoint, fetchImpl, signal }) { + const apiKey = env[config.api_key_env] + /** @type {Record} */ + const headers = { 'content-type': 'application/json' } + if (apiKey) headers.authorization = `Bearer ${apiKey}` + + const timeoutSignal = AbortSignal.timeout(config.timeout_ms) + const requestSignal = signal ? AbortSignal.any([signal, timeoutSignal]) : timeoutSignal + + /** @type {Record} */ + const body = { + model: config.model, + input: batch, + encoding_format: 'float', + } + if (config.dimensions !== undefined) body.dimensions = config.dimensions + + /** @type {Awaited>} */ + let response + try { + response = await fetchImpl(endpoint, { + method: 'POST', + headers, + body: JSON.stringify(body), + signal: requestSignal, + }) + } catch (err) { + if (timeoutSignal.aborted) { + throw newEmbedderError( + 'embedder_timeout', + `embeddings request to ${endpoint} timed out after ${config.timeout_ms}ms` + ) + } + const message = err instanceof Error ? err.message : String(err) + throw newEmbedderError('embedder_request_failed', `embeddings request to ${endpoint} failed: ${message}`) + } + + if (!response.ok) { + // The response body is deliberately not read into the error: error + // messages get logged, and a provider or proxy may echo the input + // texts (captured content) or credential material back in its error + // detail. Status, endpoint, and error kind are enough to diagnose; + // every value here comes from config, never from the provider. + const hint = + (response.status === 401 || response.status === 403) && !apiKey + ? ` (no API key sent: env var ${config.api_key_env} is unset)` + : '' + const err = newEmbedderError( + `embedder_http_${response.status}`, + `embeddings request to ${endpoint} failed with HTTP ${response.status}${hint}` + ) + err.status = response.status + throw err + } + + /** @type {unknown} */ + let payload + try { + payload = await response.json() + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + throw newEmbedderError('embedder_bad_response', `embeddings response from ${endpoint} is not JSON: ${message}`) + } + + return parseEmbeddingsPayload(payload, batch.length, endpoint) +} + +/** + * Validate and convert the response payload. `data[]` entries carry an + * `index` field; sort by it rather than trusting array order, per the + * OpenAI contract. + * + * @param {unknown} payload + * @param {number} expectedCount + * @param {string} endpoint + * @returns {{ vectors: Float32Array[], usage?: { prompt_tokens?: number, total_tokens?: number } }} + */ +export function parseEmbeddingsPayload(payload, expectedCount, endpoint) { + const data = /** @type {{ data?: unknown, usage?: { prompt_tokens?: number, total_tokens?: number } }} */ (payload ?? {}) + if (!Array.isArray(data.data) || data.data.length !== expectedCount) { + throw newEmbedderError( + 'embedder_bad_response', + `embeddings response from ${endpoint} returned ${Array.isArray(data.data) ? data.data.length : 'no'} embeddings for ${expectedCount} inputs` + ) + } + /** @type {Float32Array[]} */ + const vectors = new Array(expectedCount) + for (const entry of data.data) { + const item = /** @type {{ index?: unknown, embedding?: unknown }} */ (entry ?? {}) + const index = typeof item.index === 'number' ? item.index : -1 + if (index < 0 || index >= expectedCount || !Array.isArray(item.embedding) || item.embedding.length === 0) { + throw newEmbedderError('embedder_bad_response', `embeddings response from ${endpoint} has a malformed data entry`) + } + vectors[index] = Float32Array.from(/** @type {number[]} */ (item.embedding)) + } + for (let i = 0; i < expectedCount; i++) { + if (!vectors[i]) { + throw newEmbedderError('embedder_bad_response', `embeddings response from ${endpoint} is missing index ${i}`) + } + } + return { vectors, usage: data.usage } +} + +/** + * @param {string} kind + * @param {string} message + * @returns {HypError} + */ +function newEmbedderError(kind, message) { + const err = /** @type {HypError} */ (new Error(message)) + err.hypErrorKind = kind + return err +} diff --git a/hypaware-core/plugins-workspace/embedder-openai/src/config.js b/hypaware-core/plugins-workspace/embedder-openai/src/config.js new file mode 100644 index 0000000..1caa098 --- /dev/null +++ b/hypaware-core/plugins-workspace/embedder-openai/src/config.js @@ -0,0 +1,130 @@ +// @ts-check + +/** + * Config validation for `@hypaware/embedder-openai`. Mirrors the + * `@hypaware/s3` validator shape: pure, dependency-free, returns a + * normalized config or a list of `embedder_config_invalid` errors so it + * is callable from tests without spinning up observability. + */ + +/** + * @import { EmbedderConfigError, EmbedderConfigResult } from './types.d.ts' + */ + +// @ref LLP 0024#embedder-speaks-openai-compatible-base_url-configurable [implements] — defaults cover OpenAI; base_url override covers proxies and localhost servers +export const DEFAULT_BASE_URL = 'https://api.openai.com' +export const DEFAULT_MODEL = 'text-embedding-3-small' +export const DEFAULT_API_KEY_ENV = 'OPENAI_API_KEY' +export const DEFAULT_MAX_BATCH = 128 +export const DEFAULT_TIMEOUT_MS = 30_000 + +/** + * Validate the plugin's config slice. Every field is optional — the + * zero-config default targets OpenAI with `OPENAI_API_KEY`. + * + * @param {unknown} value + * @returns {EmbedderConfigResult} + */ +export function validateEmbedderConfig(value) { + /** @type {EmbedderConfigError[]} */ + const errors = [] + + if (value !== undefined && (value === null || typeof value !== 'object' || Array.isArray(value))) { + errors.push(invalid('', 'embedder-openai config must be an object')) + return { ok: false, errors } + } + + const raw = /** @type {Record} */ (value ?? {}) + + const baseUrl = readString(raw, 'base_url', errors) ?? DEFAULT_BASE_URL + const model = readString(raw, 'model', errors) ?? DEFAULT_MODEL + const apiKeyEnv = readString(raw, 'api_key_env', errors) ?? DEFAULT_API_KEY_ENV + const dimensions = readPositiveInt(raw, 'dimensions', errors) + const maxBatch = readPositiveInt(raw, 'max_batch', errors) ?? DEFAULT_MAX_BATCH + const timeoutMs = readPositiveInt(raw, 'timeout_ms', errors) ?? DEFAULT_TIMEOUT_MS + + if (raw.base_url !== undefined && typeof raw.base_url === 'string') { + try { + const u = new URL(/** @type {string} */ (raw.base_url)) + if (u.protocol !== 'http:' && u.protocol !== 'https:') { + errors.push(invalid('/base_url', `base_url must be http(s); got '${u.protocol}'`)) + } + } catch { + errors.push(invalid('/base_url', `base_url is not a valid URL: '${raw.base_url}'`)) + } + } + + if (errors.length > 0) return { ok: false, errors } + + return { + ok: true, + config: { + base_url: stripTrailingSlash(baseUrl), + model, + api_key_env: apiKeyEnv, + ...(dimensions !== undefined ? { dimensions } : {}), + max_batch: maxBatch, + timeout_ms: timeoutMs, + }, + } +} + +/** + * Endpoint for a normalized base_url. Accepts both bare origins + * (`https://api.openai.com`) and `/v1`-suffixed bases + * (`http://localhost:11434/v1`, the form Ollama documents) so users can + * paste either without a double `/v1/v1`. + * + * @param {string} baseUrl + * @returns {string} + */ +export function embeddingsEndpoint(baseUrl) { + const base = stripTrailingSlash(baseUrl) + return base.endsWith('/v1') ? `${base}/embeddings` : `${base}/v1/embeddings` +} + +/** @param {string} url */ +function stripTrailingSlash(url) { + return url.replace(/\/+$/, '') +} + +/** + * @param {string} pointer + * @param {string} message + * @returns {EmbedderConfigError} + */ +function invalid(pointer, message) { + return { pointer, message, errorKind: 'embedder_config_invalid' } +} + +/** + * @param {Record} raw + * @param {string} key + * @param {EmbedderConfigError[]} errors + * @returns {string | undefined} + */ +function readString(raw, key, errors) { + const v = raw[key] + if (v === undefined) return undefined + if (typeof v !== 'string' || v.length === 0) { + errors.push(invalid(`/${key}`, `${key} must be a non-empty string`)) + return undefined + } + return v +} + +/** + * @param {Record} raw + * @param {string} key + * @param {EmbedderConfigError[]} errors + * @returns {number | undefined} + */ +function readPositiveInt(raw, key, errors) { + const v = raw[key] + if (v === undefined) return undefined + if (typeof v !== 'number' || !Number.isInteger(v) || v <= 0) { + errors.push(invalid(`/${key}`, `${key} must be a positive integer`)) + return undefined + } + return v +} diff --git a/hypaware-core/plugins-workspace/embedder-openai/src/index.js b/hypaware-core/plugins-workspace/embedder-openai/src/index.js new file mode 100644 index 0000000..bee8766 --- /dev/null +++ b/hypaware-core/plugins-workspace/embedder-openai/src/index.js @@ -0,0 +1,63 @@ +// @ts-check + +import { createOpenAiEmbedder } from './client.js' +import { validateEmbedderConfig } from './config.js' + +/** + * @import { HypError, PluginActivationContext, ValidationResult } from '../../../../collectivus-plugin-kernel-types.d.ts' + */ + +const PLUGIN_NAME = '@hypaware/embedder-openai' +const CAPABILITY_VERSION = '1.0.0' + +/** + * Activate `@hypaware/embedder-openai`. Registers the + * `hypaware.embedder@1` capability backed by an OpenAI-compatible + * `POST /v1/embeddings` client. + * + * Activation itself performs no network IO and reads no credentials — + * the API key resolves from the environment per request. Enabling this + * plugin is the explicit opt-in that allows captured content (indexed + * text and query strings) to leave the machine when `base_url` points + * at a remote provider; a localhost `base_url` keeps the path fully + * local. + * + * @param {PluginActivationContext} ctx + * @ref LLP 0024#embedding-is-a-separate-capability [implements] — embedding is its own capability; choosing this provider is an explicit `plugins[]` config decision + */ +export async function activate(ctx) { + ctx.configRegistry.registerSection({ + plugin: PLUGIN_NAME, + section: 'embedder-openai', + validate: (value) => toValidationResult(validateEmbedderConfig(value)), + }) + + const validated = validateEmbedderConfig(ctx.config) + if (!validated.ok) { + const detail = validated.errors.map((e) => `${e.pointer || '/'}: ${e.message}`).join('; ') + const err = /** @type {HypError} */ (new Error(`${PLUGIN_NAME}: invalid config — ${detail}`)) + err.hypErrorKind = 'embedder_config_invalid' + throw err + } + const config = validated.config + + const embedder = createOpenAiEmbedder({ config, env: ctx.env, log: ctx.log }) + ctx.provideCapability('hypaware.embedder', CAPABILITY_VERSION, embedder) + + // base_url, model, and the env var NAME are safe to log; the key + // value never is. + ctx.log.info('embedder.activated', { + base_url: config.base_url, + embed_model: config.model, + api_key_env: config.api_key_env, + }) +} + +/** + * @param {ReturnType} result + * @returns {ValidationResult} + */ +function toValidationResult(result) { + if (result.ok) return { ok: true } + return { ok: false, errors: result.errors.map((e) => ({ pointer: e.pointer, message: e.message })) } +} diff --git a/hypaware-core/plugins-workspace/embedder-openai/src/types.d.ts b/hypaware-core/plugins-workspace/embedder-openai/src/types.d.ts new file mode 100644 index 0000000..05957b8 --- /dev/null +++ b/hypaware-core/plugins-workspace/embedder-openai/src/types.d.ts @@ -0,0 +1,54 @@ +/** + * Plugin-local types for `@hypaware/embedder-openai`. The capability + * surface (`EmbedderCapability` and friends) lives in the kernel types + * file; these shapes cover config validation and client construction. + */ + +import type { EmbedderCapability, PluginLogger } from '../../../../collectivus-plugin-kernel-types.d.ts' + +export interface EmbedderOpenAiConfig { + /** Origin (or origin + `/v1`) of an OpenAI-compatible embeddings API. */ + base_url: string + /** Model identifier sent with every request. */ + model: string + /** Name of the environment variable holding the API key. */ + api_key_env: string + /** Optional fixed output dimension (v3 models support shortening). */ + dimensions?: number + /** Max texts per HTTP request; larger batches are chunked. */ + max_batch: number + /** Per-request timeout in milliseconds. */ + timeout_ms: number +} + +export interface EmbedderConfigError { + pointer: string + message: string + errorKind: 'embedder_config_invalid' +} + +export type EmbedderConfigResult = + | { ok: true, config: EmbedderOpenAiConfig } + | { ok: false, errors: EmbedderConfigError[] } + +export type FetchLike = (url: string, init: { + method: string, + headers: Record, + body: string, + signal?: AbortSignal, +}) => Promise<{ + ok: boolean, + status: number, + json(): Promise, + text(): Promise, +}> + +export interface CreateEmbedderOptions { + config: EmbedderOpenAiConfig + env: NodeJS.ProcessEnv + log: PluginLogger + /** Injection seam for tests; defaults to global fetch. */ + fetchImpl?: FetchLike +} + +export type CreateEmbedder = (opts: CreateEmbedderOptions) => EmbedderCapability diff --git a/hypaware-core/plugins-workspace/vector-search/hypaware.plugin.json b/hypaware-core/plugins-workspace/vector-search/hypaware.plugin.json new file mode 100644 index 0000000..12ea102 --- /dev/null +++ b/hypaware-core/plugins-workspace/vector-search/hypaware.plugin.json @@ -0,0 +1,30 @@ +{ + "schema_version": 1, + "name": "@hypaware/vector-search", + "version": "1.0.0", + "hypaware_api": "^1.0.0", + "runtime": "node", + "node_engine": ">=20", + "entrypoint": "./src/index.js", + "description": "Vector similarity search over cached datasets via hypvector. Indexes are declared in config, sharded one file per cache partition under plugin state, and kept fresh by a daemon timer plus search-time refresh.", + "permissions": ["read_state", "write_state"], + "requires": { + "capabilities": { + "hypaware.embedder": "^1.0.0" + } + }, + "provides": { + "capabilities": { + "hypaware.vector-search": "1.0.0" + } + }, + "contributes": { + "commands": [ + { "name": "vector" }, + { "name": "vector search" }, + { "name": "vector status" } + ], + "sources": [{ "name": "vector-search-refresh" }], + "config_sections": [{ "section": "vector-search" }] + } +} diff --git a/hypaware-core/plugins-workspace/vector-search/src/commands.js b/hypaware-core/plugins-workspace/vector-search/src/commands.js new file mode 100644 index 0000000..84de8bb --- /dev/null +++ b/hypaware-core/plugins-workspace/vector-search/src/commands.js @@ -0,0 +1,185 @@ +// @ts-check + +import { applyContextControls, renderResult } from '../../../../src/core/query/format.js' +import { getVectorSearchRuntime } from './runtime.js' +import { searchVectorIndexes } from './search.js' +import { partitionLabel } from './shards.js' +import { collectIndexStatus } from './status.js' + +/** + * @import { CommandRunContext, HypError } from '../../../../collectivus-plugin-kernel-types.d.ts' + * @import { QueryFormat } from '../../../../src/core/query/types.d.ts' + */ + +// Mirrors the `hyp query sql` inline-output defaults so vector results +// are bounded the same way SQL results are. +const DEFAULT_MAX_CELL = 200 +const DEFAULT_MAX_BYTES = 32_768 + +const SEARCH_USAGE = + 'usage: hyp vector search [--index ] [--dataset ] [--top-k ] [--no-refresh] [--format ] [--max-cell ] [--max-bytes ]' + +/** + * @param {string[]} _argv + * @param {CommandRunContext} ctx + */ +export async function runVector(_argv, ctx) { + ctx.stdout.write('hyp vector \n') + ctx.stdout.write(' search similarity search across configured indexes\n') + ctx.stdout.write(' status per-index shard coverage and staleness\n') + return 0 +} + +/** + * @param {string[]} argv + * @param {CommandRunContext} ctx + * @ref LLP 0024#cli-surface [implements] — contributed through the CLI registry; results format through the intrinsic formatter + */ +export async function runVectorSearch(argv, ctx) { + const parsed = parseVectorSearchArgv(argv) + if (!parsed.ok) { + ctx.stderr.write(parsed.error + '\n') + return 2 + } + try { + const runtime = getVectorSearchRuntime() + const hits = await searchVectorIndexes({ + runtime, + opts: { + query: parsed.query, + index: parsed.index, + dataset: parsed.dataset, + topK: parsed.topK, + refresh: parsed.refresh, + }, + onProgress: (line) => ctx.stderr.write(line + '\n'), + }) + + const rows = hits.map((hit) => ({ + score: Math.round(hit.score * 10_000) / 10_000, + index: hit.index, + partition: partitionLabel(hit.partition), + id: hit.id, + text: hit.text ?? null, + })) + const { result, notice } = applyContextControls( + { columns: ['score', 'index', 'partition', 'id', 'text'], rows }, + { maxCell: parsed.maxCell, maxBytes: parsed.maxBytes } + ) + if (notice) ctx.stderr.write(notice + '\n') + ctx.stdout.write(renderResult(result, parsed.format)) + return 0 + } catch (err) { + const kind = /** @type {HypError} */ (err)?.hypErrorKind + const message = err instanceof Error ? err.message : String(err) + ctx.stderr.write(`hyp vector search: ${message}\n`) + return kind === 'vector_no_indexes' ? 2 : 1 + } +} + +/** + * @param {string[]} argv + * @param {CommandRunContext} ctx + */ +export async function runVectorStatus(argv, ctx) { + const json = argv.includes('--json') + try { + const runtime = getVectorSearchRuntime() + const statuses = await collectIndexStatus(runtime) + if (json) { + ctx.stdout.write(JSON.stringify(statuses, null, 2) + '\n') + return 0 + } + if (statuses.length === 0) { + ctx.stdout.write('no vector indexes configured\n') + return 0 + } + for (const status of statuses) { + ctx.stdout.write(`index: ${status.index} (dataset=${status.dataset} column=${status.column} model=${status.model})\n`) + if (status.shards.length === 0) { + ctx.stdout.write(' (no cache partitions yet)\n') + continue + } + for (const shard of status.shards) { + const partition = partitionLabel(shard.partition) + const extras = [] + if (shard.rows !== undefined) extras.push(`rows=${shard.rows}`) + if (shard.dimension) extras.push(`dim=${shard.dimension}`) + if (shard.model && shard.model !== status.model) extras.push(`model=${shard.model}`) + if (shard.built_at) extras.push(`built=${shard.built_at}`) + ctx.stdout.write(` ${partition} ${shard.state}${extras.length ? ' ' + extras.join(' ') : ''}\n`) + } + } + return 0 + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + ctx.stderr.write(`hyp vector status: ${message}\n`) + return 1 + } +} + +/** + * @param {string[]} argv + * @returns {{ ok: true, query: string, index: string | undefined, dataset: string | undefined, topK: number, refresh: 'auto' | 'never', format: QueryFormat, maxCell: number, maxBytes: number } | { ok: false, error: string }} + */ +export function parseVectorSearchArgv(argv) { + /** @type {string[]} */ + const positional = [] + /** @type {string | undefined} */ + let index + /** @type {string | undefined} */ + let dataset + let topK = 10 + /** @type {'auto' | 'never'} */ + let refresh = 'auto' + /** @type {QueryFormat} */ + let format = 'table' + let maxCell = DEFAULT_MAX_CELL + let maxBytes = DEFAULT_MAX_BYTES + + for (let i = 0; i < argv.length; i += 1) { + const token = argv[i] + if (token === '--index' || token === '--dataset') { + const value = argv[i + 1] + if (value === undefined || value.startsWith('--')) { + return { ok: false, error: `hyp vector search: ${token} expects a name` } + } + if (token === '--index') index = value + else dataset = value + i += 1 + } else if (token === '--top-k' || token === '-k') { + const value = argv[i + 1] + const n = Number(value) + if (value === undefined || !Number.isInteger(n) || n <= 0) { + return { ok: false, error: `hyp vector search: --top-k expects a positive integer (got ${value ?? ''})` } + } + topK = n + i += 1 + } else if (token === '--no-refresh') { + refresh = 'never' + } else if (token === '--format') { + const value = argv[i + 1] + if (value !== 'table' && value !== 'json' && value !== 'jsonl' && value !== 'markdown') { + return { ok: false, error: `hyp vector search: --format expects one of table|json|jsonl|markdown (got ${value ?? ''})` } + } + format = value + i += 1 + } else if (token === '--max-cell' || token === '--max-bytes') { + const value = argv[i + 1] + const n = Number(value) + if (value === undefined || !Number.isInteger(n) || n < 0) { + return { ok: false, error: `hyp vector search: ${token} expects a non-negative integer (got ${value ?? ''})` } + } + if (token === '--max-cell') maxCell = n + else maxBytes = n + i += 1 + } else { + positional.push(token) + } + } + + if (positional.length === 0) { + return { ok: false, error: SEARCH_USAGE } + } + return { ok: true, query: positional.join(' '), index, dataset, topK, refresh, format, maxCell, maxBytes } +} diff --git a/hypaware-core/plugins-workspace/vector-search/src/config.js b/hypaware-core/plugins-workspace/vector-search/src/config.js new file mode 100644 index 0000000..e9eaa9b --- /dev/null +++ b/hypaware-core/plugins-workspace/vector-search/src/config.js @@ -0,0 +1,200 @@ +// @ts-check + +/** + * Config validation for `@hypaware/vector-search`. Pure and + * dependency-free so tests exercise it directly. + */ + +/** + * @import { VectorConfigError, VectorConfigResult, VectorIndexDeclaration } from './types.d.ts' + */ + +// Deliberately longer than cache maintenance's 60-minute default: index +// freshness is a background nicety, and every tick can spend embedding +// API tokens. +// @ref LLP 0024#freshness-rides-the-cache-maintenance-pattern [implements] — own interval + max_tick_ms budget, modeled on the maintenance tick +export const REFRESH_DEFAULTS = Object.freeze({ + enabled: true, + interval_minutes: 240, + max_tick_ms: 30_000, + // Per-tick embedding spend bound (rows). Soft: checked before each + // shard build so one oversized partition can overshoot once rather + // than starve forever. + // @ref LLP 0024#open-questions [implements] — per-tick row budget resolves the cost-visibility question for the daemon timer + max_rows_per_tick: 5_000, +}) + +const INDEX_NAME_RE = /^[A-Za-z0-9][A-Za-z0-9._-]*$/ + +/** + * Validate the plugin's config slice. `indexes` defaults to empty (the + * plugin activates but has nothing to refresh or search); refresh + * budgets default per {@link REFRESH_DEFAULTS}. + * + * @param {unknown} value + * @returns {VectorConfigResult} + * @ref LLP 0024#indexes-are-declared-in-config-sharded-per-partition [implements] — index definitions are portable config, not per-host state + */ +export function validateVectorSearchConfig(value) { + /** @type {VectorConfigError[]} */ + const errors = [] + + if (value !== undefined && (value === null || typeof value !== 'object' || Array.isArray(value))) { + errors.push(invalid('', 'vector-search config must be an object')) + return { ok: false, errors } + } + const raw = /** @type {Record} */ (value ?? {}) + + /** @type {VectorIndexDeclaration[]} */ + const indexes = [] + if (raw.indexes !== undefined) { + if (!Array.isArray(raw.indexes)) { + errors.push(invalid('/indexes', 'indexes must be an array')) + } else { + raw.indexes.forEach((entry, i) => { + const decl = readIndexDeclaration(entry, `/indexes/${i}`, errors) + if (decl) indexes.push(decl) + }) + } + } + + const seen = new Set() + for (const decl of indexes) { + if (seen.has(decl.name)) { + errors.push(invalid('/indexes', `duplicate index name '${decl.name}'`)) + } + seen.add(decl.name) + } + + const refreshRaw = raw.refresh + if (refreshRaw !== undefined && (refreshRaw === null || typeof refreshRaw !== 'object' || Array.isArray(refreshRaw))) { + errors.push(invalid('/refresh', 'refresh must be an object')) + } + const refresh = /** @type {Record} */ ( + refreshRaw && typeof refreshRaw === 'object' && !Array.isArray(refreshRaw) ? refreshRaw : {} + ) + const enabled = readBoolean(refresh, 'enabled', '/refresh', errors) ?? REFRESH_DEFAULTS.enabled + // Positive number (not integer): smokes drive sub-minute ticks. + const intervalMinutes = readPositiveNumber(refresh, 'interval_minutes', '/refresh', errors) + ?? REFRESH_DEFAULTS.interval_minutes + const maxTickMs = readPositiveNumber(refresh, 'max_tick_ms', '/refresh', errors) ?? REFRESH_DEFAULTS.max_tick_ms + const maxRowsPerTick = readPositiveNumber(refresh, 'max_rows_per_tick', '/refresh', errors) + ?? REFRESH_DEFAULTS.max_rows_per_tick + + if (errors.length > 0) return { ok: false, errors } + + return { + ok: true, + config: { + indexes, + refresh: { + enabled, + interval_minutes: intervalMinutes, + max_tick_ms: maxTickMs, + max_rows_per_tick: maxRowsPerTick, + }, + }, + } +} + +/** + * @param {unknown} entry + * @param {string} pointer + * @param {VectorConfigError[]} errors + * @returns {VectorIndexDeclaration | null} + */ +function readIndexDeclaration(entry, pointer, errors) { + if (entry === null || typeof entry !== 'object' || Array.isArray(entry)) { + errors.push(invalid(pointer, 'index declaration must be an object')) + return null + } + const raw = /** @type {Record} */ (entry) + const dataset = readRequiredString(raw, 'dataset', pointer, errors) + const column = readRequiredString(raw, 'column', pointer, errors) + if (!dataset || !column) return null + + let name = `${dataset}.${column}` + if (raw.name !== undefined) { + if (typeof raw.name !== 'string' || raw.name.length === 0) { + errors.push(invalid(`${pointer}/name`, 'name must be a non-empty string')) + return null + } + name = raw.name + } + // The name becomes a state-dir path segment; reject traversal. + if (!INDEX_NAME_RE.test(name)) { + errors.push(invalid(`${pointer}/name`, `index name '${name}' must match ${INDEX_NAME_RE}`)) + return null + } + + /** @type {string | undefined} */ + let idColumn + if (raw.id_column !== undefined) { + if (typeof raw.id_column !== 'string' || raw.id_column.length === 0) { + errors.push(invalid(`${pointer}/id_column`, 'id_column must be a non-empty string')) + return null + } + idColumn = raw.id_column + } + + return { dataset, column, name, ...(idColumn !== undefined ? { id_column: idColumn } : {}) } +} + +/** + * @param {string} pointer + * @param {string} message + * @returns {VectorConfigError} + */ +function invalid(pointer, message) { + return { pointer, message, errorKind: 'vector_config_invalid' } +} + +/** + * @param {Record} raw + * @param {string} key + * @param {string} pointer + * @param {VectorConfigError[]} errors + * @returns {string | undefined} + */ +function readRequiredString(raw, key, pointer, errors) { + const v = raw[key] + if (typeof v !== 'string' || v.length === 0) { + errors.push(invalid(`${pointer}/${key}`, `${key} must be a non-empty string`)) + return undefined + } + return v +} + +/** + * @param {Record} raw + * @param {string} key + * @param {string} pointer + * @param {VectorConfigError[]} errors + * @returns {boolean | undefined} + */ +function readBoolean(raw, key, pointer, errors) { + const v = raw[key] + if (v === undefined) return undefined + if (typeof v !== 'boolean') { + errors.push(invalid(`${pointer}/${key}`, `${key} must be a boolean`)) + return undefined + } + return v +} + +/** + * @param {Record} raw + * @param {string} key + * @param {string} pointer + * @param {VectorConfigError[]} errors + * @returns {number | undefined} + */ +function readPositiveNumber(raw, key, pointer, errors) { + const v = raw[key] + if (v === undefined) return undefined + if (typeof v !== 'number' || !Number.isFinite(v) || v <= 0) { + errors.push(invalid(`${pointer}/${key}`, `${key} must be a positive number`)) + return undefined + } + return v +} diff --git a/hypaware-core/plugins-workspace/vector-search/src/hypvector.js b/hypaware-core/plugins-workspace/vector-search/src/hypvector.js new file mode 100644 index 0000000..783ecde --- /dev/null +++ b/hypaware-core/plugins-workspace/vector-search/src/hypvector.js @@ -0,0 +1,38 @@ +// @ts-check + +/** + * @import { HypvectorLoadResult } from './types.d.ts' + */ + +/** @type {HypvectorLoadResult | null} */ +let cached = null + +/** + * Lazily load the optional `hypvector` dependency (and the + * `hyparquet-writer` file writer it pairs with for shard writes). + * Activation never touches this, so the plugin activates cleanly on an + * install without optional dependencies; refresh and search report the + * missing dependency instead. + * + * @returns {Promise} + * @ref LLP 0024#packaging [implements] — root optionalDependency, graceful degradation when absent + */ +export async function loadHypvector() { + if (cached) return cached + try { + const [hv, hw] = await Promise.all([import('hypvector'), import('hyparquet-writer')]) + cached = { + ok: true, + searchVectors: hv.searchVectors, + writeVectors: hv.writeVectors, + fileWriter: hw.fileWriter, + } + } catch { + cached = { + ok: false, + message: + "optional dependency 'hypvector' is not installed — reinstall hypaware without --omit=optional to enable vector search", + } + } + return cached +} diff --git a/hypaware-core/plugins-workspace/vector-search/src/index.js b/hypaware-core/plugins-workspace/vector-search/src/index.js new file mode 100644 index 0000000..6a5f402 --- /dev/null +++ b/hypaware-core/plugins-workspace/vector-search/src/index.js @@ -0,0 +1,117 @@ +// @ts-check + +import path from 'node:path' + +import { runVector, runVectorSearch, runVectorStatus } from './commands.js' +import { validateVectorSearchConfig } from './config.js' +import { setVectorSearchRuntime } from './runtime.js' +import { searchVectorIndexes } from './search.js' +import { startVectorRefreshSource } from './source.js' +import { collectIndexStatus } from './status.js' + +/** + * @import { EmbedderCapability, HypError, PluginActivationContext, ValidationResult, VectorSearchCapability } from '../../../../collectivus-plugin-kernel-types.d.ts' + * @import { ExtendedQueryStorageService } from '../../../../src/core/cache/types.d.ts' + * @import { VectorSearchRuntime } from './types.d.ts' + */ + +const PLUGIN_NAME = '@hypaware/vector-search' +const CAPABILITY_VERSION = '1.0.0' + +/** + * Activate `@hypaware/vector-search`. Registers: + * - capability `hypaware.vector-search` (programmatic search/status) + * - commands `vector`, `vector search`, `vector status` + * - source `vector-search-refresh` (the daemon refresh timer) + * - config section `vector-search` + * + * Vector search is a plugin capability building on the intrinsic + * SQL/dataset surface, not kernel surface; it requires a separately + * chosen `hypaware.embedder` provider, so which embedder runs (and + * whether captured text may leave the machine) is always an explicit + * `plugins[]` decision. + * + * @param {PluginActivationContext} ctx + * @ref LLP 0024#plugin-not-kernel [implements] — vector search ships as a bundled plugin; query stays the SQL/dataset surface + */ +export async function activate(ctx) { + ctx.configRegistry.registerSection({ + plugin: PLUGIN_NAME, + section: 'vector-search', + validate: (value) => toValidationResult(validateVectorSearchConfig(value)), + }) + + const validated = validateVectorSearchConfig(ctx.config) + if (!validated.ok) { + const detail = validated.errors.map((e) => `${e.pointer || '/'}: ${e.message}`).join('; ') + const err = /** @type {HypError} */ (new Error(`${PLUGIN_NAME}: invalid config — ${detail}`)) + err.hypErrorKind = 'vector_config_invalid' + throw err + } + + // @ref LLP 0024#embedding-is-a-separate-capability [constrained-by] — embedding always resolves through the capability registry, never a baked-in provider + const embedder = /** @type {EmbedderCapability} */ (ctx.requireCapability('hypaware.embedder', '^1.0.0')) + + /** @type {VectorSearchRuntime} */ + const runtime = { + ctx, + config: validated.config, + embedder, + storage: /** @type {ExtendedQueryStorageService} */ (ctx.storage), + log: ctx.log, + indexesDir: path.join(ctx.paths.stateDir, 'indexes'), + } + setVectorSearchRuntime(runtime) + + /** @type {VectorSearchCapability} */ + const capability = { + search: (opts) => searchVectorIndexes({ runtime, opts }), + status: () => collectIndexStatus(runtime), + } + ctx.provideCapability('hypaware.vector-search', CAPABILITY_VERSION, capability) + + ctx.commands.register({ + name: 'vector', + plugin: PLUGIN_NAME, + summary: 'Vector similarity search (see subcommands: search, status)', + usage: 'hyp vector [args...]', + run: runVector, + }) + ctx.commands.register({ + name: 'vector search', + plugin: PLUGIN_NAME, + summary: 'Similarity search across configured vector indexes', + usage: 'hyp vector search [--index ] [--dataset ] [--top-k ] [--no-refresh] [--format ]', + run: runVectorSearch, + }) + ctx.commands.register({ + name: 'vector status', + plugin: PLUGIN_NAME, + summary: 'Per-index vector shard coverage and staleness', + usage: 'hyp vector status [--json]', + run: runVectorStatus, + }) + + ctx.sources.register({ + name: 'vector-search-refresh', + plugin: PLUGIN_NAME, + summary: 'Background vector index refresh timer', + configSection: 'vector-search', + start: startVectorRefreshSource, + }) + + ctx.log.info('vector.activated', { + index_count: validated.config.indexes.length, + embed_model: embedder.model, + refresh_enabled: validated.config.refresh.enabled, + }) +} + +/** + * @param {ReturnType} result + * @returns {ValidationResult} + */ +function toValidationResult(result) { + if (result.ok) return { ok: true } + return { ok: false, errors: result.errors.map((e) => ({ pointer: e.pointer, message: e.message })) } +} diff --git a/hypaware-core/plugins-workspace/vector-search/src/refresh.js b/hypaware-core/plugins-workspace/vector-search/src/refresh.js new file mode 100644 index 0000000..a12300e --- /dev/null +++ b/hypaware-core/plugins-workspace/vector-search/src/refresh.js @@ -0,0 +1,337 @@ +// @ts-check + +import { randomUUID } from 'node:crypto' +import fs from 'node:fs' +import fsPromises from 'node:fs/promises' +import path from 'node:path' +import process from 'node:process' + +import { Attr, withSpan } from '../../../../src/core/observability/index.js' +import { loadHypvector } from './hypvector.js' +import { computeShardStates, contentId, readShardMetas, REBUILD_STATES, shardPaths } from './shards.js' + +/** + * @import { CachePartitionMeta, EmbedderCapability, HypError, PluginLogger } from '../../../../collectivus-plugin-kernel-types.d.ts' + * @import { ExtendedQueryStorageService } from '../../../../src/core/cache/types.d.ts' + * @import { RefreshBudget, RefreshReport, ShardMeta, ShardState, VectorIndexDeclaration } from './types.d.ts' + */ + +const PLUGIN_NAME = '@hypaware/vector-search' + +/** + * Bring every configured index up to date, incrementally: only + * missing/stale shards are rebuilt, orphaned shards are swept, and the + * optional budget bounds wall-clock and embedding spend. Budgets are + * soft — they gate *starting* a shard, not finishing it, so one + * oversized partition overshoots a tick once instead of starving + * forever. + * + * Each shard write is durable (tmp file + rename), so an interrupted + * cold build resumes where it left off on the next run. + * + * @param {{ + * decls: VectorIndexDeclaration[], + * embedder: EmbedderCapability, + * storage: ExtendedQueryStorageService, + * indexesDir: string, + * log: PluginLogger, + * budget?: RefreshBudget, + * dimension?: number, + * onShard?: (info: { index: string, fileBase: string, state: string, rowsEmbedded: number }) => void, + * }} args + * @returns {Promise} + * @ref LLP 0024#freshness-rides-the-cache-maintenance-pattern [implements] — incremental per-partition shard builds under a tick budget; per-shard writes are durable + */ +export async function refreshIndexes({ decls, embedder, storage, indexesDir, log, budget, dimension, onShard }) { + /** @type {RefreshReport} */ + const report = { + shardsBuilt: 0, + shardsSkipped: 0, + orphansSwept: 0, + rowsEmbedded: 0, + budgetExhausted: false, + } + + for (const decl of decls) { + const partitions = await storage.discoverCachePartitions({ datasets: [decl.dataset] }) + const metas = readShardMetas(indexesDir, decl.name) + const states = computeShardStates({ + partitions, + metas, + decl, + model: embedder.model, + // Without a configured dimension the daemon cannot detect + // dimension drift; the search path closes that gap with the + // dimension the query embedded to. + dimension: embedder.dimensions ?? dimension, + }) + + // Orphans sweep even on an exhausted budget — deletion is cheap and + // never spends embedding tokens. + for (const state of states) { + if (state.state === 'orphan') { + sweepOrphan(indexesDir, decl.name, state.fileBase, log) + report.orphansSwept++ + } + } + + const pending = states.filter((s) => REBUILD_STATES.has(s.state)) + for (let i = 0; i < pending.length; i++) { + if (budgetSpent(budget, report)) { + report.shardsSkipped += pending.length - i + report.budgetExhausted = true + break + } + const state = pending[i] + const partition = /** @type {CachePartitionMeta} */ (state.partition) + const built = await buildShard({ decl, partition, state, embedder, storage, indexesDir, log }) + report.shardsBuilt++ + report.rowsEmbedded += built.rowsEmbedded + onShard?.({ index: decl.name, fileBase: state.fileBase, state: state.state, rowsEmbedded: built.rowsEmbedded }) + } + } + + return report +} + +/** + * @param {RefreshBudget | undefined} budget + * @param {RefreshReport} report + * @returns {boolean} + */ +function budgetSpent(budget, report) { + if (!budget) return false + if (budget.deadlineMs !== undefined && Date.now() > budget.deadlineMs) return true + if (budget.maxRows !== undefined && report.rowsEmbedded >= budget.maxRows) return true + return false +} + +/** + * Estimate the refresh work a search would trigger: cache rows behind + * every missing/stale shard. An over-estimate when texts repeat (the + * content-hash id dedups before embedding), which is the right + * direction for an upfront spend warning. + * + * @param {ShardState[]} states + * @returns {{ shards: number, rows: number }} + */ +export function estimatePendingWork(states) { + let shards = 0 + let rows = 0 + for (const s of states) { + if (REBUILD_STATES.has(s.state)) { + shards++ + rows += s.partition?.rowCount ?? 0 + } + } + return { shards, rows } +} + +/** + * In-process per-shard build serialization. Search-time refresh and the + * daemon tick share a process when search runs through the capability, + * so two builds of the same shard can otherwise interleave their + * parquet/sidecar writes. Keyed by the shard's final file path; entries + * are dropped once the tail build settles. + * + * @type {Map>} + */ +const shardBuildLocks = new Map() + +/** + * @template T + * @param {string} key + * @param {() => Promise} fn + * @returns {Promise} + */ +async function withShardBuildLock(key, fn) { + const tail = shardBuildLocks.get(key) ?? Promise.resolve() + const run = tail.then(fn, fn) + const settled = run.catch(() => {}) + shardBuildLocks.set(key, settled) + settled.then(() => { + if (shardBuildLocks.get(key) === settled) shardBuildLocks.delete(key) + }) + return run +} + +/** + * Build one shard: read the partition's rows from the cache, dedup by + * id, embed, and write the hypvector file + sidecar atomically. Builds + * of the same shard serialize in-process; temp names carry pid + a + * UUID so concurrent processes (CLI search vs daemon tick) can never + * write the same temp file, and the final rename stays atomic — + * last-writer-wins on identical inputs. + * + * @param {{ + * decl: VectorIndexDeclaration, + * partition: CachePartitionMeta, + * state: ShardState, + * embedder: EmbedderCapability, + * storage: ExtendedQueryStorageService, + * indexesDir: string, + * log: PluginLogger, + * }} args + * @returns {Promise<{ rowsEmbedded: number }>} + * @ref LLP 0024#index-files-are-plugin-state [implements] — shards are derived artifacts under plugin state, rebuilt from the cache + */ +async function buildShard({ decl, partition, state, embedder, storage, indexesDir, log }) { + const { file } = shardPaths(indexesDir, decl.name, state.fileBase) + return withShardBuildLock(file, () => buildShardLocked({ decl, partition, state, embedder, storage, indexesDir, log })) +} + +/** + * @param {{ + * decl: VectorIndexDeclaration, + * partition: CachePartitionMeta, + * state: ShardState, + * embedder: EmbedderCapability, + * storage: ExtendedQueryStorageService, + * indexesDir: string, + * log: PluginLogger, + * }} args + * @returns {Promise<{ rowsEmbedded: number }>} + */ +async function buildShardLocked({ decl, partition, state, embedder, storage, indexesDir, log }) { + return withSpan( + 'vector.build_shard', + { + [Attr.COMPONENT]: 'vector-search', + [Attr.OPERATION]: 'vector.build_shard', + [Attr.PLUGIN]: PLUGIN_NAME, + [Attr.DATASET]: decl.dataset, + vector_index: decl.name, + shard: state.fileBase, + shard_reason: state.state, + status: 'ok', + }, + async (span) => { + const hv = await loadHypvector() + if (!hv.ok) throw newVectorError('vector_dependency_missing', hv.message) + + const texts = await collectShardTexts({ decl, partition, storage }) + span.setAttribute('unique_text_count', texts.size) + + const { file, meta } = shardPaths(indexesDir, decl.name, state.fileBase) + await fsPromises.mkdir(path.dirname(file), { recursive: true }) + + let dimension = 0 + let rowsEmbedded = 0 + if (texts.size > 0) { + const ids = Array.from(texts.keys()) + const result = await embedder.embed(Array.from(texts.values())) + dimension = result.dimension + rowsEmbedded = result.vectors.length + + const tmpFile = `${file}.tmp-${process.pid}-${randomUUID()}` + try { + await hv.writeVectors({ + writer: hv.fileWriter(tmpFile), + vectors: ids.map((id, i) => ({ id, vector: result.vectors[i] })), + dimension, + normalize: true, + metric: 'cosine', + }) + await fsPromises.rename(tmpFile, file) + } catch (err) { + await fsPromises.rm(tmpFile, { force: true }) + throw err + } + } else { + // Nothing embeddable: drop any previous generation so search + // cannot hit stale vectors, and record an empty shard so the + // next tick does not re-read the partition. + await fsPromises.rm(file, { force: true }) + } + + /** @type {ShardMeta} */ + const shardMeta = { + schema_version: 1, + index: decl.name, + dataset: decl.dataset, + column: decl.column, + ...(decl.id_column !== undefined ? { id_column: decl.id_column } : {}), + partition: partition.partition, + model: embedder.model, + dimension, + row_count: rowsEmbedded, + source_row_count: partition.rowCount, + built_at: new Date().toISOString(), + } + const tmpMeta = `${meta}.tmp-${process.pid}-${randomUUID()}` + await fsPromises.writeFile(tmpMeta, JSON.stringify(shardMeta, null, 2) + '\n', 'utf8') + await fsPromises.rename(tmpMeta, meta) + + span.setAttribute('row_count', rowsEmbedded) + span.setAttribute('dimension', dimension) + span.setAttribute('embed_model', embedder.model) + + log.info('vector.shard_built', { + [Attr.DATASET]: decl.dataset, + vector_index: decl.name, + shard: state.fileBase, + shard_reason: state.state, + row_count: rowsEmbedded, + source_row_count: partition.rowCount, + }) + + return { rowsEmbedded } + }, + { component: 'vector-search' } + ) +} + +/** + * Materialize the partition's embeddable texts, deduplicated by id. + * Only non-empty string cells embed; everything else is skipped. The + * id is the configured `id_column` value, or a hash of the text itself + * (which collapses repeated texts into one vector). + * + * @param {{ decl: VectorIndexDeclaration, partition: CachePartitionMeta, storage: ExtendedQueryStorageService }} args + * @returns {Promise>} + */ +export async function collectShardTexts({ decl, partition, storage }) { + /** @type {Map} */ + const texts = new Map() + const columns = decl.id_column ? [decl.column, decl.id_column] : [decl.column] + for await (const row of storage.readRows(partition.path, columns)) { + const text = row[decl.column] + if (typeof text !== 'string' || text.length === 0) continue + const id = decl.id_column ? row[decl.id_column] : contentId(text) + if (typeof id !== 'string' || id.length === 0) continue + if (!texts.has(id)) texts.set(id, text) + } + return texts +} + +/** + * @param {string} indexesDir + * @param {string} indexName + * @param {string} fileBase + * @param {PluginLogger} log + */ +function sweepOrphan(indexesDir, indexName, fileBase, log) { + const { file, meta } = shardPaths(indexesDir, indexName, fileBase) + try { + fs.rmSync(file, { force: true }) + fs.rmSync(meta, { force: true }) + log.info('vector.orphan_swept', { vector_index: indexName, shard: fileBase }) + } catch (err) { + log.warn('vector.orphan_sweep_failed', { + vector_index: indexName, + shard: fileBase, + message: err instanceof Error ? err.message : String(err), + }) + } +} + +/** + * @param {string} kind + * @param {string} message + * @returns {HypError} + */ +export function newVectorError(kind, message) { + const err = /** @type {HypError} */ (new Error(message)) + err.hypErrorKind = kind + return err +} diff --git a/hypaware-core/plugins-workspace/vector-search/src/runtime.js b/hypaware-core/plugins-workspace/vector-search/src/runtime.js new file mode 100644 index 0000000..904e5a6 --- /dev/null +++ b/hypaware-core/plugins-workspace/vector-search/src/runtime.js @@ -0,0 +1,31 @@ +// @ts-check + +/** + * Module-local activation state, mirroring the `@hypaware/gascity` + * pattern: `activate()` captures what command bodies and the refresh + * source need (paths, validated config, the resolved embedder), and + * they read it back here — `CommandRunContext` deliberately does not + * carry per-plugin paths. + */ + +/** + * @import { VectorSearchRuntime } from './types.d.ts' + */ + +/** @type {VectorSearchRuntime | null} */ +let runtime = null + +/** @param {VectorSearchRuntime} value */ +export function setVectorSearchRuntime(value) { + runtime = value +} + +/** + * @returns {VectorSearchRuntime} + */ +export function getVectorSearchRuntime() { + if (!runtime) { + throw new Error('@hypaware/vector-search: runtime not initialized — plugin is not activated') + } + return runtime +} diff --git a/hypaware-core/plugins-workspace/vector-search/src/search.js b/hypaware-core/plugins-workspace/vector-search/src/search.js new file mode 100644 index 0000000..f33a587 --- /dev/null +++ b/hypaware-core/plugins-workspace/vector-search/src/search.js @@ -0,0 +1,238 @@ +// @ts-check + +import { Attr, withSpan } from '../../../../src/core/observability/index.js' +import { loadHypvector } from './hypvector.js' +import { estimatePendingWork, newVectorError, refreshIndexes } from './refresh.js' +import { computeShardStates, contentId, mergeTopK, readShardMetas, shardFileBase, shardPaths } from './shards.js' + +/** + * @import { VectorSearchHit, VectorSearchOptions } from '../../../../collectivus-plugin-kernel-types.d.ts' + * @import { ShardState, VectorIndexDeclaration, VectorSearchRuntime } from './types.d.ts' + */ + +const PLUGIN_NAME = '@hypaware/vector-search' +const DEFAULT_TOP_K = 10 + +/** + * Embed the query, fan out across every shard of the matching indexes, + * and merge the global top-K. The query embeds *before* any refresh: + * its dimension is the staleness signal that catches dimension drift + * (same model, different vector length — a changed embedder + * `dimensions` config, or a different server behind the same model + * name) so auto-refresh re-embeds instead of hard-failing on shards it + * just declared fresh. + * + * Two refresh modes, mirroring `hyp query sql --refresh`: + * + * - `auto` (default): missing/stale shards rebuild first. `onProgress` + * receives an upfront row estimate and one line per shard so an + * interactive caller sees where embedding spend goes. + * - `never`: search existing shards as-is. A shard built with a + * different model or dimension than the live embedder is a hard + * error here — cross-model scores are meaningless and must not + * silently degrade. + * + * @param {{ runtime: VectorSearchRuntime, opts: VectorSearchOptions, onProgress?: (line: string) => void }} args + * @returns {Promise} + * @ref LLP 0024#indexes-are-declared-in-config-sharded-per-partition [implements] — partition discovery via the registry-backed cache, fan-out, top-K merge; model/dimension mismatch is never a silent degraded search + */ +export async function searchVectorIndexes({ runtime, opts, onProgress }) { + const decls = selectIndexes(runtime.config.indexes, opts) + if (decls.length === 0) { + throw newVectorError( + 'vector_no_indexes', + runtime.config.indexes.length === 0 + ? 'no vector indexes configured — add indexes[] to the vector-search plugin config' + : `no configured vector index matches${opts.index ? ` index '${opts.index}'` : ''}${opts.dataset ? ` dataset '${opts.dataset}'` : ''}` + ) + } + const refresh = opts.refresh ?? 'auto' + const topK = opts.topK ?? DEFAULT_TOP_K + + return withSpan( + 'vector.search', + { + [Attr.COMPONENT]: 'vector-search', + [Attr.OPERATION]: 'vector.search', + [Attr.PLUGIN]: PLUGIN_NAME, + index_count: decls.length, + refresh_mode: refresh, + top_k: topK, + status: 'ok', + }, + async (span) => { + const hv = await loadHypvector() + if (!hv.ok) throw newVectorError('vector_dependency_missing', hv.message) + + const queryEmbed = await runtime.embedder.embed([opts.query], { signal: opts.signal }) + const queryVector = queryEmbed.vectors[0] + + if (refresh === 'auto') { + await refreshForSearch({ runtime, decls, dimension: queryEmbed.dimension, onProgress }) + } + + /** @type {Array<{ decl: VectorIndexDeclaration, state: ShardState }>} */ + const searchable = [] + for (const decl of decls) { + const partitions = await runtime.storage.discoverCachePartitions({ datasets: [decl.dataset] }) + const metas = readShardMetas(runtime.indexesDir, decl.name) + const states = computeShardStates({ + partitions, + metas, + decl, + model: runtime.embedder.model, + dimension: queryEmbed.dimension, + }) + for (const state of states) { + if (!state.meta || state.meta.row_count === 0) continue + if (state.state === 'orphan') continue + if (state.meta.model !== runtime.embedder.model) { + // Only reachable under refresh=never (auto just rebuilt). + throw newVectorError( + 'vector_model_mismatch', + `shard ${decl.name}/${state.fileBase} was built with model '${state.meta.model}' but the configured embedder is '${runtime.embedder.model}'; rerun without --no-refresh to re-embed` + ) + } + if (state.meta.dimension !== queryEmbed.dimension) { + // Under refresh=never this is dimension drift the rebuild + // would fix; under auto the shard was just rebuilt, so a + // mismatch means the embedder itself is non-deterministic. + throw newVectorError( + 'vector_dimension_mismatch', + refresh === 'never' + ? `shard ${decl.name}/${state.fileBase} has dimension ${state.meta.dimension} but the query embedded to ${queryEmbed.dimension}; rerun without --no-refresh to re-embed` + : `shard ${decl.name}/${state.fileBase} was just rebuilt at dimension ${state.meta.dimension} but the query embedded to ${queryEmbed.dimension} — the embedder is returning inconsistent dimensions for model '${runtime.embedder.model}'` + ) + } + searchable.push({ decl, state }) + } + } + span.setAttribute('shard_count', searchable.length) + if (searchable.length === 0) { + span.setAttribute('row_count', 0) + return [] + } + + const hitLists = await Promise.all( + searchable.map(async ({ decl, state }) => { + const { file } = shardPaths(runtime.indexesDir, decl.name, state.fileBase) + const raw = await hv.searchVectors({ + source: file, + query: queryVector, + topK, + signal: opts.signal, + }) + return raw.map((hit) => ({ + index: decl.name, + dataset: decl.dataset, + partition: state.meta?.partition ?? {}, + id: String(hit.id), + score: hit.score, + })) + }) + ) + + const merged = mergeTopK(hitLists, topK) + await attachHitTexts({ runtime, decls, hits: merged }) + span.setAttribute('row_count', merged.length) + return merged + }, + { component: 'vector-search' } + ) +} + +/** + * @param {VectorIndexDeclaration[]} indexes + * @param {VectorSearchOptions} opts + * @returns {VectorIndexDeclaration[]} + */ +function selectIndexes(indexes, opts) { + return indexes.filter((decl) => { + if (opts.index && decl.name !== opts.index) return false + if (opts.dataset && decl.dataset !== opts.dataset) return false + return true + }) +} + +/** + * Search-time refresh: report the pending work upfront (so the caller + * sees the embedding spend before it happens), then rebuild with no row + * budget — an interactive search wants a complete answer. `dimension` + * is the length the query embedded to, so dimension drift classifies + * stale here even when the embedder has no configured `dimensions`. + * + * @param {{ runtime: VectorSearchRuntime, decls: VectorIndexDeclaration[], dimension: number, onProgress?: (line: string) => void }} args + */ +async function refreshForSearch({ runtime, decls, dimension, onProgress }) { + let pendingShards = 0 + let pendingRows = 0 + for (const decl of decls) { + const partitions = await runtime.storage.discoverCachePartitions({ datasets: [decl.dataset] }) + const metas = readShardMetas(runtime.indexesDir, decl.name) + const states = computeShardStates({ partitions, metas, decl, model: runtime.embedder.model, dimension }) + const estimate = estimatePendingWork(states) + pendingShards += estimate.shards + pendingRows += estimate.rows + } + if (pendingShards === 0) return + + onProgress?.(`vector: refreshing ${pendingShards} shard(s), ~${pendingRows} row(s) to embed (use --no-refresh to skip)`) + await refreshIndexes({ + decls, + embedder: runtime.embedder, + storage: runtime.storage, + indexesDir: runtime.indexesDir, + log: runtime.log, + dimension, + onShard: (info) => { + onProgress?.(`vector: built ${info.index}/${info.fileBase} (${info.rowsEmbedded} embedded, was ${info.state})`) + }, + }) +} + +/** + * Resolve hit texts back out of the cache. One pass per partition that + * actually holds hits, stopping early once every id in that partition + * is resolved — never a scan over partitions without hits. + * + * @param {{ runtime: VectorSearchRuntime, decls: VectorIndexDeclaration[], hits: VectorSearchHit[] }} args + */ +async function attachHitTexts({ runtime, decls, hits }) { + const declByName = new Map(decls.map((d) => [d.name, d])) + + /** @type {Map }>} */ + const groups = new Map() + for (const hit of hits) { + const decl = declByName.get(hit.index) + if (!decl) continue + const key = `${hit.index}|${shardFileBase(hit.partition)}` + let group = groups.get(key) + if (!group) { + const partitions = await runtime.storage.discoverCachePartitions({ datasets: [decl.dataset] }) + const partition = partitions.find((p) => shardFileBase(p.partition) === shardFileBase(hit.partition)) + if (!partition) continue + group = { decl, partitionPath: partition.path, byId: new Map() } + groups.set(key, group) + } + const list = group.byId.get(hit.id) ?? [] + list.push(hit) + group.byId.set(hit.id, list) + } + + for (const group of groups.values()) { + const { decl } = group + const columns = decl.id_column ? [decl.column, decl.id_column] : [decl.column] + let remaining = group.byId.size + for await (const row of runtime.storage.readRows(group.partitionPath, columns)) { + const text = row[decl.column] + if (typeof text !== 'string' || text.length === 0) continue + const id = decl.id_column ? row[decl.id_column] : contentId(text) + if (typeof id !== 'string') continue + const matched = group.byId.get(id) + if (!matched || matched[0].text !== undefined) continue + for (const hit of matched) hit.text = text + remaining-- + if (remaining === 0) break + } + } +} diff --git a/hypaware-core/plugins-workspace/vector-search/src/shards.js b/hypaware-core/plugins-workspace/vector-search/src/shards.js new file mode 100644 index 0000000..18e34c0 --- /dev/null +++ b/hypaware-core/plugins-workspace/vector-search/src/shards.js @@ -0,0 +1,239 @@ +// @ts-check + +import { createHash } from 'node:crypto' +import fs from 'node:fs' +import path from 'node:path' + +/** + * @import { CachePartitionMeta } from '../../../../collectivus-plugin-kernel-types.d.ts' + * @import { RawShardHit, ShardMeta, ShardState, VectorIndexDeclaration } from './types.d.ts' + */ + +/** + * Shard layout: one hypvector parquet file per cache partition, plus a + * JSON sidecar carrying what the parquet KV metadata cannot — the + * embedder model and the source partition's row count at build time. + * Files live under `/indexes//`: + * + * indexes//.parquet + * indexes//.meta.json + * + * Shards are derived artifacts: rebuildable from the cache, never the + * system of record. Deleting the whole tree costs one cold rebuild. + */ + +/** + * @param {Record} partition + * @returns {[string, string][]} + */ +function sortedEntries(partition) { + return Object.entries(partition ?? {}).sort(([a], [b]) => a.localeCompare(b)) +} + +/** + * Render a partition kv-bag into a stable, filename-safe base name: + * a human-readable label plus a short hash of the canonical partition + * JSON. The sanitized label alone is lossy (`source=a/b` and + * `source=a_b` both render `source=a_b`, and a value containing `,` + * or `=` can mimic another partition's entry list), so the hash — + * not the label — is what makes distinct partitions map to distinct + * shard files. Keys are sorted so discovery order can never produce + * two names for one partition; `all` covers partition-less datasets. + * + * @param {Record} partition + * @returns {string} + * @ref LLP 0024#indexes-are-declared-in-config-sharded-per-partition [implements] — shard file names are label + partition hash so sanitization can never collide two partitions + */ +export function shardFileBase(partition) { + const entries = sortedEntries(partition) + if (entries.length === 0) return 'all' + const label = entries + .map(([k, v]) => `${k}=${v}`) + .join(',') + .replace(/[^A-Za-z0-9._=,-]/g, '_') + .slice(0, 80) + const hash = createHash('sha256').update(JSON.stringify(entries)).digest('hex').slice(0, 8) + return `${label}-${hash}` +} + +/** + * @param {string} indexesDir + * @param {string} indexName + * @param {string} fileBase + * @returns {{ file: string, meta: string }} + */ +export function shardPaths(indexesDir, indexName, fileBase) { + const dir = path.join(indexesDir, indexName) + return { + file: path.join(dir, `${fileBase}.parquet`), + meta: path.join(dir, `${fileBase}.meta.json`), + } +} + +/** + * Default shard row id: a content hash of the embedded text. Identical + * texts collapse to one vector, so denormalized columns (the same + * value repeated on every row) cost one embedding call instead of one + * per row. + * + * @param {string} text + * @returns {string} + */ +export function contentId(text) { + return createHash('sha256').update(text, 'utf8').digest('hex').slice(0, 32) +} + +/** + * Read every shard sidecar under one index dir. Unreadable or + * malformed sidecars are skipped — the shard will classify as + * `missing` and rebuild through the normal path. + * + * @param {string} indexesDir + * @param {string} indexName + * @returns {Map} + */ +export function readShardMetas(indexesDir, indexName) { + /** @type {Map} */ + const metas = new Map() + const dir = path.join(indexesDir, indexName) + /** @type {string[]} */ + let entries + try { + entries = fs.readdirSync(dir) + } catch { + return metas + } + for (const entry of entries) { + if (!entry.endsWith('.meta.json')) continue + const fileBase = entry.slice(0, -'.meta.json'.length) + try { + const parsed = JSON.parse(fs.readFileSync(path.join(dir, entry), 'utf8')) + if (parsed && parsed.schema_version === 1 && typeof parsed.model === 'string') { + // A sidecar without its parquet file is a torn write; treat as + // missing so the shard rebuilds. Empty shards (zero embeddable + // texts) legitimately have a sidecar only. + if (parsed.row_count > 0 && !fs.existsSync(path.join(dir, `${fileBase}.parquet`))) continue + metas.set(fileBase, /** @type {ShardMeta} */ (parsed)) + } + } catch { /* malformed sidecar — rebuild path handles it */ } + } + return metas +} + +/** + * States that the refresh path resolves by rebuilding the shard. + * Everything here re-embeds through the same build; `orphan` deletes + * instead, and `fresh` is left alone. + * + * @type {ReadonlySet} + */ +export const REBUILD_STATES = new Set(['missing', 'stale_config', 'stale_model', 'stale_dimension', 'stale_rows']) + +/** + * @param {ShardMeta} meta + * @param {VectorIndexDeclaration} decl + * @param {Record} partition + * @returns {boolean} + */ +function matchesDeclaration(meta, decl, partition) { + return ( + meta.index === decl.name && + meta.dataset === decl.dataset && + meta.column === decl.column && + (meta.id_column ?? null) === (decl.id_column ?? null) && + JSON.stringify(sortedEntries(meta.partition)) === JSON.stringify(sortedEntries(partition)) + ) +} + +/** + * Classify every shard of one index against the live cache partitions. + * Pure over its inputs so the staleness rules are unit-testable: + * + * - no shard for a live partition → `missing` + * - sidecar identity (dataset, column, id_column, exact partition) + * differs from the live declaration → `stale_config` + * (an index name reused over a different dataset/column must never + * pass row-count + model checks and serve the old vectors) + * - sidecar model differs from config model → `stale_model` + * (stale, not an error — the refresh path re-embeds) + * - sidecar dimension differs from the expected dimension, when the + * caller knows one → `stale_dimension` + * (the embedder's configured `dimensions`, or the dimension the + * query embedded to — same model, different vector length) + * - sidecar source_row_count differs from the partition's current + * rowCount → `stale_rows` + * (compaction dedup can shrink rowCount without new content; the + * re-embed is wasted work but never wrong) + * - shard whose partition no longer exists → `orphan` + * (cache retention evicted it; the next sweep deletes the shard, + * so there is no index-over-deleted-rows staleness class) + * - otherwise → `fresh` + * + * @param {{ partitions: CachePartitionMeta[], metas: Map, decl: VectorIndexDeclaration, model: string, dimension?: number }} args + * @returns {ShardState[]} + * @ref LLP 0024#indexes-are-declared-in-config-sharded-per-partition [implements] — declaration, model, and dimension drift are all staleness; retention coupling dissolves into orphan sweep + */ +export function computeShardStates({ partitions, metas, decl, model, dimension }) { + /** @type {ShardState[]} */ + const states = [] + const liveBases = new Set() + + for (const partition of partitions) { + const fileBase = shardFileBase(partition.partition) + liveBases.add(fileBase) + const meta = metas.get(fileBase) + if (!meta) { + states.push({ fileBase, state: 'missing', partition }) + } else if (!matchesDeclaration(meta, decl, partition.partition)) { + states.push({ fileBase, state: 'stale_config', partition, meta }) + } else if (meta.model !== model) { + states.push({ fileBase, state: 'stale_model', partition, meta }) + } else if (dimension !== undefined && meta.row_count > 0 && meta.dimension !== dimension) { + states.push({ fileBase, state: 'stale_dimension', partition, meta }) + } else if (meta.source_row_count !== partition.rowCount) { + states.push({ fileBase, state: 'stale_rows', partition, meta }) + } else { + states.push({ fileBase, state: 'fresh', partition, meta }) + } + } + + for (const [fileBase, meta] of metas) { + if (!liveBases.has(fileBase)) { + states.push({ fileBase, state: 'orphan', meta }) + } + } + + return states +} + +/** + * Merge per-shard hit lists into one global top-K. Scores are cosine + * over normalized vectors (higher = better) — fixed at shard write + * time, so a plain descending sort is a correct merge. + * + * @template {RawShardHit} T + * @param {T[][]} hitLists + * @param {number} topK + * @returns {T[]} + */ +export function mergeTopK(hitLists, topK) { + /** @type {T[]} */ + const all = [] + for (const hits of hitLists) all.push(...hits) + all.sort((a, b) => b.score - a.score) + return all.slice(0, Math.max(0, topK)) +} + +/** + * Render a partition for display (`source=claude`, or `all`). Unlike + * {@link shardFileBase} this is unsanitized and unhashed — display + * strings don't need to be collision-free file names. + * + * @param {Record} partition + * @returns {string} + */ +export function partitionLabel(partition) { + const entries = sortedEntries(partition) + if (entries.length === 0) return 'all' + return entries.map(([k, v]) => `${k}=${v}`).join(',') +} diff --git a/hypaware-core/plugins-workspace/vector-search/src/source.js b/hypaware-core/plugins-workspace/vector-search/src/source.js new file mode 100644 index 0000000..9fe59ec --- /dev/null +++ b/hypaware-core/plugins-workspace/vector-search/src/source.js @@ -0,0 +1,150 @@ +// @ts-check + +import { Attr, withSpan } from '../../../../src/core/observability/index.js' +import { refreshIndexes } from './refresh.js' +import { getVectorSearchRuntime } from './runtime.js' +import { validateVectorSearchConfig } from './config.js' + +/** + * @import { PluginActivationContext, SourceStatus, StartedSource } from '../../../../collectivus-plugin-kernel-types.d.ts' + * @import { RefreshReport } from './types.d.ts' + */ + +/** + * Daemon refresh timer, modeled on the kernel's cache-maintenance loop + * (`src/core/cache/maintenance.js` via `daemon/runtime.js`): its own + * interval, a wall-clock budget per tick, an in-flight guard so a slow + * tick never stacks, and an unref'd handle so the timer cannot keep the + * process alive. Registered as a source contribution because the daemon + * starts every registered source — that is the kernel's "give a plugin + * a periodic foothold" seam. + * + * Per-tick embedding spend is additionally bounded by + * `refresh.max_rows_per_tick`; bounded per-partition shard writes match + * the work-shape the maintenance tick already performs in-daemon, so + * the monolithic parquet-export OOM constraint does not apply here. + * + * @param {PluginActivationContext} _ctx + * @returns {Promise} + * @ref LLP 0024#freshness-rides-the-cache-maintenance-pattern [implements] — daemon timer with interval + per-tick wall-clock and row budgets + */ +export async function startVectorRefreshSource(_ctx) { + const runtime = getVectorSearchRuntime() + + /** @type {NodeJS.Timeout | null} */ + let handle = null + /** @type {Promise | null} */ + let inFlight = null + /** @type {RefreshReport | null} */ + let lastReport = null + /** @type {string | null} */ + let lastError = null + /** @type {string | null} */ + let lastTickAt = null + + async function runTick() { + const cfg = runtime.config.refresh + lastTickAt = new Date().toISOString() + await withSpan( + 'vector.refresh_tick', + { + [Attr.COMPONENT]: 'vector-search', + [Attr.OPERATION]: 'vector.refresh_tick', + [Attr.PLUGIN]: '@hypaware/vector-search', + index_count: runtime.config.indexes.length, + status: 'ok', + }, + async (span) => { + const report = await refreshIndexes({ + decls: runtime.config.indexes, + embedder: runtime.embedder, + storage: runtime.storage, + indexesDir: runtime.indexesDir, + log: runtime.log, + budget: { + deadlineMs: Date.now() + cfg.max_tick_ms, + maxRows: cfg.max_rows_per_tick, + }, + }) + lastReport = report + lastError = null + span.setAttribute('shards_built', report.shardsBuilt) + span.setAttribute('shards_skipped', report.shardsSkipped) + span.setAttribute('orphans_swept', report.orphansSwept) + span.setAttribute('rows_embedded', report.rowsEmbedded) + span.setAttribute('budget_exhausted', report.budgetExhausted) + }, + { component: 'vector-search' } + ).catch((/** @type {unknown} */ err) => { + lastError = err instanceof Error ? err.message : String(err) + runtime.log.error('vector.refresh_tick_failed', { message: lastError }) + }) + } + + function startTimer() { + const cfg = runtime.config.refresh + if (!cfg.enabled || runtime.config.indexes.length === 0) return + const intervalMs = Math.max(1, Math.round(cfg.interval_minutes * 60_000)) + handle = setInterval(() => { + if (inFlight) return + inFlight = runTick().finally(() => { inFlight = null }) + }, intervalMs) + if (typeof handle.unref === 'function') handle.unref() + } + + function stopTimer() { + if (handle) { + clearInterval(handle) + handle = null + } + } + + startTimer() + runtime.log.info('vector.refresh_source_started', { + enabled: runtime.config.refresh.enabled, + index_count: runtime.config.indexes.length, + interval_minutes: runtime.config.refresh.interval_minutes, + }) + + return { + async status() { + /** @type {SourceStatus} */ + const status = { + state: 'ready', + message: handle + ? `refreshing ${runtime.config.indexes.length} index(es) every ${runtime.config.refresh.interval_minutes}m` + : 'idle (refresh disabled or no indexes configured)', + details: { + last_tick_at: lastTickAt, + last_error: lastError, + ...(lastReport + ? { + shards_built: lastReport.shardsBuilt, + shards_skipped: lastReport.shardsSkipped, + orphans_swept: lastReport.orphansSwept, + rows_embedded: lastReport.rowsEmbedded, + budget_exhausted: lastReport.budgetExhausted, + } + : {}), + }, + } + return status + }, + async reload(freshCtx) { + const validated = validateVectorSearchConfig(freshCtx.config) + if (!validated.ok) { + runtime.log.warn('vector.reload_config_invalid', { + errors: validated.errors.map((e) => `${e.pointer}: ${e.message}`).join('; '), + }) + return + } + runtime.config = validated.config + stopTimer() + startTimer() + }, + async stop() { + stopTimer() + if (inFlight) await inFlight + }, + } +} diff --git a/hypaware-core/plugins-workspace/vector-search/src/status.js b/hypaware-core/plugins-workspace/vector-search/src/status.js new file mode 100644 index 0000000..718d0e3 --- /dev/null +++ b/hypaware-core/plugins-workspace/vector-search/src/status.js @@ -0,0 +1,52 @@ +// @ts-check + +import { computeShardStates, readShardMetas } from './shards.js' + +/** + * @import { VectorIndexStatus } from '../../../../collectivus-plugin-kernel-types.d.ts' + * @import { VectorSearchRuntime } from './types.d.ts' + */ + +/** + * Per-index, per-partition shard coverage: state, model, dimension, + * row counts, build time. Works without the optional hypvector + * dependency — everything here reads sidecar metas and the cache + * partition listing only. + * + * @param {VectorSearchRuntime} runtime + * @returns {Promise} + */ +export async function collectIndexStatus(runtime) { + /** @type {VectorIndexStatus[]} */ + const out = [] + for (const decl of runtime.config.indexes) { + const partitions = await runtime.storage.discoverCachePartitions({ datasets: [decl.dataset] }) + const metas = readShardMetas(runtime.indexesDir, decl.name) + const states = computeShardStates({ + partitions, + metas, + decl, + model: runtime.embedder.model, + dimension: runtime.embedder.dimensions, + }) + out.push({ + index: decl.name, + dataset: decl.dataset, + column: decl.column, + model: runtime.embedder.model, + shards: states.map((s) => ({ + partition: s.partition?.partition ?? s.meta?.partition ?? {}, + state: s.state, + ...(s.meta + ? { + rows: s.meta.row_count, + model: s.meta.model, + dimension: s.meta.dimension, + built_at: s.meta.built_at, + } + : {}), + })), + }) + } + return out +} diff --git a/hypaware-core/plugins-workspace/vector-search/src/types.d.ts b/hypaware-core/plugins-workspace/vector-search/src/types.d.ts new file mode 100644 index 0000000..50b4564 --- /dev/null +++ b/hypaware-core/plugins-workspace/vector-search/src/types.d.ts @@ -0,0 +1,134 @@ +/** + * Plugin-local types for `@hypaware/vector-search`. The capability + * surface (`VectorSearchCapability` and friends) lives in the kernel + * types file; these shapes cover config, shard bookkeeping, and the + * refresh/search internals. + */ + +import type { + CachePartitionMeta, + EmbedderCapability, + PluginActivationContext, + PluginLogger, +} from '../../../../collectivus-plugin-kernel-types.d.ts' +import type { ExtendedQueryStorageService } from '../../../../src/core/cache/types.d.ts' +import type { searchVectors, writeVectors } from 'hypvector' +import type { fileWriter } from 'hyparquet-writer' + +export interface VectorIndexDeclaration { + /** Dataset to index (resolved through the dataset registry at refresh time). */ + dataset: string + /** Column whose text is embedded. */ + column: string + /** Index name; defaults to `.`. */ + name: string + /** + * Column providing the shard row id. Absent means the id is a hash of + * the text content, which also deduplicates identical texts before + * embedding (bounding API spend on denormalized columns). + */ + id_column?: string +} + +export interface VectorRefreshConfig { + enabled: boolean + /** Daemon timer cadence. Deliberately longer than cache maintenance's 60. */ + interval_minutes: number + /** Wall-clock budget per daemon tick (soft: checked before each shard). */ + max_tick_ms: number + /** Embedding row budget per daemon tick (soft: checked before each shard). */ + max_rows_per_tick: number +} + +export interface VectorSearchConfig { + indexes: VectorIndexDeclaration[] + refresh: VectorRefreshConfig +} + +export interface VectorConfigError { + pointer: string + message: string + errorKind: 'vector_config_invalid' +} + +export type VectorConfigResult = + | { ok: true, config: VectorSearchConfig } + | { ok: false, errors: VectorConfigError[] } + +/** Sidecar JSON written next to each shard parquet file. */ +export interface ShardMeta { + schema_version: 1 + index: string + dataset: string + column: string + /** Row-id column the shard was built with; absent for content-hash ids. */ + id_column?: string + partition: Record + /** Embedder model the vectors were produced with. */ + model: string + dimension: number + /** Embedded (deduplicated) vector count. */ + row_count: number + /** Cache partition row count at build time — the staleness signal. */ + source_row_count: number + built_at: string +} + +export type ShardStateKind = 'fresh' | 'stale_rows' | 'stale_model' | 'stale_dimension' | 'stale_config' | 'missing' | 'orphan' + +export interface ShardState { + /** Filename-safe partition rendering; shard file base name. */ + fileBase: string + state: ShardStateKind + /** Cache partition backing the shard; absent for orphans. */ + partition?: CachePartitionMeta + /** Sidecar meta; absent for missing shards. */ + meta?: ShardMeta +} + +export interface RefreshBudget { + /** Epoch ms after which no further shard build starts. */ + deadlineMs?: number + /** Max rows embedded this run; checked before each shard build. */ + maxRows?: number +} + +export interface ShardBuildReport { + index: string + fileBase: string + rowsEmbedded: number + dimension: number +} + +export interface RefreshReport { + shardsBuilt: number + shardsSkipped: number + orphansSwept: number + rowsEmbedded: number + /** True when missing/stale shards remain because a budget ran out. */ + budgetExhausted: boolean +} + +export interface VectorSearchRuntime { + ctx: PluginActivationContext + config: VectorSearchConfig + embedder: EmbedderCapability + storage: ExtendedQueryStorageService + log: PluginLogger + /** Root for shard files: `/indexes`. */ + indexesDir: string +} + +export interface HypvectorModule { + ok: true + searchVectors: typeof searchVectors + writeVectors: typeof writeVectors + fileWriter: typeof fileWriter +} + +export type HypvectorLoadResult = HypvectorModule | { ok: false, message: string } + +export interface RawShardHit { + id: string + score: number +} diff --git a/hypaware-core/smoke/flows/cli_bundled_plugins_activated.js b/hypaware-core/smoke/flows/cli_bundled_plugins_activated.js index a187985..b3b35b3 100644 --- a/hypaware-core/smoke/flows/cli_bundled_plugins_activated.js +++ b/hypaware-core/smoke/flows/cli_bundled_plugins_activated.js @@ -260,10 +260,14 @@ export async function run({ harness, expect }) { configBoots.map((/** @type {any} */ s) => s.attributes?.plugins_activated), (rows) => Array.isArray(rows) && rows.some((n) => n === 6) ) + // Skipped = allowlist plugins this flow's config does not name (the + // excluded-from-default set never reaches the skip loop). Bumps + // whenever a plugin joins V1_BUNDLED_PLUGIN_ALLOWLIST — most + // recently @hypaware/context-graph (3 -> 4). expect.that( - 'traces: at least one config-profile boot reports plugins_skipped=3', + 'traces: at least one config-profile boot reports plugins_skipped=4', configBoots.map((/** @type {any} */ s) => s.attributes?.plugins_skipped), - (rows) => Array.isArray(rows) && rows.some((n) => n === 3) + (rows) => Array.isArray(rows) && rows.some((n) => n === 4) ) const activateSpans = traces.filter((/** @type {any} */ t) => t.name === 'plugin.activate') diff --git a/hypaware-core/smoke/flows/vector_search_local_fixture.js b/hypaware-core/smoke/flows/vector_search_local_fixture.js new file mode 100644 index 0000000..7ca929c --- /dev/null +++ b/hypaware-core/smoke/flows/vector_search_local_fixture.js @@ -0,0 +1,425 @@ +// @ts-check + +import http from 'node:http' +import path from 'node:path' +import process from 'node:process' +import { fileURLToPath } from 'node:url' + +import { + Attr, + installObservability, + runRoot, +} from '../../../src/core/observability/index.js' +import { dispatch } from '../../../src/core/cli/dispatch.js' +import { createCommandRegistry } from '../../../src/core/registry/commands.js' +import { createKernelRuntime } from '../../../src/core/runtime/activation.js' +import { activatePlugins } from '../../../src/core/runtime/loader.js' +import { loadManifests } from '../../../src/core/manifest.js' + +/** + * @import { AddressInfo } from 'node:net' + * @import { ColumnSpec } from '../../../collectivus-plugin-kernel-types.d.ts' + * @import { PluginActivationEntry } from '../../../src/core/runtime/loader.d.ts' + */ + +const SMOKE_DIR = path.dirname(fileURLToPath(import.meta.url)) +const PLUGINS_WORKSPACE = path.resolve(SMOKE_DIR, '../../plugins-workspace') + +const DATASET = 'vector_rows' +const API_KEY_ENV = 'SMOKE_EMBED_KEY' +const API_KEY_VALUE = 'smoke-secret-embed-key-do-not-log' +const EMBED_MODEL = 'fake-letters-26' + +/** @type {ColumnSpec[]} */ +const COLUMNS = [{ name: 'text', type: 'STRING', nullable: true }] + +/** + * Hermetic smoke for `@hypaware/embedder-openai` + `@hypaware/vector-search`: + * a localhost fake embedder (no real API calls), real cache partitions, + * the real CLI dispatch path, and the daemon-style refresh source. + * + * Steps (each a `smoke_step`-tagged root span): + * 1. populate — seed two cache partitions with distinct texts. + * 2. plugin_activate — activate both plugins against the fake server. + * 3. status_missing — `hyp vector status --json` reports missing shards. + * 4. first_search — `hyp vector search` auto-builds both shards and + * ranks the matching text first. + * 5. staleness — appending rows flips one shard to stale_rows. + * 6. timer_refresh — the `vector-search-refresh` source tick rebuilds + * exactly the stale shard. + * 7. no_refresh_search — `--no-refresh` finds the newly indexed row. + * 8. telemetry — spans prove the build/search/tick paths ran, and + * neither the API key nor raw text leaked. + * + * @param {{ harness: any, expect: any }} args + */ +export async function run({ harness, expect }) { + const obs = installObservability() + const cacheRoot = path.join(harness.stateDir, 'cache') + const registry = createCommandRegistry() + const kernel = createKernelRuntime({ commandRegistry: registry, cacheRoot }) + + process.env[API_KEY_ENV] = API_KEY_VALUE + + // --- fake OpenAI-compatible embedder ------------------------------------- + // Embeds a text as its L2-normalized letter-frequency histogram, so + // cosine similarity ranks shared vocabulary deterministically: a + // query repeating a row's word scores 1.0 against that row. + let authHeadersSeen = 0 + let embedRequests = 0 + const server = http.createServer((req, res) => { + if (req.method !== 'POST' || req.url !== '/v1/embeddings') { + res.writeHead(404).end() + return + } + let body = '' + req.on('data', (chunk) => { body += chunk }) + req.on('end', () => { + embedRequests++ + if (req.headers.authorization === `Bearer ${API_KEY_VALUE}`) authHeadersSeen++ + const { input } = JSON.parse(body) + const data = input.map((/** @type {string} */ text, /** @type {number} */ index) => ({ + index, + embedding: letterHistogram(text), + })) + res.writeHead(200, { 'content-type': 'application/json' }) + res.end(JSON.stringify({ data, usage: { prompt_tokens: input.length, total_tokens: input.length } })) + }) + }) + await new Promise((resolve) => server.listen(0, '127.0.0.1', () => resolve(undefined))) + const address = /** @type {AddressInfo} */ (server.address()) + const baseUrl = `http://127.0.0.1:${address.port}` + + try { + // --- 1. populate two partitions ----------------------------------------- + await runRoot( + 'smoke.populate', + { + [Attr.COMPONENT]: 'vector-search', + [Attr.OPERATION]: 'smoke.populate', + [Attr.SMOKE_NAME]: harness.smokeName, + [Attr.SMOKE_STEP]: 'populate', + [Attr.DEV_RUN_ID]: harness.devRunId, + status: 'ok', + }, + async () => { + await kernel.storage.appendRowsToPartition(DATASET, ['source=alpha'], COLUMNS, [ + { text: 'alpha alpha alpha' }, + { text: 'omega omega omega' }, + ]) + await kernel.storage.appendRowsToPartition(DATASET, ['source=beta'], COLUMNS, [ + { text: 'beta beta beta' }, + ]) + } + ) + + // --- 2. activate plugins ------------------------------------------------- + await runRoot( + 'kernel.boot', + { + [Attr.COMPONENT]: 'kernel', + [Attr.OPERATION]: 'boot', + [Attr.SMOKE_NAME]: harness.smokeName, + [Attr.SMOKE_STEP]: 'plugin_activate', + [Attr.DEV_RUN_ID]: harness.devRunId, + status: 'ok', + }, + async () => { + const { loaded, failed } = await loadManifests([ + path.join(PLUGINS_WORKSPACE, 'embedder-openai'), + path.join(PLUGINS_WORKSPACE, 'vector-search'), + ]) + if (failed.length > 0) { + throw new Error(`manifest failures — ${failed.map((f) => `${f.manifestPath}: ${f.message}`).join('; ')}`) + } + // Embedder first: vector-search requires hypaware.embedder. + const byName = new Map(loaded.map((l) => [l.manifest.name, l])) + const embedderEntry = byName.get('@hypaware/embedder-openai') + const vectorEntry = byName.get('@hypaware/vector-search') + if (!embedderEntry || !vectorEntry) throw new Error('expected both plugin manifests') + /** @type {PluginActivationEntry[]} */ + const entries = [ + { + manifest: embedderEntry.manifest, + rootDir: embedderEntry.rootDir, + config: { base_url: baseUrl, model: EMBED_MODEL, api_key_env: API_KEY_ENV }, + }, + { + manifest: vectorEntry.manifest, + rootDir: vectorEntry.rootDir, + config: { + indexes: [{ dataset: DATASET, column: 'text' }], + // ~120ms so the timer step is fast; the budget fields keep + // their defaults — this smoke never approaches them. + refresh: { interval_minutes: 0.002 }, + }, + }, + ] + const result = await activatePlugins({ + plugins: entries, + stateRoot: harness.stateDir, + runId: harness.devRunId, + runtime: kernel, + tmpRoot: path.join(harness.tmpDir, 'plugin-temp'), + }) + for (const r of result.results) { + if (!r.ok) throw new Error(`activate ${r.plugin.name} failed (${r.errorKind}): ${r.message}`) + } + } + ) + + expect.that( + 'registry: vector search command registered', + registry.get('vector search'), + (/** @type {unknown} */ v) => v !== undefined + ) + + // --- 3. status: both shards missing -------------------------------------- + const statusBefore = await vectorStatusJson(registry, kernel) + expect.that('status: one index reported', statusBefore.length, (/** @type {number} */ v) => v === 1) + expect.that( + 'status: both shards missing before any search', + statusBefore[0].shards.map((/** @type {any} */ s) => s.state).sort(), + (/** @type {string[]} */ v) => v.length === 2 && v.every((s) => s === 'missing') + ) + + // --- 4. first search: auto-refresh builds both shards -------------------- + const firstSearch = await runRoot( + 'smoke.first_search', + { + [Attr.COMPONENT]: 'vector-search', + [Attr.OPERATION]: 'smoke.first_search', + [Attr.SMOKE_NAME]: harness.smokeName, + [Attr.SMOKE_STEP]: 'first_search', + [Attr.DEV_RUN_ID]: harness.devRunId, + status: 'ok', + }, + async () => { + const stdout = makeBuf() + const stderr = makeBuf() + const code = await dispatch( + ['vector', 'search', 'alpha', '--top-k', '3', '--format', 'json'], + { stdout, stderr, kernel, registry } + ) + return { code, stdout: stdout.text(), stderr: stderr.text() } + } + ) + expect.that('first search: exited 0', firstSearch.code, (/** @type {number} */ v) => v === 0) + expect.that( + 'first search: refresh estimate printed to stderr', + firstSearch.stderr, + (/** @type {string} */ v) => v.includes('refreshing 2 shard(s)') + ) + const firstHits = JSON.parse(firstSearch.stdout) + expect.that('first search: returned hits', firstHits.length, (/** @type {number} */ v) => v >= 2) + expect.that( + 'first search: alpha text ranked first with cosine ~1', + firstHits[0], + (/** @type {any} */ hit) => hit.text === 'alpha alpha alpha' && hit.partition === 'source=alpha' && hit.score > 0.99 + ) + expect.that( + 'first search: scores strictly ordered', + firstHits, + (/** @type {any[]} */ hits) => hits.every((h, i) => i === 0 || h.score <= hits[i - 1].score) + ) + + const statusFresh = await vectorStatusJson(registry, kernel) + expect.that( + 'status: both shards fresh after first search', + statusFresh[0].shards.map((/** @type {any} */ s) => s.state), + (/** @type {string[]} */ v) => v.length === 2 && v.every((s) => s === 'fresh') + ) + + // --- 5. staleness: append rows to one partition --------------------------- + await runRoot( + 'smoke.staleness', + { + [Attr.COMPONENT]: 'vector-search', + [Attr.OPERATION]: 'smoke.staleness', + [Attr.SMOKE_NAME]: harness.smokeName, + [Attr.SMOKE_STEP]: 'staleness', + [Attr.DEV_RUN_ID]: harness.devRunId, + status: 'ok', + }, + async () => { + await kernel.storage.appendRowsToPartition(DATASET, ['source=beta'], COLUMNS, [ + { text: 'gamma gamma gamma' }, + ]) + } + ) + const statusStale = await vectorStatusJson(registry, kernel) + expect.that( + 'status: exactly the appended shard is stale_rows', + statusStale[0].shards.map((/** @type {any} */ s) => s.state).sort(), + (/** @type {string[]} */ v) => v.join(',') === 'fresh,stale_rows' + ) + + // --- 6. daemon-style timer refresh ---------------------------------------- + await runRoot( + 'smoke.timer_refresh', + { + [Attr.COMPONENT]: 'vector-search', + [Attr.OPERATION]: 'smoke.timer_refresh', + [Attr.SMOKE_NAME]: harness.smokeName, + [Attr.SMOKE_STEP]: 'timer_refresh', + [Attr.DEV_RUN_ID]: harness.devRunId, + status: 'ok', + }, + async () => { + const ctx = kernel.activationContexts.get('@hypaware/vector-search') + if (!ctx) throw new Error('no activation context for @hypaware/vector-search') + await kernel.sources.start('vector-search-refresh', ctx) + try { + const deadline = Date.now() + 5_000 + for (;;) { + const status = await vectorStatusJson(registry, kernel) + const states = status[0].shards.map((/** @type {any} */ s) => s.state) + if (states.every((/** @type {string} */ s) => s === 'fresh')) break + if (Date.now() > deadline) throw new Error(`timer refresh did not converge: ${states.join(',')}`) + await sleep(50) + } + } finally { + await kernel.sources.stop('vector-search-refresh') + } + } + ) + + // --- 7. --no-refresh search sees the timer-built shard -------------------- + const secondSearch = await runRoot( + 'smoke.no_refresh_search', + { + [Attr.COMPONENT]: 'vector-search', + [Attr.OPERATION]: 'smoke.no_refresh_search', + [Attr.SMOKE_NAME]: harness.smokeName, + [Attr.SMOKE_STEP]: 'no_refresh_search', + [Attr.DEV_RUN_ID]: harness.devRunId, + status: 'ok', + }, + async () => { + const stdout = makeBuf() + const stderr = makeBuf() + const code = await dispatch( + ['vector', 'search', 'gamma', '--no-refresh', '--top-k', '1', '--format', 'json'], + { stdout, stderr, kernel, registry } + ) + return { code, stdout: stdout.text(), stderr: stderr.text() } + } + ) + expect.that('no-refresh search: exited 0', secondSearch.code, (/** @type {number} */ v) => v === 0) + expect.that( + 'no-refresh search: finds the timer-indexed row', + JSON.parse(secondSearch.stdout)[0], + (/** @type {any} */ hit) => hit?.text === 'gamma gamma gamma' && hit.partition === 'source=beta' + ) + + // --- 8. telemetry ---------------------------------------------------------- + await obs.shutdown() + const traces = await expect.traces() + const logs = await expect.logs() + + const buildSpans = traces.filter((/** @type {any} */ t) => t.name === 'vector.build_shard') + expect.that( + 'telemetry: exactly 3 shard builds (2 first search + 1 timer)', + buildSpans.length, + (/** @type {number} */ v) => v === 3 + ) + expect.that( + 'telemetry: shard builds all ok with row counts', + buildSpans, + (/** @type {any[]} */ spans) => spans.every((s) => s.attributes?.status === 'ok' && s.attributes?.row_count >= 1) + ) + const timerBuild = buildSpans.filter((/** @type {any} */ t) => t.attributes?.shard_reason === 'stale_rows') + expect.that('telemetry: the timer rebuild was the stale shard', timerBuild.length, (/** @type {number} */ v) => v === 1) + + const searchSpans = traces.filter((/** @type {any} */ t) => t.name === 'vector.search') + expect.that('telemetry: two vector.search spans', searchSpans.length, (/** @type {number} */ v) => v === 2) + expect.that( + 'telemetry: search refresh modes recorded', + searchSpans.map((/** @type {any} */ t) => t.attributes?.refresh_mode).sort(), + (/** @type {string[]} */ v) => v.join(',') === 'auto,never' + ) + + const tickSpans = traces.filter((/** @type {any} */ t) => t.name === 'vector.refresh_tick') + expect.that('telemetry: refresh tick span proves the timer path ran', tickSpans.length, (/** @type {number} */ v) => v >= 1) + expect.that( + 'telemetry: some tick built the stale shard', + tickSpans, + (/** @type {any[]} */ spans) => spans.some((s) => s.attributes?.shards_built >= 1) + ) + + const embedSpans = traces.filter((/** @type {any} */ t) => t.name === 'embedder.embed') + expect.that('telemetry: embedder spans emitted', embedSpans.length, (/** @type {number} */ v) => v >= 3) + expect.that( + 'telemetry: embedder spans carry counts, not content', + embedSpans, + (/** @type {any[]} */ spans) => spans.every((s) => typeof s.attributes?.text_count === 'number') + ) + + expect.that( + 'telemetry: shard_built logs emitted', + logs.filter((/** @type {any} */ l) => l.body === 'vector.shard_built').length, + (/** @type {number} */ v) => v === 3 + ) + + // Secret-safety: neither the API key nor any indexed text appears in + // any telemetry record. + const everything = JSON.stringify(traces) + JSON.stringify(logs) + expect.that( + 'telemetry: API key never appears', + everything.includes(API_KEY_VALUE), + (/** @type {boolean} */ v) => v === false + ) + expect.that( + 'telemetry: indexed text never appears', + everything.includes('alpha alpha alpha'), + (/** @type {boolean} */ v) => v === false + ) + + expect.that('fake embedder: requests were authenticated', authHeadersSeen, (/** @type {number} */ v) => v === embedRequests && v >= 3) + } finally { + server.close() + delete process.env[API_KEY_ENV] + } +} + +/** + * 26-dim L2-normalized letter histogram. + * + * @param {string} text + * @returns {number[]} + */ +function letterHistogram(text) { + const counts = new Array(26).fill(0) + for (const ch of text.toLowerCase()) { + const i = ch.charCodeAt(0) - 97 + if (i >= 0 && i < 26) counts[i]++ + } + const norm = Math.sqrt(counts.reduce((acc, c) => acc + c * c, 0)) || 1 + return counts.map((c) => c / norm) +} + +/** + * @param {ReturnType} registry + * @param {ReturnType} kernel + * @returns {Promise} + */ +async function vectorStatusJson(registry, kernel) { + const stdout = makeBuf() + const stderr = makeBuf() + const code = await dispatch(['vector', 'status', '--json'], { stdout, stderr, kernel, registry }) + if (code !== 0) throw new Error(`vector status failed: ${stderr.text()}`) + return JSON.parse(stdout.text()) +} + +function makeBuf() { + let buf = '' + return { + write(/** @type {string} */ chunk) { buf += String(chunk) }, + text() { return buf }, + } +} + +/** @param {number} ms */ +function sleep(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)) +} diff --git a/llp/0000-hypaware.explainer.md b/llp/0000-hypaware.explainer.md index a249b69..1feba11 100644 --- a/llp/0000-hypaware.explainer.md +++ b/llp/0000-hypaware.explainer.md @@ -72,6 +72,7 @@ plugin that registers a dataset gets query and formatting for free. | Observability & self-instrumentation | [0021](./0021-observability.spec.md) | Spec | | Iceberg export partitioning | [0022](./0022-iceberg-export-partitioning.spec.md) | Spec | | Context-graph T0 projection | [0023](./0023-context-graph-projection.decision.md) | Decision | +| Vector search plugin | [0024](./0024-vector-search-plugin.decision.md) | Decision | | Remote config & join flow | [0025](./0025-remote-config-join-flow.spec.md) | Spec | ## Where to start diff --git a/llp/0003-core-vs-plugin-surface.spec.md b/llp/0003-core-vs-plugin-surface.spec.md index 8d8a5cf..c6b51ce 100644 --- a/llp/0003-core-vs-plugin-surface.spec.md +++ b/llp/0003-core-vs-plugin-surface.spec.md @@ -42,6 +42,13 @@ dataset gets the same query and formatting behavior for free. The local query cache ([LLP 0013](./0013-local-query-cache.decision.md)) is not a plugin and never appears in `plugins[]`. +"Query is intrinsic" means the **SQL/dataset surface** specifically: the +dataset registry, SQL execution, cursors, freshness, and formatting. Other +query modalities (e.g. vector similarity search) are **plugin capabilities** +that build on the intrinsic surface, not kernel surface — decided 2026-06-12 +when scoping `@hypaware/vector-search` +([LLP 0024](./0024-vector-search-plugin.decision.md#plugin-not-kernel)). + **Partition-spec derivation is core surface.** The helpers that turn a dataset's partitioning declaration into an Iceberg `PartitionSpec` and guard its stability — `partitionSpecForDeclaration` and `validatePartitionSpecStability`, with the diff --git a/llp/0024-vector-search-plugin.decision.md b/llp/0024-vector-search-plugin.decision.md new file mode 100644 index 0000000..c209736 --- /dev/null +++ b/llp/0024-vector-search-plugin.decision.md @@ -0,0 +1,182 @@ +# LLP 0024: Vector Search Plugin + +**Type:** Decision +**Status:** Active +**Systems:** Plugins, Query +**Author:** Phil / Claude +**Date:** 2026-06-12 +**Related:** LLP 0003, LLP 0008, LLP 0013, LLP 0015 + +> Adopting [hypvector](https://github.com/hyparam/hypvector) for vector +> similarity search over cached datasets, as a bundled plugin. + +## Plugin, not kernel + +Vector search is a **plugin capability** (`hypaware.vector-search`), not +intrinsic query surface. "Query is intrinsic" means the SQL/dataset surface +only — see the sharpened wording in +[LLP 0003](./0003-core-vs-plugin-surface.spec.md#intrinsic-not-plugin-provided). +The plugin is `@hypaware/vector-search`, bundled in +`hypaware-core/plugins-workspace` per the V1 packaging divergence +([LLP 0002](./0002-v1-scope.decision.md#plugin-packaging-divergence)). + +## Packaging + +`hypvector` is a root **`optionalDependency`**, mirroring `hyparquet-writer` +(which it transitively needs for index *writes*). The plugin degrades +gracefully when the optional dep is absent: activation succeeds, commands +report the missing dependency. This follows the `@hypaware/format-parquet` +precedent — bundled plugins import from root `node_modules`; LLP 0008's +pre-bundling rule applies to separately installed plugins. + +## Index files are plugin state + +Vector index parquet files live under the plugin's managed state directory +([LLP 0004](./0004-activation-and-paths.spec.md)), keyed by **index name** +(not dataset — several indexes may cover one dataset): +`/indexes//.parquet`, each with a +`.meta.json` sidecar. They are +**derived artifacts**: rebuildable from the cache, never the system of record. +This deliberately does *not* touch the intrinsic cache layout +([LLP 0013](./0013-local-query-cache.decision.md) — layout is fixed, not +plugin-extensible) and does not ride the sink driver (indexes are not +exports). + +## Indexes are declared in config, sharded per partition + +Index definitions (`dataset` + `column`, plus an optional `name` and +`id_column`) live in the **v2 config** under the plugin's section — not +per-host state. The embedder model is deliberately *not* part of the +declaration: it comes from the required `hypaware.embedder` capability, so +swapping embedders is one config change, not an edit to every index. Unlike +`collect` collections ([LLP 0015](./0015-query-and-datasets.spec.md#collections-are-per-host-state)), +which point at host-only paths, an index definition is portable: every host +has its own cache of the same registered datasets, and centrally managed +config (the in-flight remote-config/join-flow spec) can push one index +policy org-wide. The built **artifacts** stay per-host plugin state. + +The index is **sharded one hypvector file per cache partition**. Search +discovers partitions through the dataset registry (never hard-coded names), +fans out across shards, and merges top-K. Shard file names are a sanitized +human label **plus a short hash of the canonical partition JSON** — the +sanitizer alone is lossy (`source=a/b` vs `source=a_b`), so the hash, not +the label, is what guarantees distinct partitions get distinct files. + +Each shard's sidecar records its full identity (index, dataset, column, +id_column, exact partition) alongside the embedder model and dimension, and +freshness checks *all of it*: a declaration changed under a reused index +name (`stale_config`), a model change (`stale_model`), and a dimension +change for the same model (`stale_dimension` — a changed embedder +`dimensions` setting, or a different server behind the same model name) are +all *staleness*, not errors — they re-embed through the normal refresh path +below. Search embeds the query before refreshing so the query's own +dimension feeds that staleness check. A mismatch that can't be resolved by +refresh — under `--no-refresh`, or an embedder returning inconsistent +dimensions — is a hard error, never a silent degraded search. + +## Freshness rides the cache-maintenance pattern + +Two refresh paths, both incremental (only missing/stale shards): + +- **Daemon timer**, modeled on cache maintenance + (`src/core/cache/maintenance.js`): its own configurable interval (longer + than maintenance's 60 min default) and a `max_tick_ms`-style budget per + tick. Bounded per-partition shard writes match the work-shape the + maintenance tick already performs in-daemon (compaction); the monolithic + parquet-export OOM constraint does not apply at this granularity. Embedding + API calls from the daemon are covered by the embedder's `network` + permission. +- **Search-time refresh modes**, mirroring query's `refresh` flag: default + refreshes stale shards with progress and an upfront row/token estimate; + `--no-refresh` searches existing shards only. Per-shard writes are durable, + so an interrupted cold build resumes. + +Because both paths can target the same shard concurrently, builds of one +shard **serialize through an in-process lock**, and temp files carry +pid + UUID so concurrent *processes* (CLI search vs daemon tick) never +share a temp path; the final rename stays atomic, so the worst cross-process +case is a redundant last-writer-wins build of identical content. + +Retention coupling dissolves: when cache retention evicts a partition, its +shard is an orphan and the next tick sweeps it. There is no +index-over-deleted-rows staleness class and no full-rebuild command in the +core flow. + +## CLI surface + +Contributed through the CLI registry ([LLP 0009](./0009-cli-registry.spec.md)): +`hyp vector search ` (with `--dataset`, `--top-k`, refresh flags) and +`hyp vector status` (per-dataset shard coverage, model, staleness). Results +format through the intrinsic formatter — table/json/jsonl/markdown for free. + +## Embedding is a separate capability + +Embedding production is its own capability, **`hypaware.embedder`**, which +`@hypaware/vector-search` `requires` ([LLP 0006](./0006-dependencies-and-capabilities.spec.md)). +The embedder choice is therefore an explicit `plugins[]` config decision, not +something baked into vector-search. + +**The first first-party embedder is API-backed**, not local. This is a +deliberate exception to HypAware's local-only posture: enabling it sends +captured content (the text being indexed, and each query string) to an +external embedding API. Chosen for speed-to-working-feature over a local +Transformers.js/WASM embedder (~25–200 MB weight downloads, slower CPU +inference, and tension with [LLP 0008](./0008-plugin-runtime-dependencies.decision.md)'s +pure-JS rule via ONNX runtimes). A local embedder remains the intended +follow-up; the capability split means it's a config swap, not a refactor. + +The index stores the embedder's **model name and dimension** in each shard's +JSON sidecar (the hypvector parquet's KV metadata cannot carry them); +query-time drift from either classifies the shard stale and re-embeds, and +only a mismatch refresh cannot fix is a hard error, not a silent degraded +search. + +## Embedder speaks OpenAI-compatible, base_url configurable + +The first-party embedder is **`@hypaware/embedder-openai`** — named for its +wire shape, not its vendor, so a future local embedder takes a sibling name +instead of fighting over a generic `@hypaware/embedder`. It is a single HTTP +client for the OpenAI-compatible `POST /v1/embeddings` shape with a +**configurable `base_url`**. Defaults: `https://api.openai.com`, model +`text-embedding-3-small`. One plugin therefore covers OpenAI, +OpenAI-compatible proxies, **and local servers (Ollama, LM Studio)** — +pointing `base_url` at localhost restores a fully-local privacy path without +HypAware shipping a model runtime, keeping +[LLP 0008](./0008-plugin-runtime-dependencies.decision.md) untouched. When +the configured key env var is unset the request goes out without an +`Authorization` header, so localhost servers need zero credential config. + +Because enabling the embedder is the explicit opt-in that lets captured text +leave the machine, **neither plugin is in the default-activation allowlist** +(`V1_EXCLUDED_FROM_DEFAULT`): both activate only through an explicit +`plugins[]` entry. + +Credential handling follows the `@hypaware/s3` precedent: config carries only +non-secret references (the **env var name**, default `OPENAI_API_KEY`), the +key resolves from the environment at call time, the manifest declares +`permissions: ["network", "read_env"]`, and credential material never reaches +logs or telemetry. The same posture covers the response direction: +**provider error bodies are never copied into errors or logs** — a provider +or proxy may echo the input texts (captured content) or credentials back in +its error detail, so failures surface as status + endpoint + error kind +only, every part of which comes from config. + +## Open questions + +- **Cost visibility** — *resolved 2026-06-12*: the daemon timer takes a + per-tick **row budget**, `refresh.max_rows_per_tick` (default 5000), + alongside `max_tick_ms`. Both budgets are soft — checked before each shard + build, not mid-build — so one oversized partition overshoots a tick once + instead of starving forever. Spend surfaces as `rows_embedded` / + `budget_exhausted` on the `vector.refresh_tick` span and in the refresh + source's status details. Two further mitigations fell out of the design: + the default content-hash row id deduplicates identical texts before + embedding, and search-time refresh (unbudgeted, since it is interactive) + prints an upfront shard/row estimate before spending. A per-day or + token-denominated budget remains future work if row budgets prove too + coarse. +- **Shard search mode** — per-partition shards will usually be small enough + for hypvector's exact scan; revisit per-shard approximate clustering only + if shard sizes or query latency demand it. (Shards are written without + binary/cluster columns, so hypvector's `auto` algorithm takes the exact + path today.) diff --git a/package.json b/package.json index 9b3b2f3..eacf577 100644 --- a/package.json +++ b/package.json @@ -42,7 +42,8 @@ "squirreling": "0.12.23" }, "optionalDependencies": { - "hyparquet-writer": "0.15.6" + "hyparquet-writer": "0.15.6", + "hypvector": "0.1.1" }, "overrides": { "hyparquet-writer": "0.15.6", diff --git a/src/core/runtime/bundled.js b/src/core/runtime/bundled.js index 6e69e77..b370385 100644 --- a/src/core/runtime/bundled.js +++ b/src/core/runtime/bundled.js @@ -44,11 +44,21 @@ export const V1_BUNDLED_PLUGIN_ALLOWLIST = new Set(/** @type {PluginName[]} */ ( * Activation requires explicit config (`{ name: '@hypaware/gascity' }`) * or an init preset — the picker and default boot profiles skip them. * + * The embedder/vector-search pair is excluded because enabling an + * API-backed embedder is the explicit opt-in that lets captured text + * leave the machine — it must be a deliberate `plugins[]` decision, + * never a default. `@hypaware/vector-search` follows it: its manifest + * requires `hypaware.embedder`, which no default-activated plugin + * provides. + * * @type {ReadonlySet} + * @ref LLP 0024#embedding-is-a-separate-capability [constrained-by] — the embedder choice is an explicit plugins[] config decision, so neither plugin default-activates */ export const V1_EXCLUDED_FROM_DEFAULT = new Set(/** @type {PluginName[]} */ ([ '@hypaware/central', '@hypaware/gascity', + '@hypaware/embedder-openai', + '@hypaware/vector-search', ])) /** diff --git a/test/plugins/embedder-openai-client.test.js b/test/plugins/embedder-openai-client.test.js new file mode 100644 index 0000000..e05abcd --- /dev/null +++ b/test/plugins/embedder-openai-client.test.js @@ -0,0 +1,210 @@ +// @ts-check + +import test from 'node:test' +import assert from 'node:assert/strict' + +import { createOpenAiEmbedder, parseEmbeddingsPayload } from '../../hypaware-core/plugins-workspace/embedder-openai/src/client.js' +import { validateEmbedderConfig } from '../../hypaware-core/plugins-workspace/embedder-openai/src/config.js' + +/** + * @import { EmbedderOpenAiConfig, FetchLike } from '../../hypaware-core/plugins-workspace/embedder-openai/src/types.d.ts' + */ + +const SECRET = 'sk-test-secret-value' + +/** @returns {EmbedderOpenAiConfig} */ +function baseConfig(overrides = {}) { + const result = validateEmbedderConfig({ api_key_env: 'TEST_EMBED_KEY', ...overrides }) + if (!result.ok) throw new Error('test config invalid') + return result.config +} + +function noopLog() { + return { debug() {}, info() {}, warn() {}, error() {} } +} + +/** + * Fake fetch that records requests and replies with index-aligned + * embeddings derived from each input's length. + * + * @param {{ status?: number, body?: unknown, reverse?: boolean }} [opts] + */ +function makeFakeFetch(opts = {}) { + /** @type {Array<{ url: string, headers: Record, body: any }>} */ + const requests = [] + /** @type {FetchLike} */ + const fetchImpl = async (url, init) => { + const body = JSON.parse(init.body) + requests.push({ url, headers: init.headers, body }) + if (opts.status && opts.status !== 200) { + return { + ok: false, + status: opts.status, + json: async () => opts.body ?? {}, + text: async () => JSON.stringify(opts.body ?? { error: { message: 'denied' } }), + } + } + const inputs = /** @type {string[]} */ (body.input) + const data = inputs.map((text, index) => ({ index, embedding: [text.length, 1, 0] })) + if (opts.reverse) data.reverse() + const payload = opts.body ?? { data, usage: { prompt_tokens: inputs.length * 3, total_tokens: inputs.length * 3 } } + return { + ok: true, + status: 200, + json: async () => payload, + text: async () => JSON.stringify(payload), + } + } + return { requests, fetchImpl } +} + +test('embed sends Bearer key from the configured env var and returns aligned vectors', async () => { + const { requests, fetchImpl } = makeFakeFetch({ reverse: true }) + const embedder = createOpenAiEmbedder({ + config: baseConfig(), + env: { TEST_EMBED_KEY: SECRET }, + log: noopLog(), + fetchImpl, + }) + const result = await embedder.embed(['a', 'bbb', 'cc']) + assert.equal(requests.length, 1) + assert.equal(requests[0].url, 'https://api.openai.com/v1/embeddings') + assert.equal(requests[0].headers.authorization, `Bearer ${SECRET}`) + assert.equal(requests[0].body.model, 'text-embedding-3-small') + // Response entries arrived reversed; alignment must come from `index`. + assert.deepEqual(Array.from(result.vectors[0]), [1, 1, 0]) + assert.deepEqual(Array.from(result.vectors[1]), [3, 1, 0]) + assert.deepEqual(Array.from(result.vectors[2]), [2, 1, 0]) + assert.equal(result.dimension, 3) + assert.equal(result.model, 'text-embedding-3-small') + assert.equal(result.usage?.prompt_tokens, 9) +}) + +test('embed chunks batches larger than max_batch and preserves order', async () => { + const { requests, fetchImpl } = makeFakeFetch() + const embedder = createOpenAiEmbedder({ + config: baseConfig({ max_batch: 2 }), + env: { TEST_EMBED_KEY: SECRET }, + log: noopLog(), + fetchImpl, + }) + const result = await embedder.embed(['a', 'bb', 'ccc', 'dddd', 'eeeee']) + assert.equal(requests.length, 3) + assert.deepEqual(result.vectors.map((v) => v[0]), [1, 2, 3, 4, 5]) +}) + +test('embed without the env var sends no Authorization header (localhost servers)', async () => { + const { requests, fetchImpl } = makeFakeFetch() + const embedder = createOpenAiEmbedder({ + config: baseConfig(), + env: {}, + log: noopLog(), + fetchImpl, + }) + await embedder.embed(['a']) + assert.equal(requests[0].headers.authorization, undefined) +}) + +test('embed maps a 401 without a key to a hint naming the env var, never the value', async () => { + const { fetchImpl } = makeFakeFetch({ status: 401 }) + const embedder = createOpenAiEmbedder({ + config: baseConfig(), + env: {}, + log: noopLog(), + fetchImpl, + }) + await assert.rejects( + () => embedder.embed(['a']), + (/** @type {Error & { hypErrorKind?: string }} */ err) => { + assert.equal(err.hypErrorKind, 'embedder_http_401') + assert.match(err.message, /TEST_EMBED_KEY is unset/) + return true + } + ) +}) + +test('embed error messages never contain the API key', async () => { + const { fetchImpl } = makeFakeFetch({ status: 500 }) + const embedder = createOpenAiEmbedder({ + config: baseConfig(), + env: { TEST_EMBED_KEY: SECRET }, + log: noopLog(), + fetchImpl, + }) + await assert.rejects( + () => embedder.embed(['a']), + (/** @type {Error} */ err) => { + assert.ok(!err.message.includes(SECRET), 'key must not leak into the error message') + return true + } + ) +}) + +test('embed errors and logs never contain the provider error body', async () => { + // A provider/proxy may echo the input texts or credentials back in + // its error detail; the client must not copy any of the body into + // the thrown (and therefore logged) message. + const echoed = 'PROVIDER_ECHOED_INPUT_TEXT sk-leaked-key' + const { fetchImpl } = makeFakeFetch({ status: 400, body: { error: { message: echoed } } }) + /** @type {string[]} */ + const loggedMessages = [] + const log = { + debug() {}, + info() {}, + warn() {}, + /** @param {string} _event @param {Record} fields */ + error(_event, fields) { loggedMessages.push(JSON.stringify(fields)) }, + } + const embedder = createOpenAiEmbedder({ + config: baseConfig(), + env: { TEST_EMBED_KEY: SECRET }, + log, + fetchImpl, + }) + await assert.rejects( + () => embedder.embed(['a']), + (/** @type {Error & { hypErrorKind?: string, status?: number }} */ err) => { + assert.equal(err.hypErrorKind, 'embedder_http_400') + assert.equal(err.status, 400) + assert.ok(!err.message.includes('PROVIDER_ECHOED'), 'provider body must not reach the error message') + assert.match(err.message, /HTTP 400/) + return true + } + ) + assert.equal(loggedMessages.length, 1) + assert.ok(!loggedMessages[0].includes('PROVIDER_ECHOED'), 'provider body must not reach logs') + assert.ok(!loggedMessages[0].includes(SECRET), 'key must not reach logs') +}) + +test('embed rejects an empty input batch', async () => { + const { fetchImpl } = makeFakeFetch() + const embedder = createOpenAiEmbedder({ + config: baseConfig(), + env: {}, + log: noopLog(), + fetchImpl, + }) + await assert.rejects( + () => embedder.embed([]), + (/** @type {Error & { hypErrorKind?: string }} */ err) => err.hypErrorKind === 'embedder_empty_input' + ) +}) + +test('embed surfaces a count mismatch as embedder_bad_response', async () => { + const { fetchImpl } = makeFakeFetch({ body: { data: [] } }) + const embedder = createOpenAiEmbedder({ + config: baseConfig(), + env: {}, + log: noopLog(), + fetchImpl, + }) + await assert.rejects( + () => embedder.embed(['a', 'b']), + (/** @type {Error & { hypErrorKind?: string }} */ err) => err.hypErrorKind === 'embedder_bad_response' + ) +}) + +test('parseEmbeddingsPayload rejects a malformed entry and a missing index', () => { + assert.throws(() => parseEmbeddingsPayload({ data: [{ index: 0, embedding: [] }] }, 1, 'x')) + assert.throws(() => parseEmbeddingsPayload({ data: [{ index: 1, embedding: [1] }, { index: 1, embedding: [1] }] }, 2, 'x')) +}) diff --git a/test/plugins/embedder-openai-config.test.js b/test/plugins/embedder-openai-config.test.js new file mode 100644 index 0000000..29e97d5 --- /dev/null +++ b/test/plugins/embedder-openai-config.test.js @@ -0,0 +1,87 @@ +// @ts-check + +import test from 'node:test' +import assert from 'node:assert/strict' + +import { + DEFAULT_API_KEY_ENV, + DEFAULT_BASE_URL, + DEFAULT_MAX_BATCH, + DEFAULT_MODEL, + DEFAULT_TIMEOUT_MS, + embeddingsEndpoint, + validateEmbedderConfig, +} from '../../hypaware-core/plugins-workspace/embedder-openai/src/config.js' + +test('validateEmbedderConfig defaults to OpenAI with OPENAI_API_KEY', () => { + const result = validateEmbedderConfig(undefined) + assert.equal(result.ok, true) + if (!result.ok) return + assert.equal(result.config.base_url, DEFAULT_BASE_URL) + assert.equal(result.config.model, DEFAULT_MODEL) + assert.equal(result.config.api_key_env, DEFAULT_API_KEY_ENV) + assert.equal(result.config.max_batch, DEFAULT_MAX_BATCH) + assert.equal(result.config.timeout_ms, DEFAULT_TIMEOUT_MS) + assert.equal(result.config.dimensions, undefined) +}) + +test('validateEmbedderConfig accepts a localhost override (Ollama shape)', () => { + const result = validateEmbedderConfig({ + base_url: 'http://localhost:11434/v1/', + model: 'nomic-embed-text', + api_key_env: 'MY_KEY', + max_batch: 16, + timeout_ms: 5000, + }) + assert.equal(result.ok, true) + if (!result.ok) return + assert.equal(result.config.base_url, 'http://localhost:11434/v1') + assert.equal(result.config.model, 'nomic-embed-text') + assert.equal(result.config.api_key_env, 'MY_KEY') +}) + +test('validateEmbedderConfig accepts dimensions for v3 shortening', () => { + const result = validateEmbedderConfig({ dimensions: 256 }) + assert.equal(result.ok, true) + if (!result.ok) return + assert.equal(result.config.dimensions, 256) +}) + +test('validateEmbedderConfig rejects a non-object config', () => { + const result = validateEmbedderConfig('nope') + assert.equal(result.ok, false) + if (result.ok) return + assert.equal(result.errors[0].errorKind, 'embedder_config_invalid') +}) + +test('validateEmbedderConfig rejects a non-http base_url', () => { + const result = validateEmbedderConfig({ base_url: 'ftp://example.com' }) + assert.equal(result.ok, false) + if (result.ok) return + assert.equal(result.errors[0].pointer, '/base_url') +}) + +test('validateEmbedderConfig rejects a malformed base_url', () => { + const result = validateEmbedderConfig({ base_url: 'not a url' }) + assert.equal(result.ok, false) + if (result.ok) return + assert.equal(result.errors[0].pointer, '/base_url') +}) + +test('validateEmbedderConfig rejects non-positive numeric fields', () => { + for (const key of ['dimensions', 'max_batch', 'timeout_ms']) { + const result = validateEmbedderConfig({ [key]: 0 }) + assert.equal(result.ok, false, `${key}=0 must fail`) + if (result.ok) continue + assert.equal(result.errors[0].pointer, `/${key}`) + } +}) + +test('embeddingsEndpoint appends /v1/embeddings to a bare origin', () => { + assert.equal(embeddingsEndpoint('https://api.openai.com'), 'https://api.openai.com/v1/embeddings') +}) + +test('embeddingsEndpoint does not double /v1 on a /v1-suffixed base', () => { + assert.equal(embeddingsEndpoint('http://localhost:11434/v1'), 'http://localhost:11434/v1/embeddings') + assert.equal(embeddingsEndpoint('http://localhost:11434/v1/'), 'http://localhost:11434/v1/embeddings') +}) diff --git a/test/plugins/vector-search-config.test.js b/test/plugins/vector-search-config.test.js new file mode 100644 index 0000000..7810013 --- /dev/null +++ b/test/plugins/vector-search-config.test.js @@ -0,0 +1,90 @@ +// @ts-check + +import test from 'node:test' +import assert from 'node:assert/strict' + +import { + REFRESH_DEFAULTS, + validateVectorSearchConfig, +} from '../../hypaware-core/plugins-workspace/vector-search/src/config.js' + +test('validateVectorSearchConfig defaults to no indexes and enabled refresh', () => { + const result = validateVectorSearchConfig(undefined) + assert.equal(result.ok, true) + if (!result.ok) return + assert.deepEqual(result.config.indexes, []) + assert.equal(result.config.refresh.enabled, true) + assert.equal(result.config.refresh.interval_minutes, REFRESH_DEFAULTS.interval_minutes) + assert.equal(result.config.refresh.max_tick_ms, REFRESH_DEFAULTS.max_tick_ms) + assert.equal(result.config.refresh.max_rows_per_tick, REFRESH_DEFAULTS.max_rows_per_tick) +}) + +test('refresh interval default is longer than cache maintenance (60m)', () => { + assert.ok(REFRESH_DEFAULTS.interval_minutes > 60) +}) + +test('index name defaults to dataset.column', () => { + const result = validateVectorSearchConfig({ + indexes: [{ dataset: 'ai_gateway_messages', column: 'content' }], + }) + assert.equal(result.ok, true) + if (!result.ok) return + assert.equal(result.config.indexes[0].name, 'ai_gateway_messages.content') + assert.equal(result.config.indexes[0].id_column, undefined) +}) + +test('explicit index name and id_column are honored', () => { + const result = validateVectorSearchConfig({ + indexes: [{ dataset: 'd', column: 'c', name: 'my-index', id_column: 'message_id' }], + }) + assert.equal(result.ok, true) + if (!result.ok) return + assert.equal(result.config.indexes[0].name, 'my-index') + assert.equal(result.config.indexes[0].id_column, 'message_id') +}) + +test('index names that would escape the state dir are rejected', () => { + for (const name of ['../up', 'a/b', '.hidden', '']) { + const result = validateVectorSearchConfig({ indexes: [{ dataset: 'd', column: 'c', name }] }) + assert.equal(result.ok, false, `name '${name}' must fail`) + } +}) + +test('duplicate index names are rejected', () => { + const result = validateVectorSearchConfig({ + indexes: [ + { dataset: 'd', column: 'c' }, + { dataset: 'd', column: 'c' }, + ], + }) + assert.equal(result.ok, false) + if (result.ok) return + assert.match(result.errors[0].message, /duplicate index name/) +}) + +test('index declarations missing dataset or column are rejected with pointers', () => { + const result = validateVectorSearchConfig({ indexes: [{ column: 'c' }, { dataset: 'd' }] }) + assert.equal(result.ok, false) + if (result.ok) return + assert.equal(result.errors[0].pointer, '/indexes/0/dataset') + assert.equal(result.errors[1].pointer, '/indexes/1/column') +}) + +test('refresh budgets reject non-positive values', () => { + for (const key of ['interval_minutes', 'max_tick_ms', 'max_rows_per_tick']) { + const result = validateVectorSearchConfig({ refresh: { [key]: -1 } }) + assert.equal(result.ok, false, `${key}=-1 must fail`) + } +}) + +test('refresh interval accepts fractional minutes (sub-minute smoke ticks)', () => { + const result = validateVectorSearchConfig({ refresh: { interval_minutes: 0.005 } }) + assert.equal(result.ok, true) +}) + +test('refresh can be disabled', () => { + const result = validateVectorSearchConfig({ refresh: { enabled: false } }) + assert.equal(result.ok, true) + if (!result.ok) return + assert.equal(result.config.refresh.enabled, false) +}) diff --git a/test/plugins/vector-search-manifests.test.js b/test/plugins/vector-search-manifests.test.js new file mode 100644 index 0000000..bfdf678 --- /dev/null +++ b/test/plugins/vector-search-manifests.test.js @@ -0,0 +1,61 @@ +// @ts-check + +import test from 'node:test' +import assert from 'node:assert/strict' +import path from 'node:path' +import { fileURLToPath } from 'node:url' + +import { loadManifests } from '../../src/core/manifest.js' +import { + V1_BUNDLED_PLUGIN_ALLOWLIST, + V1_EXCLUDED_FROM_DEFAULT, +} from '../../src/core/runtime/bundled.js' +import { parseVectorSearchArgv } from '../../hypaware-core/plugins-workspace/vector-search/src/commands.js' + +const WORKSPACE = path.resolve( + path.dirname(fileURLToPath(import.meta.url)), + '../../hypaware-core/plugins-workspace' +) + +test('embedder-openai and vector-search manifests load and validate', async () => { + const { loaded, failed } = await loadManifests([ + path.join(WORKSPACE, 'embedder-openai'), + path.join(WORKSPACE, 'vector-search'), + ]) + assert.equal(failed.length, 0, failed.map((f) => f.message).join('; ')) + assert.equal(loaded.length, 2) + + const embedder = loaded.find((l) => l.manifest.name === '@hypaware/embedder-openai') + assert.ok(embedder) + assert.deepEqual(embedder.manifest.provides?.capabilities, { 'hypaware.embedder': '1.0.0' }) + assert.deepEqual(embedder.manifest.permissions, ['network', 'read_env']) + + const vector = loaded.find((l) => l.manifest.name === '@hypaware/vector-search') + assert.ok(vector) + assert.deepEqual(vector.manifest.provides?.capabilities, { 'hypaware.vector-search': '1.0.0' }) + assert.deepEqual(vector.manifest.requires?.capabilities, { 'hypaware.embedder': '^1.0.0' }) +}) + +test('both plugins are bundled but excluded from default activation', () => { + assert.ok(V1_EXCLUDED_FROM_DEFAULT.has('@hypaware/embedder-openai')) + assert.ok(V1_EXCLUDED_FROM_DEFAULT.has('@hypaware/vector-search')) + assert.ok(!V1_BUNDLED_PLUGIN_ALLOWLIST.has('@hypaware/embedder-openai')) + assert.ok(!V1_BUNDLED_PLUGIN_ALLOWLIST.has('@hypaware/vector-search')) +}) + +test('parseVectorSearchArgv parses flags in any order and joins the query', () => { + const parsed = parseVectorSearchArgv(['how', 'do', 'I', '--top-k', '5', '--no-refresh', '--format', 'json']) + assert.equal(parsed.ok, true) + if (!parsed.ok) return + assert.equal(parsed.query, 'how do I') + assert.equal(parsed.topK, 5) + assert.equal(parsed.refresh, 'never') + assert.equal(parsed.format, 'json') +}) + +test('parseVectorSearchArgv rejects a missing query and bad flags', () => { + assert.equal(parseVectorSearchArgv([]).ok, false) + assert.equal(parseVectorSearchArgv(['q', '--top-k', '0']).ok, false) + assert.equal(parseVectorSearchArgv(['q', '--format', 'yaml']).ok, false) + assert.equal(parseVectorSearchArgv(['q', '--index']).ok, false) +}) diff --git a/test/plugins/vector-search-refresh.test.js b/test/plugins/vector-search-refresh.test.js new file mode 100644 index 0000000..d712cc3 --- /dev/null +++ b/test/plugins/vector-search-refresh.test.js @@ -0,0 +1,261 @@ +// @ts-check + +import test from 'node:test' +import assert from 'node:assert/strict' +import fs from 'node:fs' +import os from 'node:os' +import path from 'node:path' + +import { + collectShardTexts, + refreshIndexes, +} from '../../hypaware-core/plugins-workspace/vector-search/src/refresh.js' +import { shardFileBase, shardPaths } from '../../hypaware-core/plugins-workspace/vector-search/src/shards.js' + +/** + * @import { CachePartitionMeta, EmbedderCapability } from '../../collectivus-plugin-kernel-types.d.ts' + * @import { ExtendedQueryStorageService } from '../../src/core/cache/types.d.ts' + * @import { ShardMeta, VectorIndexDeclaration } from '../../hypaware-core/plugins-workspace/vector-search/src/types.d.ts' + */ + +/** @type {VectorIndexDeclaration} */ +const DECL = { dataset: 'd', column: 'text', name: 'd.text' } + +const noopLog = { debug() {}, info() {}, warn() {}, error() {} } + +/** + * @param {Record} partition + * @param {Array>} rows + * @returns {CachePartitionMeta & { rows: Array> }} + */ +function part(partition, rows) { + return { + dataset: DECL.dataset, + partition, + path: `/cache/${shardFileBase(partition)}`, + epoch: 0, + rowCount: rows.length, + rows, + } +} + +/** + * Storage stub: partitions carry their rows inline; `readRows` streams + * the rows of whichever partition owns the requested path. + * + * @param {Array> }>} partitions + * @returns {ExtendedQueryStorageService} + */ +function stubStorage(partitions) { + return /** @type {ExtendedQueryStorageService} */ (/** @type {unknown} */ ({ + async discoverCachePartitions() { + return partitions + }, + async *readRows(/** @type {string} */ partitionPath) { + const owner = partitions.find((p) => p.path === partitionPath) + for (const row of owner?.rows ?? []) yield row + }, + })) +} + +/** + * @param {{ dimensions?: number }} [opts] + * @returns {EmbedderCapability & { calls: string[][] }} + */ +function stubEmbedder(opts = {}) { + /** @type {string[][]} */ + const calls = [] + return { + provider: 'stub', + model: 'm1', + ...(opts.dimensions !== undefined ? { dimensions: opts.dimensions } : {}), + calls, + async embed(texts) { + calls.push(texts) + return { + vectors: texts.map((t) => Float32Array.from([t.length, 1, 0])), + dimension: 3, + model: 'm1', + } + }, + } +} + +function tmpIndexesDir() { + return fs.mkdtempSync(path.join(os.tmpdir(), 'vector-refresh-test-')) +} + +/** + * @param {string} indexesDir + * @param {Record} partition + * @param {Partial} [overrides] + */ +function writeShard(indexesDir, partition, overrides = {}) { + const fileBase = shardFileBase(partition) + const { file, meta } = shardPaths(indexesDir, DECL.name, fileBase) + fs.mkdirSync(path.dirname(file), { recursive: true }) + fs.writeFileSync(file, 'not-a-real-parquet') + /** @type {ShardMeta} */ + const shardMeta = { + schema_version: 1, + index: DECL.name, + dataset: DECL.dataset, + column: DECL.column, + partition, + model: 'm1', + dimension: 3, + row_count: 2, + source_row_count: 2, + built_at: '2026-06-12T00:00:00.000Z', + ...overrides, + } + fs.writeFileSync(meta, JSON.stringify(shardMeta)) + return { file, meta, fileBase } +} + +test('refreshIndexes: an already-expired deadline skips every pending shard and reports exhaustion', async () => { + const indexesDir = tmpIndexesDir() + const partitions = [ + part({ source: 'alpha' }, [{ text: 'aaa' }]), + part({ source: 'beta' }, [{ text: 'bbb' }]), + ] + const embedder = stubEmbedder() + const report = await refreshIndexes({ + decls: [DECL], + embedder, + storage: stubStorage(partitions), + indexesDir, + log: noopLog, + budget: { deadlineMs: Date.now() - 1 }, + }) + assert.equal(report.shardsBuilt, 0) + assert.equal(report.shardsSkipped, 2) + assert.equal(report.rowsEmbedded, 0) + assert.equal(report.budgetExhausted, true) + assert.equal(embedder.calls.length, 0, 'an exhausted budget must not spend embedding calls') +}) + +test('refreshIndexes: the row budget stops further builds after the shard that crosses it', async () => { + const indexesDir = tmpIndexesDir() + const partitions = [ + part({ source: 'alpha' }, [{ text: 'one' }, { text: 'two' }, { text: 'three' }]), + part({ source: 'beta' }, [{ text: 'four' }]), + ] + const embedder = stubEmbedder() + /** @type {Array<{ index: string, fileBase: string, state: string, rowsEmbedded: number }>} */ + const shardEvents = [] + const report = await refreshIndexes({ + decls: [DECL], + embedder, + storage: stubStorage(partitions), + indexesDir, + log: noopLog, + budget: { maxRows: 2 }, + onShard: (info) => shardEvents.push(info), + }) + // Budgets are soft: the first shard starts under budget and finishes + // (3 rows), the second never starts. + assert.equal(report.shardsBuilt, 1) + assert.equal(report.shardsSkipped, 1) + assert.equal(report.rowsEmbedded, 3) + assert.equal(report.budgetExhausted, true) + assert.deepEqual(shardEvents.map((e) => e.fileBase), [shardFileBase({ source: 'alpha' })]) + const { file, meta } = shardPaths(indexesDir, DECL.name, shardFileBase({ source: 'alpha' })) + assert.ok(fs.existsSync(file), 'built shard parquet exists') + assert.ok(fs.existsSync(meta), 'built shard sidecar exists') +}) + +test('refreshIndexes: orphans sweep even when the budget is already spent', async () => { + const indexesDir = tmpIndexesDir() + const live = part({ source: 'alpha' }, [{ text: 'aaa' }]) + const orphan = writeShard(indexesDir, { source: 'gone' }) + const report = await refreshIndexes({ + decls: [DECL], + embedder: stubEmbedder(), + storage: stubStorage([live]), + indexesDir, + log: noopLog, + budget: { deadlineMs: Date.now() - 1 }, + }) + assert.equal(report.orphansSwept, 1) + assert.equal(report.budgetExhausted, true) + assert.ok(!fs.existsSync(orphan.file), 'orphan parquet deleted') + assert.ok(!fs.existsSync(orphan.meta), 'orphan sidecar deleted') +}) + +test('refreshIndexes: a fresh shard is not rebuilt, and the report says so', async () => { + const indexesDir = tmpIndexesDir() + const partition = { source: 'alpha' } + const rows = [{ text: 'aa' }, { text: 'bb' }] + writeShard(indexesDir, partition, { source_row_count: rows.length }) + const embedder = stubEmbedder() + const report = await refreshIndexes({ + decls: [DECL], + embedder, + storage: stubStorage([part(partition, rows)]), + indexesDir, + log: noopLog, + }) + assert.equal(report.shardsBuilt, 0) + assert.equal(report.shardsSkipped, 0) + assert.equal(report.budgetExhausted, false) + assert.equal(embedder.calls.length, 0) +}) + +test('refreshIndexes: a configured embedder dimension that differs from the sidecar forces a rebuild', async () => { + const indexesDir = tmpIndexesDir() + const partition = { source: 'alpha' } + const rows = [{ text: 'aa' }, { text: 'bb' }] + // Sidecar matches on model and rows but was built at dimension 3; + // the embedder is now configured for 256. + writeShard(indexesDir, partition, { source_row_count: rows.length, dimension: 3 }) + const embedder = stubEmbedder({ dimensions: 256 }) + const report = await refreshIndexes({ + decls: [DECL], + embedder, + storage: stubStorage([part(partition, rows)]), + indexesDir, + log: noopLog, + }) + assert.equal(report.shardsBuilt, 1) + assert.equal(embedder.calls.length, 1) +}) + +test('collectShardTexts: identical texts collapse to one embedding by content hash', async () => { + const partition = part({ source: 'alpha' }, [ + { text: 'repeated' }, + { text: 'repeated' }, + { text: 'unique' }, + { text: '' }, + { text: null }, + { other: 'no text column' }, + ]) + const texts = await collectShardTexts({ + decl: DECL, + partition, + storage: stubStorage([partition]), + }) + assert.equal(texts.size, 2) + assert.deepEqual(Array.from(texts.values()).sort(), ['repeated', 'unique']) +}) + +test('collectShardTexts: id_column keys rows by id and skips rows without one', async () => { + /** @type {VectorIndexDeclaration} */ + const decl = { ...DECL, id_column: 'mid' } + const partition = part({ source: 'alpha' }, [ + { text: 'same text', mid: 'a' }, + { text: 'same text', mid: 'b' }, + { text: 'first wins', mid: 'a' }, + { text: 'no id', mid: '' }, + ]) + const texts = await collectShardTexts({ + decl, + partition, + storage: stubStorage([partition]), + }) + // Distinct ids keep their own entries even for identical text; a + // duplicate id keeps the first row; an empty id is skipped. + assert.equal(texts.size, 2) + assert.equal(texts.get('a'), 'same text') + assert.equal(texts.get('b'), 'same text') +}) diff --git a/test/plugins/vector-search-search.test.js b/test/plugins/vector-search-search.test.js new file mode 100644 index 0000000..9fe969c --- /dev/null +++ b/test/plugins/vector-search-search.test.js @@ -0,0 +1,116 @@ +// @ts-check + +import test from 'node:test' +import assert from 'node:assert/strict' +import fs from 'node:fs' +import os from 'node:os' +import path from 'node:path' + +import { searchVectorIndexes } from '../../hypaware-core/plugins-workspace/vector-search/src/search.js' +import { shardFileBase, shardPaths } from '../../hypaware-core/plugins-workspace/vector-search/src/shards.js' + +/** + * @import { CachePartitionMeta, EmbedderCapability } from '../../collectivus-plugin-kernel-types.d.ts' + * @import { ExtendedQueryStorageService } from '../../src/core/cache/types.d.ts' + * @import { ShardMeta, VectorIndexDeclaration, VectorSearchRuntime } from '../../hypaware-core/plugins-workspace/vector-search/src/types.d.ts' + */ + +/** @type {VectorIndexDeclaration} */ +const DECL = { dataset: 'd', column: 'text', name: 'd.text' } +const PARTITION = { source: 'alpha' } + +const noopLog = { debug() {}, info() {}, warn() {}, error() {} } + +/** + * The `--no-refresh` mismatch guarantees ("a mismatch is a hard error, + * never a silent degraded search") never reach hypvector: the error + * throws on sidecar metadata alone, so the shard parquet can be a + * placeholder. + * + * @param {Partial} [metaOverrides] + * @returns {VectorSearchRuntime} + */ +function makeRuntime(metaOverrides = {}) { + const indexesDir = fs.mkdtempSync(path.join(os.tmpdir(), 'vector-search-test-')) + const fileBase = shardFileBase(PARTITION) + const { file, meta } = shardPaths(indexesDir, DECL.name, fileBase) + fs.mkdirSync(path.dirname(file), { recursive: true }) + fs.writeFileSync(file, 'placeholder') + /** @type {ShardMeta} */ + const shardMeta = { + schema_version: 1, + index: DECL.name, + dataset: DECL.dataset, + column: DECL.column, + partition: PARTITION, + model: 'm1', + dimension: 3, + row_count: 2, + source_row_count: 2, + built_at: '2026-06-12T00:00:00.000Z', + ...metaOverrides, + } + fs.writeFileSync(meta, JSON.stringify(shardMeta)) + + /** @type {CachePartitionMeta} */ + const partitionMeta = { dataset: DECL.dataset, partition: PARTITION, path: '/cache/alpha', epoch: 0, rowCount: 2 } + /** @type {EmbedderCapability} */ + const embedder = { + provider: 'stub', + model: 'm1', + async embed(texts) { + return { + vectors: texts.map((t) => Float32Array.from([t.length, 1, 0])), + dimension: 3, + model: 'm1', + } + }, + } + return { + ctx: /** @type {any} */ ({}), + config: { indexes: [DECL], refresh: { enabled: true, interval_minutes: 240, max_tick_ms: 30_000, max_rows_per_tick: 5_000 } }, + embedder, + storage: /** @type {ExtendedQueryStorageService} */ (/** @type {unknown} */ ({ + async discoverCachePartitions() { return [partitionMeta] }, + async *readRows() {}, + })), + log: noopLog, + indexesDir, + } +} + +test('searchVectorIndexes: --no-refresh with a shard built by another model is vector_model_mismatch', async () => { + const runtime = makeRuntime({ model: 'old-model' }) + await assert.rejects( + () => searchVectorIndexes({ runtime, opts: { query: 'q', refresh: 'never' } }), + (/** @type {Error & { hypErrorKind?: string }} */ err) => { + assert.equal(err.hypErrorKind, 'vector_model_mismatch') + assert.match(err.message, /'old-model'/) + assert.match(err.message, /'m1'/) + assert.match(err.message, /rerun without --no-refresh/) + return true + } + ) +}) + +test('searchVectorIndexes: --no-refresh with a shard at another dimension is vector_dimension_mismatch', async () => { + const runtime = makeRuntime({ dimension: 1536 }) + await assert.rejects( + () => searchVectorIndexes({ runtime, opts: { query: 'q', refresh: 'never' } }), + (/** @type {Error & { hypErrorKind?: string }} */ err) => { + assert.equal(err.hypErrorKind, 'vector_dimension_mismatch') + assert.match(err.message, /dimension 1536/) + assert.match(err.message, /embedded to 3/) + assert.match(err.message, /rerun without --no-refresh/) + return true + } + ) +}) + +test('searchVectorIndexes: no configured index matching the filter is vector_no_indexes', async () => { + const runtime = makeRuntime() + await assert.rejects( + () => searchVectorIndexes({ runtime, opts: { query: 'q', index: 'nope', refresh: 'never' } }), + (/** @type {Error & { hypErrorKind?: string }} */ err) => err.hypErrorKind === 'vector_no_indexes' + ) +}) diff --git a/test/plugins/vector-search-shards.test.js b/test/plugins/vector-search-shards.test.js new file mode 100644 index 0000000..60e6202 --- /dev/null +++ b/test/plugins/vector-search-shards.test.js @@ -0,0 +1,256 @@ +// @ts-check + +import test from 'node:test' +import assert from 'node:assert/strict' + +import { + computeShardStates, + contentId, + mergeTopK, + partitionLabel, + shardFileBase, +} from '../../hypaware-core/plugins-workspace/vector-search/src/shards.js' + +/** + * @import { CachePartitionMeta } from '../../collectivus-plugin-kernel-types.d.ts' + * @import { ShardMeta, VectorIndexDeclaration } from '../../hypaware-core/plugins-workspace/vector-search/src/types.d.ts' + */ + +/** @type {VectorIndexDeclaration} */ +const DECL = { dataset: 'd', column: 'c', name: 'd.c' } + +/** + * @param {Record} partition + * @param {number} rowCount + * @returns {CachePartitionMeta} + */ +function part(partition, rowCount) { + return { dataset: 'd', partition, path: '/cache/d/x', epoch: 0, rowCount } +} + +/** + * @param {Record} partition + * @param {{ model?: string, sourceRows?: number, rows?: number, dimension?: number, index?: string, dataset?: string, column?: string, idColumn?: string }} [opts] + * @returns {ShardMeta} + */ +function meta(partition, opts = {}) { + return { + schema_version: 1, + index: opts.index ?? 'd.c', + dataset: opts.dataset ?? 'd', + column: opts.column ?? 'c', + ...(opts.idColumn !== undefined ? { id_column: opts.idColumn } : {}), + partition, + model: opts.model ?? 'm1', + dimension: opts.dimension ?? 3, + row_count: opts.rows ?? 5, + source_row_count: opts.sourceRows ?? 10, + built_at: '2026-06-12T00:00:00.000Z', + } +} + +/** + * @param {Record} partition + * @param {ShardMeta} m + * @returns {Map} + */ +function metasFor(partition, m) { + return new Map([[shardFileBase(partition), m]]) +} + +test('shardFileBase renders a sorted human label plus a partition hash', () => { + assert.equal(shardFileBase({}), 'all') + assert.match(shardFileBase({ source: 'claude' }), /^source=claude-[0-9a-f]{8}$/) + // Sorted by key, independent of insertion order. + assert.equal(shardFileBase({ b: '2', a: '1' }), shardFileBase({ a: '1', b: '2' })) + assert.match(shardFileBase({ b: '2', a: '1' }), /^a=1,b=2-[0-9a-f]{8}$/) + // Unsafe characters are sanitized in the label. + assert.match(shardFileBase({ source: 'a b/c' }), /^source=a_b_c-[0-9a-f]{8}$/) +}) + +test('shardFileBase: partitions whose labels collide get distinct names', () => { + // Sanitization is lossy: both labels render `source=a_b`. + assert.notEqual(shardFileBase({ source: 'a/b' }), shardFileBase({ source: 'a_b' })) + // A value containing `,`/`=` can mimic another partition's entry list. + assert.notEqual(shardFileBase({ a: 'b,c=d' }), shardFileBase({ a: 'b', c: 'd' })) + // Long values truncate in the label but stay distinct via the hash. + const long1 = shardFileBase({ k: 'x'.repeat(200) + '1' }) + const long2 = shardFileBase({ k: 'x'.repeat(200) + '2' }) + assert.notEqual(long1, long2) + assert.ok(long1.length < 120, `file base stays bounded (got ${long1.length})`) +}) + +test('partitionLabel renders unhashed display labels', () => { + assert.equal(partitionLabel({}), 'all') + assert.equal(partitionLabel({ source: 'claude' }), 'source=claude') + assert.equal(partitionLabel({ b: '2', a: '1' }), 'a=1,b=2') +}) + +test('computeShardStates: live partition without a shard is missing', () => { + const states = computeShardStates({ + partitions: [part({ source: 'claude' }, 10)], + metas: new Map(), + decl: DECL, + model: 'm1', + }) + assert.equal(states.length, 1) + assert.equal(states[0].state, 'missing') + assert.equal(states[0].fileBase, shardFileBase({ source: 'claude' })) +}) + +test('computeShardStates: matching declaration, model, and row count is fresh', () => { + const p = { source: 'claude' } + const states = computeShardStates({ + partitions: [part(p, 10)], + metas: metasFor(p, meta(p, { sourceRows: 10 })), + decl: DECL, + model: 'm1', + }) + assert.equal(states[0].state, 'fresh') +}) + +test('computeShardStates: model mismatch is stale_model, not an error', () => { + const p = { source: 'claude' } + const states = computeShardStates({ + partitions: [part(p, 10)], + metas: metasFor(p, meta(p, { model: 'old-model', sourceRows: 10 })), + decl: DECL, + model: 'm1', + }) + assert.equal(states[0].state, 'stale_model') +}) + +test('computeShardStates: row count drift is stale_rows', () => { + const p = { source: 'claude' } + const states = computeShardStates({ + partitions: [part(p, 12)], + metas: metasFor(p, meta(p, { sourceRows: 10 })), + decl: DECL, + model: 'm1', + }) + assert.equal(states[0].state, 'stale_rows') +}) + +test('computeShardStates: model mismatch wins over row drift (one rebuild fixes both)', () => { + const p = { source: 'claude' } + const states = computeShardStates({ + partitions: [part(p, 12)], + metas: metasFor(p, meta(p, { model: 'old', sourceRows: 10 })), + decl: DECL, + model: 'm1', + }) + assert.equal(states[0].state, 'stale_model') +}) + +test('computeShardStates: dataset/column/id_column drift is stale_config even when model and rows match', () => { + const p = { source: 'claude' } + for (const overrides of [{ dataset: 'other' }, { column: 'other' }, { idColumn: 'msg_id' }]) { + const states = computeShardStates({ + partitions: [part(p, 10)], + metas: metasFor(p, meta(p, { sourceRows: 10, ...overrides })), + decl: DECL, + model: 'm1', + }) + assert.equal(states[0].state, 'stale_config', JSON.stringify(overrides)) + } + // And the symmetric case: the declaration gained an id_column. + const states = computeShardStates({ + partitions: [part(p, 10)], + metas: metasFor(p, meta(p, { sourceRows: 10 })), + decl: { ...DECL, id_column: 'msg_id' }, + model: 'm1', + }) + assert.equal(states[0].state, 'stale_config') +}) + +test('computeShardStates: sidecar whose recorded partition disagrees is stale_config', () => { + const p = { source: 'claude' } + const states = computeShardStates({ + partitions: [part(p, 10)], + metas: metasFor(p, meta({ source: 'codex' }, { sourceRows: 10 })), + decl: DECL, + model: 'm1', + }) + assert.equal(states[0].state, 'stale_config') +}) + +test('computeShardStates: dimension drift is stale_dimension when an expected dimension is known', () => { + const p = { source: 'claude' } + const states = computeShardStates({ + partitions: [part(p, 10)], + metas: metasFor(p, meta(p, { sourceRows: 10, dimension: 1536 })), + decl: DECL, + model: 'm1', + dimension: 256, + }) + assert.equal(states[0].state, 'stale_dimension') +}) + +test('computeShardStates: dimension is ignored when unknown, matching, or the shard is empty', () => { + const p = { source: 'claude' } + const fresh = computeShardStates({ + partitions: [part(p, 10)], + metas: metasFor(p, meta(p, { sourceRows: 10, dimension: 1536 })), + decl: DECL, + model: 'm1', + }) + assert.equal(fresh[0].state, 'fresh') + const matching = computeShardStates({ + partitions: [part(p, 10)], + metas: metasFor(p, meta(p, { sourceRows: 10, dimension: 1536 })), + decl: DECL, + model: 'm1', + dimension: 1536, + }) + assert.equal(matching[0].state, 'fresh') + // An empty shard recorded dimension 0; that is not drift. + const empty = computeShardStates({ + partitions: [part(p, 10)], + metas: metasFor(p, meta(p, { sourceRows: 10, rows: 0, dimension: 0 })), + decl: DECL, + model: 'm1', + dimension: 1536, + }) + assert.equal(empty[0].state, 'fresh') +}) + +test('computeShardStates: shard for an evicted partition is orphan', () => { + const live = { source: 'claude' } + const evicted = { source: 'codex' } + const states = computeShardStates({ + partitions: [part(live, 10)], + metas: new Map([ + [shardFileBase(live), meta(live, { sourceRows: 10 })], + [shardFileBase(evicted), meta(evicted)], + ]), + decl: DECL, + model: 'm1', + }) + const orphan = states.find((s) => s.fileBase === shardFileBase(evicted)) + assert.equal(orphan?.state, 'orphan') + const fresh = states.find((s) => s.fileBase === shardFileBase(live)) + assert.equal(fresh?.state, 'fresh') +}) + +test('mergeTopK merges descending by score across shards', () => { + const merged = mergeTopK( + [ + [{ id: 'a', score: 0.9 }, { id: 'b', score: 0.2 }], + [{ id: 'c', score: 0.95 }, { id: 'd', score: 0.5 }], + [], + ], + 3 + ) + assert.deepEqual(merged.map((h) => h.id), ['c', 'a', 'd']) +}) + +test('mergeTopK with fewer hits than topK returns everything', () => { + const merged = mergeTopK([[{ id: 'a', score: 0.1 }]], 10) + assert.equal(merged.length, 1) +}) + +test('contentId is stable and collision-distinct for different texts', () => { + assert.equal(contentId('hello'), contentId('hello')) + assert.notEqual(contentId('hello'), contentId('hello!')) + assert.match(contentId('hello'), /^[0-9a-f]{32}$/) +})