-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Fixes #27443: add OCI Autonomous Database support for Oracle connector #27508
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
08980a0
e170023
623f99e
2c6c311
4f23116
9deb3fd
d426b5d
998d4fd
9c0a3f0
ba98a45
43e1b80
e991b85
6453951
d76b997
e06b07d
f0c823e
12b4169
e4e67c5
7cac5f3
6e3c514
a1ad81f
3e97044
d292bca
ecbfcc8
ff74d1a
a3a4bea
62594a4
cd93ec2
ba9d067
73d5ea6
dfa1f97
a50af0a
d2a58c4
ee91fb9
6ecf6c3
cc88740
4ecc9a4
90f9879
1d1f94e
473c89d
4aa5095
5993897
cc9b365
8eedea1
2249cd7
8067108
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,10 +12,16 @@ | |
| """ | ||
| Source connection handler | ||
| """ | ||
| import base64 | ||
| import io | ||
| import os | ||
| import shutil | ||
| import sys | ||
| import tempfile | ||
| import weakref | ||
| import zipfile | ||
| from copy import deepcopy | ||
| from typing import Optional | ||
| from typing import Any, Optional | ||
| from urllib.parse import quote_plus | ||
|
|
||
| import oracledb | ||
|
|
@@ -30,6 +36,7 @@ | |
| OracleConnection as OracleConnectionConfig, | ||
| ) | ||
| from metadata.generated.schema.entity.services.connections.database.oracleConnection import ( | ||
| OracleAutonomousConnection, | ||
| OracleDatabaseSchema, | ||
| OracleServiceName, | ||
| OracleTNSConnection, | ||
|
|
@@ -41,6 +48,7 @@ | |
| create_generic_db_connection, | ||
| get_connection_args_common, | ||
| get_connection_options_dict, | ||
| init_empty_connection_arguments, | ||
| ) | ||
| from metadata.ingestion.connections.connection import BaseConnection | ||
| from metadata.ingestion.connections.secrets import connection_with_options_secrets | ||
|
|
@@ -67,22 +75,153 @@ | |
| class OracleConnection(BaseConnection[OracleConnectionConfig, Engine]): | ||
| def __init__(self, connection: OracleConnectionConfig): | ||
| super().__init__(connection) | ||
| self._wallet_temp_dir: Optional[str] = None | ||
| self._wallet_cleanup_finalizer: Optional[weakref.finalize] = None | ||
|
|
||
| def _set_wallet_temp_dir(self, wallet_temp_dir: str) -> None: | ||
| self._cleanup_wallet_temp_dir() | ||
| self._wallet_temp_dir = wallet_temp_dir | ||
| self._wallet_cleanup_finalizer = weakref.finalize( | ||
| self, | ||
| shutil.rmtree, | ||
| wallet_temp_dir, | ||
| ignore_errors=True, | ||
| ) | ||
|
|
||
| def _cleanup_wallet_temp_dir(self) -> None: | ||
| wallet_temp_dir = self._wallet_temp_dir | ||
| if self._wallet_cleanup_finalizer and self._wallet_cleanup_finalizer.alive: | ||
| self._wallet_cleanup_finalizer() | ||
| elif wallet_temp_dir: | ||
| shutil.rmtree(wallet_temp_dir, ignore_errors=True) | ||
|
|
||
| self._wallet_cleanup_finalizer = None | ||
| self._wallet_temp_dir = None | ||
|
|
||
| def _is_autonomous_connection(self) -> bool: | ||
| return isinstance( | ||
| self.service_connection.oracleConnectionType, OracleAutonomousConnection | ||
| ) | ||
|
|
||
| @staticmethod | ||
| def _get_autonomous_connection_config( | ||
| connection_type: OracleAutonomousConnection, | ||
| ) -> Any: | ||
| return connection_type.root | ||
|
|
||
| @staticmethod | ||
| def _safe_extract_wallet_archive(zip_ref: zipfile.ZipFile, target_dir: str) -> None: | ||
| target_dir_real = os.path.realpath(target_dir) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. How would this work in a Docker/K8s-deployed pod? How are we asking users to pass this zip file?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Great catch, @harshach. There are actually two paths. walletPath: for on-prem/bare-metal deployments where the wallet is already extracted on the ingestion host; it is not usable in Docker/K8s without volume mounts. walletContent: for Docker/K8s and fully UI-driven setups. Users download the wallet zip from the Oracle Cloud Console, base64-encode it locally (base64 -w 0 Wallet_mydb.zip), and paste the result into the walletContent password field. OpenMetadata stores it securely, decodes it at runtime, extracts it to a temp dir inside the pod, and cleans up afterward. I also noticed that I missed running yarn parse-schema to regenerate the pre-built UI schema, so OracleAutonomousConnection is currently absent from the form. Will push that fix now. |
||
| safe_prefix = f"{target_dir_real}{os.sep}" | ||
|
|
||
| for member in zip_ref.infolist(): | ||
| member_path = os.path.realpath( | ||
| os.path.join(target_dir_real, member.filename) | ||
| ) | ||
|
|
||
| if ( | ||
| not member_path.startswith(safe_prefix) | ||
| and member_path != target_dir_real | ||
| ): | ||
| raise ValueError( | ||
| "Invalid walletContent. Wallet zip contains unsafe file paths." | ||
| ) | ||
|
|
||
| if member.is_dir(): | ||
| os.makedirs(member_path, exist_ok=True) | ||
| continue | ||
|
|
||
| os.makedirs(os.path.dirname(member_path), exist_ok=True) | ||
| with zip_ref.open(member, "r") as source_file, open( | ||
| member_path, "wb" | ||
|
hassaansaleem28 marked this conversation as resolved.
Outdated
|
||
| ) as target_file: | ||
| shutil.copyfileobj(source_file, target_file) | ||
|
|
||
| def _extract_wallet_content(self, wallet_content: SecretStr) -> str: | ||
| try: | ||
| decoded_wallet = base64.b64decode(wallet_content.get_secret_value()) | ||
| except (ValueError, TypeError) as exc: | ||
| raise ValueError( | ||
| "Invalid walletContent. Expected a base64-encoded wallet zip." | ||
| ) from exc | ||
|
hassaansaleem28 marked this conversation as resolved.
Outdated
hassaansaleem28 marked this conversation as resolved.
Outdated
|
||
|
|
||
| wallet_temp_dir = tempfile.mkdtemp(prefix="oracle_wallet_") | ||
| self._set_wallet_temp_dir(wallet_temp_dir) | ||
|
|
||
| try: | ||
| with zipfile.ZipFile(io.BytesIO(decoded_wallet)) as zip_ref: | ||
| self._safe_extract_wallet_archive(zip_ref, wallet_temp_dir) | ||
| except (ValueError, zipfile.BadZipFile) as exc: | ||
| self._cleanup_wallet_temp_dir() | ||
| if isinstance(exc, zipfile.BadZipFile): | ||
| raise ValueError( | ||
| "Invalid walletContent. Expected a valid zip archive." | ||
| ) from exc | ||
| raise | ||
|
|
||
| return wallet_temp_dir | ||
|
|
||
| def _configure_autonomous_connection_arguments(self) -> None: | ||
| connection_type = self.service_connection.oracleConnectionType | ||
| if not isinstance(connection_type, OracleAutonomousConnection): | ||
| return | ||
|
|
||
| autonomous_connection = self._get_autonomous_connection_config(connection_type) | ||
| if not self.service_connection.connectionArguments: | ||
| self.service_connection.connectionArguments = ( | ||
| init_empty_connection_arguments() | ||
| ) | ||
| elif self.service_connection.connectionArguments.root is None: | ||
| self.service_connection.connectionArguments.root = {} | ||
|
|
||
| connection_arguments = self.service_connection.connectionArguments.root | ||
|
|
||
| wallet_path = autonomous_connection.walletPath | ||
| if autonomous_connection.walletContent: | ||
| if self._wallet_temp_dir and os.path.isdir(self._wallet_temp_dir): | ||
| wallet_path = self._wallet_temp_dir | ||
| else: | ||
| wallet_path = self._extract_wallet_content( | ||
| autonomous_connection.walletContent | ||
| ) | ||
| else: | ||
| self._cleanup_wallet_temp_dir() | ||
|
|
||
| if not wallet_path: | ||
| raise ValueError( | ||
| "Oracle Autonomous connections require either walletPath or walletContent." | ||
| ) | ||
|
|
||
| connection_arguments["config_dir"] = wallet_path | ||
| connection_arguments["wallet_location"] = wallet_path | ||
|
|
||
| if autonomous_connection.walletPassword: | ||
| connection_arguments[ | ||
| "wallet_password" | ||
| ] = autonomous_connection.walletPassword.get_secret_value() | ||
| else: | ||
| connection_arguments.pop("wallet_password", None) | ||
|
|
||
| def _get_client(self) -> Engine: | ||
| """ | ||
| Create connection | ||
| """ | ||
| try: | ||
| if self.service_connection.instantClientDirectory: | ||
| logger.info( | ||
| f"Initializing Oracle thick client at {self.service_connection.instantClientDirectory}" | ||
| ) | ||
| os.environ[LD_LIB_ENV] = self.service_connection.instantClientDirectory | ||
| oracledb.init_oracle_client( | ||
| lib_dir=self.service_connection.instantClientDirectory | ||
| ) | ||
| except DatabaseError as err: | ||
| logger.info(f"Could not initialize Oracle thick client: {err}") | ||
| self._configure_autonomous_connection_arguments() | ||
|
gitar-bot[bot] marked this conversation as resolved.
|
||
|
|
||
| if not self._is_autonomous_connection(): | ||
| try: | ||
| if self.service_connection.instantClientDirectory: | ||
| logger.info( | ||
| f"Initializing Oracle thick client at {self.service_connection.instantClientDirectory}" | ||
| ) | ||
| os.environ[ | ||
| LD_LIB_ENV | ||
| ] = self.service_connection.instantClientDirectory | ||
| oracledb.init_oracle_client( | ||
| lib_dir=self.service_connection.instantClientDirectory | ||
| ) | ||
| except DatabaseError as err: | ||
| logger.info(f"Could not initialize Oracle thick client: {err}") | ||
|
|
||
|
hassaansaleem28 marked this conversation as resolved.
hassaansaleem28 marked this conversation as resolved.
|
||
| return create_generic_db_connection( | ||
| connection=self.service_connection, | ||
|
|
@@ -150,6 +289,13 @@ def get_connection_dict(self) -> dict: | |
| connection_dict[ | ||
| "host" | ||
| ] = connection_copy.oracleConnectionType.oracleTNSConnection | ||
| elif isinstance( | ||
| connection_copy.oracleConnectionType, OracleAutonomousConnection | ||
| ): | ||
| autonomous_connection = self._get_autonomous_connection_config( | ||
| connection_copy.oracleConnectionType | ||
| ) | ||
| connection_dict["host"] = autonomous_connection.tnsAlias | ||
|
|
||
| # Add connection options if present | ||
| if connection_copy.connectionOptions and connection_copy.connectionOptions.root: | ||
|
|
@@ -209,6 +355,13 @@ def _handle_connection_type(url: str, connection: OracleConnectionConfig) -> str | |
| url += connection.oracleConnectionType.oracleTNSConnection | ||
| return url | ||
|
|
||
| if isinstance(connection.oracleConnectionType, OracleAutonomousConnection): | ||
| autonomous_connection = OracleConnection._get_autonomous_connection_config( | ||
| connection.oracleConnectionType | ||
| ) | ||
| url += autonomous_connection.tnsAlias | ||
| return url | ||
|
|
||
| # If not TNS, we add the hostPort | ||
| url += connection.hostPort | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.