Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
d59a4ba
feat(query-runner): add storage config + websocket notifications
Khairajani Apr 20, 2026
c8de578
Update generated TypeScript types
github-actions[bot] Apr 20, 2026
5ea663b
Merge branch 'main' into query_runner_socket_flow
Khairajani Apr 20, 2026
5f10710
formatting
Khairajani Apr 20, 2026
9c28ba5
Merge branch 'main' into query_runner_socket_flow
Khairajani Apr 21, 2026
237af9f
Merge branch 'main' into query_runner_socket_flow
Khairajani Apr 21, 2026
af4956e
Merge branch 'main' into query_runner_socket_flow
Khairajani Apr 22, 2026
f1281d8
intermediate messgage
Khairajani Apr 23, 2026
9b70241
Merge branch 'main' into query_runner_socket_flow
Khairajani Apr 23, 2026
75a5d22
Merge remote-tracking branch 'origin/main' into query_runner_socket_flow
Khairajani Apr 28, 2026
a07abdb
Merge branch 'main' into query_runner_socket_flow
Khairajani Apr 30, 2026
7669c58
Merge remote-tracking branch 'origin/main' into query_runner_socket_flow
Khairajani Apr 30, 2026
8c6044f
Merge remote-tracking branch 'origin/query_runner_socket_flow' into q…
Khairajani Apr 30, 2026
1573c5c
Merge branch 'main' into query_runner_socket_flow
Khairajani Apr 30, 2026
3a42785
Merge branch 'main' into query_runner_socket_flow
Khairajani May 5, 2026
7829276
Merge branch 'main' into query_runner_socket_flow
Khairajani May 5, 2026
eb7e635
Merge branch 'main' into query_runner_socket_flow
Khairajani May 5, 2026
a9bea1c
Merge branch 'main' into query_runner_socket_flow
Khairajani May 6, 2026
891f034
Merge remote-tracking branch 'origin/main' into query_runner_socket_flow
Khairajani May 7, 2026
6914dbd
Merge branch 'main' into query_runner_socket_flow
Khairajani May 7, 2026
eb03190
chore(deps): bump sqlalchemy-pytds to ~=1.0
Khairajani May 9, 2026
6a52449
Merge branch 'main' into query_runner_socket_flow
Khairajani May 9, 2026
921d7ee
Merge branch 'main' into query_runner_socket_flow
Khairajani May 11, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,11 @@
"cassandra": {VERSIONS["cassandra"]},
"couchbase": {"couchbase~=4.1"},
"mssql": {
"sqlalchemy-pytds~=0.3",
# 1.0+ moved internal `tds.skipall` calls to `tds_base.skipall`, matching
# the python-tds 1.x layout. 0.3.x raises AttributeError on every
# server-side cursor fetch (TABNAME / COLINFO tokens) when paired with
# python-tds 1.x.
"sqlalchemy-pytds~=1.0",
DATA_DIFF["mssql"],
},
"mssql-odbc": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class WebSocketManager {
public static final String MOVE_GLOSSARY_TERM_CHANNEL = "moveGlossaryTermChannel";
public static final String RDF_INDEX_JOB_BROADCAST_CHANNEL = "rdfIndexJobStatus";
public static final String CHART_DATA_STREAM_CHANNEL = "chartDataStream";
public static final String QUERY_RUNNER_CHANNEL = "queryRunnerChannel";

@Getter
private final Map<UUID, Map<String, SocketIoSocket>> activityFeedEndpoints =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.openmetadata.service.util;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@NoArgsConstructor
public class QueryRunnerMessage {
@Getter @Setter private String jobId;
@Getter @Setter private String status;
@Getter @Setter private String workflowId;
@Getter @Setter private String error;
@Getter @Setter private String message;
@Getter @Setter private Double duration;
@Getter @Setter private String executedQuery;

public QueryRunnerMessage(
String jobId,
String status,
String workflowId,
String error,
String message,
Double duration,
String executedQuery) {
this.jobId = jobId;
this.status = status;
this.workflowId = workflowId;
this.error = error;
this.message = message;
this.duration = duration;
this.executedQuery = executedQuery;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,45 @@ public static void sendCsvImportFailedNotification(
}
}

public static void sendQueryRunnerCompleteNotification(
String jobId, UUID userId, String workflowId, Double duration, String executedQuery) {
QueryRunnerMessage message =
new QueryRunnerMessage(jobId, "COMPLETED", workflowId, null, null, duration, executedQuery);
String jsonMessage = JsonUtils.pojoToJson(message);
if (userId != null) {
WebSocketManager.getInstance()
.sendToOne(userId, WebSocketManager.QUERY_RUNNER_CHANNEL, jsonMessage);
}
}

public static void sendQueryRunnerFailedNotification(
String jobId, UUID userId, String errorMessage) {
QueryRunnerMessage message =
new QueryRunnerMessage(jobId, "FAILED", null, errorMessage, null, null, null);
String jsonMessage = JsonUtils.pojoToJson(message);
if (userId != null) {
WebSocketManager.getInstance()
.sendToOne(userId, WebSocketManager.QUERY_RUNNER_CHANNEL, jsonMessage);
}
}

/**
* Intermediate progress message for a Query Runner job — e.g. "Executing query…",
* "Uploading results…". UI's WebSocket hook reads the {@code message} field and surfaces it
* as {@code executionStatusMessage}. {@code status} stays "RUNNING" so the UI doesn't treat
* this as a terminal event.
*/
public static void sendQueryRunnerProgressNotification(
String jobId, UUID userId, String workflowId, String message) {
QueryRunnerMessage msg =
new QueryRunnerMessage(jobId, "RUNNING", workflowId, null, message, null, null);
String jsonMessage = JsonUtils.pojoToJson(msg);
if (userId != null) {
WebSocketManager.getInstance()
.sendToOne(userId, WebSocketManager.QUERY_RUNNER_CHANNEL, jsonMessage);
}
}

public static void sendDeleteOperationCompleteNotification(
String jobId, SecurityContext securityContext, EntityInterface entity) {
DeleteEntityMessage message =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,41 @@
"type": "string",
"enum": ["user", "team"],
"default": "user"
},
"storageConfig": {
"description": "RUNTIME FIELD - Storage configuration injected by backend for result upload.",
"type": "object",
"properties": {
"bucketName": {
"description": "S3 or GCS bucket name.",
"type": "string"
},
"prefix": {
"description": "Key prefix within the bucket.",
"type": "string"
},
"storageConfig": {
"oneOf": [
{
"title": "AWS S3 Storage Config",
"$ref": "../../security/credentials/awsCredentials.json"
},
{
"title": "GCP Storage Config",
"$ref": "../../security/credentials/gcpCredentials.json"
},
{
"title": "No Credentials",
"type": "object",
"additionalProperties": false
}
]
}
}
},
"resultPath": {
"description": "RUNTIME FIELD - Full S3/GCS key path where the worker should upload CSV results. Generated by the backend before Argo submission.",
"type": "string"
}
},
"additionalProperties": false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@
"executedQuery": {
"description": "The actual query that was executed (may be transpiled or modified from the original)",
"type": "string"
},
"resultPath": {
"description": "S3 or GCS key path where the query results CSV is stored. Present when storage mode is enabled; mutually exclusive with 'results'.",
"type": "string"
}
},
"additionalProperties": false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ export const SOCKET_EVENTS = {
DELETE_ENTITY_CHANNEL: 'deleteEntityChannel',
MOVE_GLOSSARY_TERM_CHANNEL: 'moveGlossaryTermChannel',
CHART_DATA_STREAM: 'chartDataStream',
QUERY_RUNNER_CHANNEL: 'queryRunnerChannel',
};

export const CACHE_WARMUP_APPLICATION_NAME = 'CacheWarmupApplication';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ export interface QueryRunnerRequest {
* Query to be executed.
*/
query?: string;
/**
* RUNTIME FIELD - Full S3/GCS key path where the worker should upload CSV results.
* Generated by the backend before Argo submission.
*/
resultPath?: string;
/**
* Optional role to use for query execution (selected by user in QueryRunner Studio).
* Service-specific (e.g., Snowflake role).
Expand All @@ -60,6 +65,10 @@ export interface QueryRunnerRequest {
* Optional value that identifies this service name.
*/
serviceName?: string;
/**
* RUNTIME FIELD - Storage configuration injected by backend for result upload.
*/
storageConfig?: StorageConfig;
/**
* Optional value to indicate if the query should be transpiled.
*/
Expand Down Expand Up @@ -87,3 +96,184 @@ export enum CredentialSourceType {
Team = "team",
User = "user",
}

/**
* RUNTIME FIELD - Storage configuration injected by backend for result upload.
*/
export interface StorageConfig {
/**
* S3 or GCS bucket name.
*/
bucketName?: string;
/**
* Key prefix within the bucket.
*/
prefix?: string;
storageConfig?: Credentials;
[property: string]: any;
}

/**
* AWS credentials configs.
*
* GCP credentials configs.
*/
export interface Credentials {
/**
* The Amazon Resource Name (ARN) of the role to assume. Required Field in case of Assume
* Role
*/
assumeRoleArn?: string;
/**
* An identifier for the assumed role session. Use the role session name to uniquely
* identify a session when the same role is assumed by different principals or for different
* reasons. Required Field in case of Assume Role
*/
assumeRoleSessionName?: string;
/**
* The Amazon Resource Name (ARN) of the role to assume. Optional Field in case of Assume
* Role
*/
assumeRoleSourceIdentity?: string;
/**
* AWS Access key ID.
*/
awsAccessKeyId?: string;
/**
* AWS Region
*/
awsRegion?: string;
/**
* AWS Secret Access Key.
*/
awsSecretAccessKey?: string;
/**
* AWS Session Token.
*/
awsSessionToken?: string;
/**
* Enable AWS IAM authentication. When enabled, uses the default credential provider chain
* (environment variables, instance profile, etc.). Defaults to false for backward
* compatibility.
*/
enabled?: boolean;
/**
* EndPoint URL for the AWS
*/
endPointURL?: string;
/**
* The name of a profile to use with the boto session.
*/
profileName?: string;
/**
* We support two ways of authenticating to GCP i.e via GCP Credentials Values or GCP
* Credentials Path
*/
gcpConfig?: GCPCredentialsConfiguration;
/**
* we enable the authenticated service account to impersonate another service account
*/
gcpImpersonateServiceAccount?: GCPImpersonateServiceAccountValues;
}

/**
* We support two ways of authenticating to GCP i.e via GCP Credentials Values or GCP
* Credentials Path
*
* Pass the raw credential values provided by GCP
*
* Pass the path of file containing the GCP credentials info
*
* Use the application default credentials
*/
export interface GCPCredentialsConfiguration {
/**
* Google Cloud auth provider certificate.
*/
authProviderX509CertUrl?: string;
/**
* Google Cloud auth uri.
*/
authUri?: string;
/**
* Google Cloud email.
*/
clientEmail?: string;
/**
* Google Cloud Client ID.
*/
clientId?: string;
/**
* Google Cloud client certificate uri.
*/
clientX509CertUrl?: string;
/**
* Google Cloud private key.
*/
privateKey?: string;
/**
* Google Cloud private key id.
*/
privateKeyId?: string;
/**
* Project ID
*
* GCP Project ID to parse metadata from
*/
projectId?: string[] | string;
/**
* Google Cloud token uri.
*/
tokenUri?: string;
/**
* Google Cloud Platform account type.
*
* Google Cloud Platform ADC ( Application Default Credentials )
*/
type?: string;
/**
* Path of the file containing the GCP credentials info
*/
path?: string;
/**
* Google Security Token Service audience which contains the resource name for the workload
* identity pool and the provider identifier in that pool.
*/
audience?: string;
/**
* This object defines the mechanism used to retrieve the external credential from the local
* environment so that it can be exchanged for a GCP access token via the STS endpoint
*/
credentialSource?: { [key: string]: string };
/**
* Google Cloud Platform account type.
*/
externalType?: string;
/**
* Google Security Token Service subject token type based on the OAuth 2.0 token exchange
* spec.
*/
subjectTokenType?: string;
/**
* Google Security Token Service token exchange endpoint.
*/
tokenURL?: string;
[property: string]: any;
}

/**
* we enable the authenticated service account to impersonate another service account
*
* Pass the values to impersonate a service account of Google Cloud
*/
export interface GCPImpersonateServiceAccountValues {
/**
* The impersonated service account email
*/
impersonateServiceAccount?: string;
/**
* Number of seconds the delegated credential should be valid
*/
lifetime?: number;
[property: string]: any;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ export interface QueryRunnerResponse {
* Error message in case of failure
*/
message?: string;
/**
* S3 or GCS key path where the query results CSV is stored. Present when storage mode is
* enabled; mutually exclusive with 'results'.
*/
resultPath?: string;
/**
* Results of the query execution
*/
Expand Down
Loading
Loading