diff --git a/ingestion/setup.py b/ingestion/setup.py index 78f4adfd84e0..82c713c2fe9a 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -328,7 +328,11 @@ "cassandra": {VERSIONS["cassandra"]}, "couchbase": {"couchbase~=4.1"}, "mssql": { - "sqlalchemy-pytds~=0.3", + # 1.0+ moved internal `tds.skipall` calls to `tds_base.skipall`, matching + # the python-tds 1.x layout. 0.3.x raises AttributeError on every + # server-side cursor fetch (TABNAME / COLINFO tokens) when paired with + # python-tds 1.x. + "sqlalchemy-pytds~=1.0", DATA_DIFF["mssql"], }, "mssql-odbc": { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/socket/WebSocketManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/socket/WebSocketManager.java index bbc2281a3ded..57ffaaa46a62 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/socket/WebSocketManager.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/socket/WebSocketManager.java @@ -39,6 +39,7 @@ public class WebSocketManager { public static final String MOVE_GLOSSARY_TERM_CHANNEL = "moveGlossaryTermChannel"; public static final String RDF_INDEX_JOB_BROADCAST_CHANNEL = "rdfIndexJobStatus"; public static final String CHART_DATA_STREAM_CHANNEL = "chartDataStream"; + public static final String QUERY_RUNNER_CHANNEL = "queryRunnerChannel"; @Getter private final Map> activityFeedEndpoints = diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/QueryRunnerMessage.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/QueryRunnerMessage.java new file mode 100644 index 000000000000..56446514a646 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/QueryRunnerMessage.java @@ -0,0 +1,33 @@ +package org.openmetadata.service.util; + +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@NoArgsConstructor +public class QueryRunnerMessage { + @Getter @Setter private String jobId; + @Getter @Setter private String status; + @Getter @Setter private String workflowId; + @Getter @Setter private String error; + @Getter @Setter private String message; + @Getter @Setter private Double duration; + @Getter @Setter private String executedQuery; + + public QueryRunnerMessage( + String jobId, + String status, + String workflowId, + String error, + String message, + Double duration, + String executedQuery) { + this.jobId = jobId; + this.status = status; + this.workflowId = workflowId; + this.error = error; + this.message = message; + this.duration = duration; + this.executedQuery = executedQuery; + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/WebsocketNotificationHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/WebsocketNotificationHandler.java index 2705fd8a0489..0a765365bc58 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/WebsocketNotificationHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/WebsocketNotificationHandler.java @@ -346,6 +346,45 @@ public static void sendCsvImportFailedNotification( } } + public static void sendQueryRunnerCompleteNotification( + String jobId, UUID userId, String workflowId, Double duration, String executedQuery) { + QueryRunnerMessage message = + new QueryRunnerMessage(jobId, "COMPLETED", workflowId, null, null, duration, executedQuery); + String jsonMessage = JsonUtils.pojoToJson(message); + if (userId != null) { + WebSocketManager.getInstance() + .sendToOne(userId, WebSocketManager.QUERY_RUNNER_CHANNEL, jsonMessage); + } + } + + public static void sendQueryRunnerFailedNotification( + String jobId, UUID userId, String errorMessage) { + QueryRunnerMessage message = + new QueryRunnerMessage(jobId, "FAILED", null, errorMessage, null, null, null); + String jsonMessage = JsonUtils.pojoToJson(message); + if (userId != null) { + WebSocketManager.getInstance() + .sendToOne(userId, WebSocketManager.QUERY_RUNNER_CHANNEL, jsonMessage); + } + } + + /** + * Intermediate progress message for a Query Runner job — e.g. "Executing query…", + * "Uploading results…". UI's WebSocket hook reads the {@code message} field and surfaces it + * as {@code executionStatusMessage}. {@code status} stays "RUNNING" so the UI doesn't treat + * this as a terminal event. + */ + public static void sendQueryRunnerProgressNotification( + String jobId, UUID userId, String workflowId, String message) { + QueryRunnerMessage msg = + new QueryRunnerMessage(jobId, "RUNNING", workflowId, null, message, null, null); + String jsonMessage = JsonUtils.pojoToJson(msg); + if (userId != null) { + WebSocketManager.getInstance() + .sendToOne(userId, WebSocketManager.QUERY_RUNNER_CHANNEL, jsonMessage); + } + } + public static void sendDeleteOperationCompleteNotification( String jobId, SecurityContext securityContext, EntityInterface entity) { DeleteEntityMessage message = diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/automations/queryRunnerRequest.json b/openmetadata-spec/src/main/resources/json/schema/entity/automations/queryRunnerRequest.json index c1ca182b4086..a104330907da 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/automations/queryRunnerRequest.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/automations/queryRunnerRequest.json @@ -66,6 +66,41 @@ "type": "string", "enum": ["user", "team"], "default": "user" + }, + "storageConfig": { + "description": "RUNTIME FIELD - Storage configuration injected by backend for result upload.", + "type": "object", + "properties": { + "bucketName": { + "description": "S3 or GCS bucket name.", + "type": "string" + }, + "prefix": { + "description": "Key prefix within the bucket.", + "type": "string" + }, + "storageConfig": { + "oneOf": [ + { + "title": "AWS S3 Storage Config", + "$ref": "../../security/credentials/awsCredentials.json" + }, + { + "title": "GCP Storage Config", + "$ref": "../../security/credentials/gcpCredentials.json" + }, + { + "title": "No Credentials", + "type": "object", + "additionalProperties": false + } + ] + } + } + }, + "resultPath": { + "description": "RUNTIME FIELD - Full S3/GCS key path where the worker should upload CSV results. Generated by the backend before Argo submission.", + "type": "string" } }, "additionalProperties": false diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/automations/response/queryRunnerResponse.json b/openmetadata-spec/src/main/resources/json/schema/entity/automations/response/queryRunnerResponse.json index 5d797fa37b42..78bd5476480f 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/automations/response/queryRunnerResponse.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/automations/response/queryRunnerResponse.json @@ -41,6 +41,10 @@ "executedQuery": { "description": "The actual query that was executed (may be transpiled or modified from the original)", "type": "string" + }, + "resultPath": { + "description": "S3 or GCS key path where the query results CSV is stored. Present when storage mode is enabled; mutually exclusive with 'results'.", + "type": "string" } }, "additionalProperties": false diff --git a/openmetadata-ui/src/main/resources/ui/src/constants/constants.ts b/openmetadata-ui/src/main/resources/ui/src/constants/constants.ts index 2b759dee46cf..fd5c7cfacebb 100644 --- a/openmetadata-ui/src/main/resources/ui/src/constants/constants.ts +++ b/openmetadata-ui/src/main/resources/ui/src/constants/constants.ts @@ -351,6 +351,7 @@ export const SOCKET_EVENTS = { DELETE_ENTITY_CHANNEL: 'deleteEntityChannel', MOVE_GLOSSARY_TERM_CHANNEL: 'moveGlossaryTermChannel', CHART_DATA_STREAM: 'chartDataStream', + QUERY_RUNNER_CHANNEL: 'queryRunnerChannel', }; export const CACHE_WARMUP_APPLICATION_NAME = 'CacheWarmupApplication'; diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/queryRunnerRequest.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/queryRunnerRequest.ts index e808a6d39765..846721c18981 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/queryRunnerRequest.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/queryRunnerRequest.ts @@ -51,6 +51,11 @@ export interface QueryRunnerRequest { * Query to be executed. */ query?: string; + /** + * RUNTIME FIELD - Full S3/GCS key path where the worker should upload CSV results. + * Generated by the backend before Argo submission. + */ + resultPath?: string; /** * Optional role to use for query execution (selected by user in QueryRunner Studio). * Service-specific (e.g., Snowflake role). @@ -60,6 +65,10 @@ export interface QueryRunnerRequest { * Optional value that identifies this service name. */ serviceName?: string; + /** + * RUNTIME FIELD - Storage configuration injected by backend for result upload. + */ + storageConfig?: StorageConfig; /** * Optional value to indicate if the query should be transpiled. */ @@ -87,3 +96,184 @@ export enum CredentialSourceType { Team = "team", User = "user", } + +/** + * RUNTIME FIELD - Storage configuration injected by backend for result upload. + */ +export interface StorageConfig { + /** + * S3 or GCS bucket name. + */ + bucketName?: string; + /** + * Key prefix within the bucket. + */ + prefix?: string; + storageConfig?: Credentials; + [property: string]: any; +} + +/** + * AWS credentials configs. + * + * GCP credentials configs. + */ +export interface Credentials { + /** + * The Amazon Resource Name (ARN) of the role to assume. Required Field in case of Assume + * Role + */ + assumeRoleArn?: string; + /** + * An identifier for the assumed role session. Use the role session name to uniquely + * identify a session when the same role is assumed by different principals or for different + * reasons. Required Field in case of Assume Role + */ + assumeRoleSessionName?: string; + /** + * The Amazon Resource Name (ARN) of the role to assume. Optional Field in case of Assume + * Role + */ + assumeRoleSourceIdentity?: string; + /** + * AWS Access key ID. + */ + awsAccessKeyId?: string; + /** + * AWS Region + */ + awsRegion?: string; + /** + * AWS Secret Access Key. + */ + awsSecretAccessKey?: string; + /** + * AWS Session Token. + */ + awsSessionToken?: string; + /** + * Enable AWS IAM authentication. When enabled, uses the default credential provider chain + * (environment variables, instance profile, etc.). Defaults to false for backward + * compatibility. + */ + enabled?: boolean; + /** + * EndPoint URL for the AWS + */ + endPointURL?: string; + /** + * The name of a profile to use with the boto session. + */ + profileName?: string; + /** + * We support two ways of authenticating to GCP i.e via GCP Credentials Values or GCP + * Credentials Path + */ + gcpConfig?: GCPCredentialsConfiguration; + /** + * we enable the authenticated service account to impersonate another service account + */ + gcpImpersonateServiceAccount?: GCPImpersonateServiceAccountValues; +} + +/** + * We support two ways of authenticating to GCP i.e via GCP Credentials Values or GCP + * Credentials Path + * + * Pass the raw credential values provided by GCP + * + * Pass the path of file containing the GCP credentials info + * + * Use the application default credentials + */ +export interface GCPCredentialsConfiguration { + /** + * Google Cloud auth provider certificate. + */ + authProviderX509CertUrl?: string; + /** + * Google Cloud auth uri. + */ + authUri?: string; + /** + * Google Cloud email. + */ + clientEmail?: string; + /** + * Google Cloud Client ID. + */ + clientId?: string; + /** + * Google Cloud client certificate uri. + */ + clientX509CertUrl?: string; + /** + * Google Cloud private key. + */ + privateKey?: string; + /** + * Google Cloud private key id. + */ + privateKeyId?: string; + /** + * Project ID + * + * GCP Project ID to parse metadata from + */ + projectId?: string[] | string; + /** + * Google Cloud token uri. + */ + tokenUri?: string; + /** + * Google Cloud Platform account type. + * + * Google Cloud Platform ADC ( Application Default Credentials ) + */ + type?: string; + /** + * Path of the file containing the GCP credentials info + */ + path?: string; + /** + * Google Security Token Service audience which contains the resource name for the workload + * identity pool and the provider identifier in that pool. + */ + audience?: string; + /** + * This object defines the mechanism used to retrieve the external credential from the local + * environment so that it can be exchanged for a GCP access token via the STS endpoint + */ + credentialSource?: { [key: string]: string }; + /** + * Google Cloud Platform account type. + */ + externalType?: string; + /** + * Google Security Token Service subject token type based on the OAuth 2.0 token exchange + * spec. + */ + subjectTokenType?: string; + /** + * Google Security Token Service token exchange endpoint. + */ + tokenURL?: string; + [property: string]: any; +} + +/** + * we enable the authenticated service account to impersonate another service account + * + * Pass the values to impersonate a service account of Google Cloud + */ +export interface GCPImpersonateServiceAccountValues { + /** + * The impersonated service account email + */ + impersonateServiceAccount?: string; + /** + * Number of seconds the delegated credential should be valid + */ + lifetime?: number; + [property: string]: any; +} diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/response/queryRunnerResponse.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/response/queryRunnerResponse.ts index 4e26b0394317..c4d0e8d079f5 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/response/queryRunnerResponse.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/response/queryRunnerResponse.ts @@ -30,6 +30,11 @@ export interface QueryRunnerResponse { * Error message in case of failure */ message?: string; + /** + * S3 or GCS key path where the query results CSV is stored. Present when storage mode is + * enabled; mutually exclusive with 'results'. + */ + resultPath?: string; /** * Results of the query execution */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/workflow.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/workflow.ts index 67875b8e8ee1..b945a6c984fd 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/workflow.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/automations/workflow.ts @@ -674,11 +674,20 @@ export interface TestServiceConnectionRequest { * Query to be executed. */ query?: string; + /** + * RUNTIME FIELD - Full S3/GCS key path where the worker should upload CSV results. + * Generated by the backend before Argo submission. + */ + resultPath?: string; /** * Optional role to use for query execution (selected by user in QueryRunner Studio). * Service-specific (e.g., Snowflake role). */ role?: string; + /** + * RUNTIME FIELD - Storage configuration injected by backend for result upload. + */ + storageConfig?: StorageConfig; /** * Optional value to indicate if the query should be transpiled. */ @@ -5802,6 +5811,101 @@ export enum SparkEngineType { Spark = "Spark", } +/** + * RUNTIME FIELD - Storage configuration injected by backend for result upload. + */ +export interface StorageConfig { + /** + * S3 or GCS bucket name. + */ + bucketName?: string; + /** + * Key prefix within the bucket. + */ + prefix?: string; + storageConfig?: StorageConfigClass; + [property: string]: any; +} + +/** + * AWS credentials required to access the S3 file. + * + * AWS credentials configs. + * + * AWS credentials for generating MWAA CLI token. + * + * AWS credentials configuration. + * + * GCP Credentials + * + * GCP credentials configs. + * + * GCP credentials to use. If not provided, Application Default Credentials will be used. + * + * GCP credentials configuration for authenticating with Pub/Sub. + * + * GCP credentials configuration. + * + * GCP Credentials for Google Drive API + */ +export interface StorageConfigClass { + /** + * The Amazon Resource Name (ARN) of the role to assume. Required Field in case of Assume + * Role + */ + assumeRoleArn?: string; + /** + * An identifier for the assumed role session. Use the role session name to uniquely + * identify a session when the same role is assumed by different principals or for different + * reasons. Required Field in case of Assume Role + */ + assumeRoleSessionName?: string; + /** + * The Amazon Resource Name (ARN) of the role to assume. Optional Field in case of Assume + * Role + */ + assumeRoleSourceIdentity?: string; + /** + * AWS Access key ID. + */ + awsAccessKeyId?: string; + /** + * AWS Region + */ + awsRegion?: string; + /** + * AWS Secret Access Key. + */ + awsSecretAccessKey?: string; + /** + * AWS Session Token. + */ + awsSessionToken?: string; + /** + * Enable AWS IAM authentication. When enabled, uses the default credential provider chain + * (environment variables, instance profile, etc.). Defaults to false for backward + * compatibility. + */ + enabled?: boolean; + /** + * EndPoint URL for the AWS + */ + endPointURL?: string; + /** + * The name of a profile to use with the boto session. + */ + profileName?: string; + /** + * We support two ways of authenticating to GCP i.e via GCP Credentials Values or GCP + * Credentials Path + */ + gcpConfig?: GCPCredentialsConfiguration; + /** + * we enable the authenticated service account to impersonate another service account + */ + gcpImpersonateServiceAccount?: GCPImpersonateServiceAccountValues; +} + /** * Pipeline type * @@ -5867,6 +5971,11 @@ export interface TestConnectionResult { * The actual query that was executed (may be transpiled or modified from the original) */ executedQuery?: string; + /** + * S3 or GCS key path where the query results CSV is stored. Present when storage mode is + * enabled; mutually exclusive with 'results'. + */ + resultPath?: string; } export interface ReverseIngestionOperationResult {