-
Notifications
You must be signed in to change notification settings - Fork 11
Maggie li yd/2305 natural language to cohort poc fe #2640
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from 3 commits
6a2e4f9
2d35dc6
b25e238
8a10efb
70a42bb
01d92d1
92381d7
8cdbc5c
ef28374
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| import { getModels } from "../utils/utils"; | ||
| import { createStaticMcpTools } from "../mcp/staticTools"; | ||
| import { createAgent } from "langchain"; | ||
| import { SystemMessage, HumanMessage } from "@langchain/core/messages"; | ||
| import { Session } from "./session-store"; | ||
| import { getSystemPrompt } from "./prompts"; | ||
| import { env } from "../env"; | ||
|
|
||
| export class AgentService { | ||
| async getStream(session: Session, userInput: string, token?: string) { | ||
| if (!env.AI_MODEL) | ||
| throw new Error("AI_MODEL environment variable is not set."); | ||
| const model = await getModels(env.AI_MODEL); | ||
|
|
||
| // Restrict to cohort-related tools for v1 | ||
| const tools = createStaticMcpTools(token, session.datasetId); | ||
| const cohortTools = tools.filter( | ||
| (t) => | ||
| t.name.includes("cohort") || | ||
| t.name.includes("phenotype") || | ||
| t.name.includes("validate") || | ||
| t.name.includes("before_cohort") || | ||
| t.name.includes("fetch_templates"), | ||
| ); | ||
|
|
||
| const agent = createAgent({ | ||
| model: model, | ||
| tools: cohortTools, | ||
| }); | ||
|
|
||
| const messages = [ | ||
| new SystemMessage( | ||
| getSystemPrompt(session.datasetId, session.initialContext), | ||
| ), | ||
| ...session.history, | ||
| new HumanMessage(userInput), | ||
| ]; | ||
|
|
||
| return await agent.stream({ messages }, { streamMode: "messages" }); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| export const getSystemPrompt = (datasetId: string, context?: string) => ` | ||
| You are the Data2Evidence (D2E) AI Assistant, an expert in clinical data analytics and OHDSI standards. | ||
| Current Dataset: ${datasetId} | ||
| ${context ? `UI Context: ${context}` : ''} | ||
|
|
||
| CORE OBJECTIVE: | ||
| Your primary goal is to help users analyze data and manage cohorts within the D2E platform. | ||
|
|
||
| CREATING D2E COHORTS (DEEP LINKS): | ||
| When a user wants to create or view a cohort in the "Patient Analytics" (D2E) view, you must construct a D2E Deep Link JSON. | ||
| IMPORTANT: This is DIFFERENT from an ATLAS cohort definition. | ||
|
|
||
| The D2E Deep Link JSON follows this "Basic Data" format: | ||
| { | ||
| "filter": { | ||
| "configMetadata": { "id": "OMOP_GDM_PA_CONF", "version": "1" }, | ||
| "cards": { | ||
| "type": "BooleanContainer", | ||
| "op": "OR", | ||
| "content": [ | ||
| { | ||
| "type": "FilterCard", | ||
| "configPath": "patient", | ||
| "instanceNumber": 0, | ||
| "instanceID": "patient", | ||
| "name": "Basic Data", | ||
| "inactive": false, | ||
| "attributes": { | ||
| "type": "BooleanContainer", | ||
| "op": "AND", | ||
| "content": [ | ||
| /* Attribute nodes for Age, Gender, etc. */ | ||
| ] | ||
| } | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| } | ||
|
|
||
| CONSTRAINTS FOR D2E COHORTS: | ||
| - Use configPath "patient.attributes.Age" for age (operators: >, <, >=, <=, =). | ||
| - Use configPath "patient.attributes.Gender_concept_name" for gender (value e.g. "FEMALE"). | ||
| - For clinical conditions, find the Concept Set ID using your tools first. | ||
|
|
||
| WORKFLOW: | ||
| 1. Use clinical tools (search_phenotype_library, validate_atlas_cohort_definition) to clarify clinical logic. | ||
| 2. If the user wants to see the cohort in the UI, provide the Deep Link URL. | ||
| 3. Always reflect your tool usage to the user. | ||
|
|
||
| URL FORMAT: | ||
| Return the deep link in this format: | ||
| [View Cohort](<base_url>/portal/researcher/cohort?datasetId=${datasetId}&linkType=cohort-definition&query=<base64_payload>) | ||
| `; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| import { Router, Request, Response } from "express"; | ||
| import { SessionService } from "./session-store"; | ||
| import { AgentService } from "./agent"; | ||
| import { StreamPump } from "./stream-pump"; | ||
|
|
||
| export class AIRouter { | ||
| public router = Router(); | ||
| private sessions = new SessionService(); | ||
| private agent = new AgentService(); | ||
|
|
||
| constructor() { | ||
| this.registerRoutes(); | ||
| } | ||
|
|
||
| private registerRoutes() { | ||
| // 1. Create session | ||
| this.router.post("/sessions", (req: Request, res: Response) => { | ||
| const { datasetId, context } = req.body; | ||
| const session = this.sessions.createSession(datasetId, context); | ||
| res.status(201).json(session); | ||
| }); | ||
|
|
||
| // 2. Stream message | ||
| this.router.post( | ||
| "/sessions/:sessionId/messages", | ||
| async (req: Request, res: Response) => { | ||
| const { sessionId } = req.params; | ||
| const { userInput } = req.body; | ||
| const token = req.headers.authorization; | ||
|
|
||
| const session = this.sessions.getSession(sessionId); | ||
| if (!session) { | ||
| return res.status(404).json({ error: "Session not found" }); | ||
| } | ||
|
|
||
| // SSE headers | ||
| res.setHeader("Content-Type", "text/event-stream"); | ||
| res.setHeader("Cache-Control", "no-cache"); | ||
| res.setHeader("Connection", "keep-alive"); | ||
|
|
||
| try { | ||
| const stream = await this.agent.getStream(session, userInput, token); | ||
| const pump = new StreamPump(res); | ||
| await pump.pump(stream); | ||
| this.sessions.updateActivity(sessionId); | ||
| } catch (error: any) { | ||
| console.error("Stream error:", error); | ||
| res.write( | ||
| `event: error\ndata: ${JSON.stringify({ message: error.message })}\n\n` | ||
| ); | ||
| res.end(); | ||
| } | ||
| } | ||
| ); | ||
|
|
||
| // 3. Get session / history | ||
| this.router.get("/sessions/:sessionId", (req: Request, res: Response) => { | ||
| const session = this.sessions.getSession(req.params.sessionId); | ||
| if (!session) return res.status(404).end(); | ||
| res.json(session); | ||
| }); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,141 @@ | ||
| import { | ||
| BaseMessage, | ||
| StoredMessage, | ||
| mapChatMessagesToStoredMessages, | ||
| mapStoredMessagesToChatMessages, | ||
| } from "@langchain/core/messages"; | ||
| import { v4 as uuidv4 } from "uuid"; | ||
| import fs from "fs"; | ||
| import path from "path"; | ||
|
|
||
| // The trex runtime boots a fresh Deno worker per request, so an in-memory Map | ||
| // cannot persist sessions between calls. Sessions are written as JSON files | ||
| // under SESSIONS_DIR so every worker boot sees the same state. | ||
| const SESSIONS_DIR = "/tmp/ai-sessions"; | ||
| const TTL_MS = 60 * 60 * 1000; // 1 hour | ||
|
|
||
| export interface Session { | ||
| sessionId: string; | ||
| datasetId: string; | ||
| initialContext?: string; | ||
| history: BaseMessage[]; | ||
| createdAt: number; | ||
| lastActiveAt: number; | ||
| } | ||
|
|
||
| interface StoredSession { | ||
| sessionId: string; | ||
| datasetId: string; | ||
| initialContext?: string; | ||
| history: StoredMessage[]; | ||
| createdAt: number; | ||
| lastActiveAt: number; | ||
| } | ||
|
|
||
| export class SessionService { | ||
| constructor() { | ||
| try { | ||
| fs.mkdirSync(SESSIONS_DIR, { recursive: true }); | ||
| } catch (err) { | ||
| console.error(`Failed to create sessions dir ${SESSIONS_DIR}:`, err); | ||
| } | ||
| } | ||
|
|
||
| createSession(datasetId: string, context?: string): Session { | ||
| const sessionId = uuidv4(); | ||
| const session: Session = { | ||
| sessionId, | ||
| datasetId, | ||
| initialContext: context, | ||
| history: [], | ||
| createdAt: Date.now(), | ||
| lastActiveAt: Date.now(), | ||
| }; | ||
| this.write(session); | ||
| return session; | ||
| } | ||
|
|
||
| getSession(sessionId: string): Session | undefined { | ||
| const session = this.read(sessionId); | ||
| if (!session) return undefined; | ||
| // Lazy TTL eviction — there is no long-lived worker to run a setInterval | ||
| if (Date.now() - session.lastActiveAt > TTL_MS) { | ||
| this.deleteFile(sessionId); | ||
| return undefined; | ||
| } | ||
| session.lastActiveAt = Date.now(); | ||
| this.write(session); | ||
| return session; | ||
| } | ||
|
|
||
| updateActivity(sessionId: string) { | ||
| const session = this.read(sessionId); | ||
| if (!session) return; | ||
| session.lastActiveAt = Date.now(); | ||
| this.write(session); | ||
| } | ||
|
|
||
| private filePath(sessionId: string): string { | ||
| // sessionId comes from uuidv4 but be defensive against path traversal | ||
| if (!/^[a-zA-Z0-9-]+$/.test(sessionId)) { | ||
| throw new Error(`Invalid sessionId: ${sessionId}`); | ||
| } | ||
| return path.join(SESSIONS_DIR, `${sessionId}.json`); | ||
| } | ||
|
|
||
| private read(sessionId: string): Session | undefined { | ||
| let raw: string; | ||
| try { | ||
| raw = fs.readFileSync(this.filePath(sessionId), "utf8"); | ||
Check failureCode scanning / CodeQL Uncontrolled data used in path expression High
This path depends on a
user-provided value Error loading related location Loading This path depends on a user-provided value Error loading related location Loading |
||
| } catch (err: any) { | ||
| if (err.code !== "ENOENT") { | ||
| console.error(`Failed to read session ${sessionId}:`, err); | ||
Check failureCode scanning / CodeQL Use of externally-controlled format string High
Format string depends on a
user-provided value Error loading related location Loading Format string depends on a user-provided value Error loading related location Loading |
||
|
github-advanced-security[bot] marked this conversation as resolved.
Fixed
|
||
| } | ||
| return undefined; | ||
| } | ||
| try { | ||
| const stored = JSON.parse(raw) as StoredSession; | ||
| return { | ||
| sessionId: stored.sessionId, | ||
| datasetId: stored.datasetId, | ||
| initialContext: stored.initialContext, | ||
| history: mapStoredMessagesToChatMessages(stored.history ?? []), | ||
| createdAt: stored.createdAt, | ||
| lastActiveAt: stored.lastActiveAt, | ||
| }; | ||
| } catch (err) { | ||
| console.error(`Corrupt session file for ${sessionId}:`, err); | ||
Check failureCode scanning / CodeQL Use of externally-controlled format string High
Format string depends on a
user-provided value Error loading related location Loading Format string depends on a user-provided value Error loading related location Loading |
||
|
github-advanced-security[bot] marked this conversation as resolved.
Fixed
|
||
| return undefined; | ||
| } | ||
| } | ||
|
|
||
| private write(session: Session) { | ||
| const stored: StoredSession = { | ||
| sessionId: session.sessionId, | ||
| datasetId: session.datasetId, | ||
| initialContext: session.initialContext, | ||
| history: mapChatMessagesToStoredMessages(session.history), | ||
| createdAt: session.createdAt, | ||
| lastActiveAt: session.lastActiveAt, | ||
| }; | ||
| // Direct write: the trex sandbox blocklists Deno.renameSync, so the | ||
| // temp-file + rename atomicity pattern is not available. The reader | ||
| // catches JSON.parse errors and treats them as a missing session, so a | ||
| // concurrent partial read just behaves like a cache miss. | ||
| fs.writeFileSync( | ||
| this.filePath(session.sessionId), | ||
| JSON.stringify(stored), | ||
| "utf8", | ||
| ); | ||
| } | ||
|
|
||
| private deleteFile(sessionId: string) { | ||
| try { | ||
| fs.unlinkSync(this.filePath(sessionId)); | ||
Check failureCode scanning / CodeQL Uncontrolled data used in path expression High
This path depends on a
user-provided value Error loading related location Loading This path depends on a user-provided value Error loading related location Loading |
||
|
github-advanced-security[bot] marked this conversation as resolved.
Fixed
|
||
| } catch (err: any) { | ||
| if (err.code !== "ENOENT") { | ||
| console.error(`Failed to delete session ${sessionId}:`, err); | ||
Check failureCode scanning / CodeQL Use of externally-controlled format string High
Format string depends on a
user-provided value Error loading related location Loading Format string depends on a user-provided value Error loading related location Loading |
||
|
github-advanced-security[bot] marked this conversation as resolved.
Fixed
|
||
| } | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.