feat: add namespace insert write path #587
base: main
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,178 @@ | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| name: Spark Namespace Insert Docker | ||
|
|
||
| on: | ||
| pull_request: | ||
| types: | ||
| - opened | ||
| - synchronize | ||
| - ready_for_review | ||
| - reopened | ||
| paths: | ||
| - ".github/workflows/spark-namespace-insert.yml" | ||
| - "Makefile" | ||
| - "docker/**" | ||
| - "integration-tests/**" | ||
| - "lance-spark-base_2.12/src/main/java/org/lance/spark/LanceSparkWriteOptions.java" | ||
| - "lance-spark-base_2.12/src/main/java/org/lance/spark/write/**" | ||
| - "lance-spark-base_2.12/src/test/java/org/lance/spark/LanceSparkWriteOptionsTest.java" | ||
| - "pom.xml" | ||
| - "*/pom.xml" | ||
| workflow_dispatch: | ||
| inputs: | ||
| spark-version: | ||
| description: "Spark version to test" | ||
| required: true | ||
| default: "3.5" | ||
| scala-version: | ||
| description: "Scala version to test" | ||
| required: true | ||
| default: "2.13" | ||
| backends: | ||
| description: "Comma-separated test backends: local or local,rest-dir" | ||
| required: true | ||
| default: "local,rest-dir" | ||
| rest-uri: | ||
| description: "Optional REST namespace URI. If omitted, tests start a local REST directory namespace." | ||
| required: false | ||
| default: "" | ||
| rest-database: | ||
| description: "Optional database header value for an external REST namespace" | ||
| required: false | ||
| default: "" | ||
| docker-run-args: | ||
| description: "Extra docker run args for docker-test" | ||
| required: false | ||
| default: "" | ||
|
|
||
| permissions: | ||
| contents: read | ||
|
|
||
| concurrency: | ||
| group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} | ||
| cancel-in-progress: true | ||
|
|
||
| env: | ||
| SPARK_VERSION: ${{ github.event.inputs['spark-version'] || '3.5' }} | ||
| SCALA_VERSION: ${{ github.event.inputs['scala-version'] || '2.13' }} | ||
| NAMESPACE_INSERT_TEST_BACKENDS: ${{ github.event_name == 'workflow_dispatch' && github.event.inputs.backends || 'local,rest-dir' }} | ||
| NAMESPACE_INSERT_PYTEST_CMD: >- | ||
| pytest /home/lance/tests/test_lance_spark.py::TestDMLNamespaceInsert | ||
| -v --timeout=180 | ||
|
|
||
| jobs: | ||
| namespace-insert-docker-test: | ||
| name: Namespace Insert Docker Test | ||
| runs-on: ubuntu-24.04 | ||
| timeout-minutes: 90 | ||
| steps: | ||
| - name: Checkout | ||
| uses: actions/checkout@v4 | ||
| with: | ||
| ref: ${{ github.event.pull_request.head.sha || github.sha }} | ||
| - name: Set up Java | ||
| uses: actions/setup-java@v4 | ||
| with: | ||
| distribution: temurin | ||
| java-version: 17 | ||
| cache: "maven" | ||
| - name: Resolve Docker build args | ||
| id: docker-args | ||
| run: | | ||
| make print-docker-build-args SPARK_VERSION=${SPARK_VERSION} SCALA_VERSION=${SCALA_VERSION} >> "$GITHUB_OUTPUT" | ||
| - name: Set up Docker Buildx | ||
| uses: docker/setup-buildx-action@v3 | ||
| - name: Build test-base image (cached) | ||
| uses: docker/build-push-action@v6 | ||
| with: | ||
| context: docker | ||
| file: docker/Dockerfile.test-base | ||
| load: true | ||
| tags: lance-spark-test-base:${{ env.SPARK_VERSION }}_${{ env.SCALA_VERSION }} | ||
| build-args: | | ||
| SPARK_DOWNLOAD_VERSION=${{ steps.docker-args.outputs.spark-download-version }} | ||
| SPARK_MAJOR_VERSION=${{ env.SPARK_VERSION }} | ||
| SCALA_VERSION=${{ env.SCALA_VERSION }} | ||
| PY4J_VERSION=${{ steps.docker-args.outputs.py4j-version }} | ||
| SPARK_SCALA_SUFFIX=${{ steps.docker-args.outputs.spark-scala-suffix }} | ||
| cache-from: type=gha,scope=namespace-insert-test-base-${{ env.SPARK_VERSION }}_${{ env.SCALA_VERSION }} | ||
| cache-to: type=gha,mode=max,scope=namespace-insert-test-base-${{ env.SPARK_VERSION }}_${{ env.SCALA_VERSION }} | ||
| - name: Build bundle | ||
| run: make bundle SPARK_VERSION=${SPARK_VERSION} SCALA_VERSION=${SCALA_VERSION} | ||
| - name: Build test image | ||
| run: | | ||
| make docker-build-test \ | ||
| SPARK_VERSION=${SPARK_VERSION} \ | ||
| SCALA_VERSION=${SCALA_VERSION} \ | ||
| LANCE_NAMESPACE_IMPL_VERSION=${{ steps.docker-args.outputs.lance-namespace-impl-version }} | ||
| - name: Run directory namespace insert tests | ||
| if: ${{ contains(env.NAMESPACE_INSERT_TEST_BACKENDS, 'local') }} | ||
| run: | | ||
| make docker-test \ | ||
| SPARK_VERSION=${SPARK_VERSION} \ | ||
| SCALA_VERSION=${SCALA_VERSION} \ | ||
| TEST_BACKENDS=local \ | ||
| PYTEST_CMD="${NAMESPACE_INSERT_PYTEST_CMD}" | ||
| - name: Resolve REST namespace URI | ||
| id: rest | ||
| if: ${{ contains(env.NAMESPACE_INSERT_TEST_BACKENDS, 'rest-dir') }} | ||
| env: | ||
| INPUT_REST_URI: ${{ github.event.inputs['rest-uri'] }} | ||
| INPUT_DOCKER_RUN_ARGS: ${{ github.event.inputs['docker-run-args'] }} | ||
| run: | | ||
| rest_uri="${INPUT_REST_URI}" | ||
| docker_run_args="${INPUT_DOCKER_RUN_ARGS}" | ||
| start_rest_dir="false" | ||
| rest_dir_root="" | ||
| rest_dir_port="" | ||
|
|
||
| if [ -z "${rest_uri}" ]; then | ||
| rest_dir_port="10024" | ||
| rest_dir_root="/home/lance/rest-data" | ||
| rest_uri="http://127.0.0.1:${rest_dir_port}" | ||
| start_rest_dir="true" | ||
| fi | ||
|
|
||
| echo "uri=${rest_uri}" >> "$GITHUB_OUTPUT" | ||
| echo "start_rest_dir=${start_rest_dir}" >> "$GITHUB_OUTPUT" | ||
| echo "rest_dir_root=${rest_dir_root}" >> "$GITHUB_OUTPUT" | ||
| echo "rest_dir_port=${rest_dir_port}" >> "$GITHUB_OUTPUT" | ||
| { | ||
| echo "docker_run_args<<EOF" | ||
| echo "${docker_run_args}" | ||
| echo "EOF" | ||
| } >> "$GITHUB_OUTPUT" | ||
| - name: Run REST directory namespace insert tests | ||
| if: ${{ contains(env.NAMESPACE_INSERT_TEST_BACKENDS, 'rest-dir') }} | ||
| env: | ||
| LANCE_SPARK_REST_URI: ${{ steps.rest.outputs.uri }} | ||
| LANCE_SPARK_REST_API_KEY: ${{ secrets.LANCE_SPARK_REST_API_KEY }} | ||
| LANCE_SPARK_REST_DATABASE: ${{ github.event.inputs['rest-database'] }} | ||
| LANCE_SPARK_START_REST_DIR: ${{ steps.rest.outputs.start_rest_dir }} | ||
| LANCE_SPARK_REST_DIR_ROOT: ${{ steps.rest.outputs.rest_dir_root }} | ||
| LANCE_SPARK_REST_DIR_PORT: ${{ steps.rest.outputs.rest_dir_port }} | ||
| DOCKER_RUN_ARGS: ${{ steps.rest.outputs.docker_run_args }} | ||
| run: | | ||
| make docker-test \ | ||
| SPARK_VERSION=${SPARK_VERSION} \ | ||
| SCALA_VERSION=${SCALA_VERSION} \ | ||
| TEST_BACKENDS=rest-dir \ | ||
| LANCE_SPARK_REST_URI="${LANCE_SPARK_REST_URI}" \ | ||
| LANCE_SPARK_REST_API_KEY="${LANCE_SPARK_REST_API_KEY}" \ | ||
| LANCE_SPARK_REST_DATABASE="${LANCE_SPARK_REST_DATABASE}" \ | ||
| LANCE_SPARK_START_REST_DIR="${LANCE_SPARK_START_REST_DIR}" \ | ||
| LANCE_SPARK_REST_DIR_ROOT="${LANCE_SPARK_REST_DIR_ROOT}" \ | ||
| LANCE_SPARK_REST_DIR_PORT="${LANCE_SPARK_REST_DIR_PORT}" \ | ||
| DOCKER_RUN_ARGS="${DOCKER_RUN_ARGS}" \ | ||
| PYTEST_CMD="${NAMESPACE_INSERT_PYTEST_CMD}" | ||
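For readers reviewing the workflow, the step gating and the "Resolve REST namespace URI" step above can be restated as a small Python sketch. It only mirrors the shell logic shown in the diff; the function names `step_enabled` and `resolve_rest_settings` are hypothetical and are not part of the PR. The substring check approximates the `contains()` expression used in the `if:` conditions.

```python
def step_enabled(backends: str, token: str) -> bool:
    """Approximate `contains(env.NAMESPACE_INSERT_TEST_BACKENDS, token)`.

    For a string input this is a case-insensitive substring check, so
    'local' also matches any backend name that merely contains 'local'.
    """
    return token.lower() in backends.lower()


def resolve_rest_settings(input_rest_uri: str) -> dict:
    """Mirror the shell step: fall back to a local REST directory namespace
    when no external REST URI is supplied."""
    settings = {
        "uri": input_rest_uri,
        "start_rest_dir": "false",
        "rest_dir_root": "",
        "rest_dir_port": "",
    }
    if not input_rest_uri:
        # Values copied from the workflow step in this diff.
        settings["rest_dir_port"] = "10024"
        settings["rest_dir_root"] = "/home/lance/rest-data"
        settings["uri"] = f"http://127.0.0.1:{settings['rest_dir_port']}"
        settings["start_rest_dir"] = "true"
    return settings


local_default = resolve_rest_settings("")
external = resolve_rest_settings("http://rest.example.invalid")  # hypothetical URI
```

With an empty input the tests start their own REST directory namespace on the loopback address; with an external URI nothing is started locally and the URI is passed through unchanged.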
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -71,6 +71,23 @@ and namespace-specific options: | |
| | `spark.sql.catalog.{name}.parent` | String | ✗ | Parent prefix for multi-level namespaces. See [Note on Namespace Levels](#note-on-namespace-levels). | | ||
| | `spark.sql.catalog.{name}.parent_delimiter` | String | ✗ | Delimiter for parent prefix (default: `.`). See [Note on Namespace Levels](#note-on-namespace-levels). | | ||
|
|
||
| ## Write Options | ||
|
|
||
| Write options can be set on DataFrame writes. Catalog-level values are also used as defaults when | ||
| they are present in the Spark catalog configuration. | ||
|
|
||
| | Option | Type | Default | Description | | ||
| |--------------------------------|---------|---------|----------------------------------------------------------------------------------------------------------| | ||
| | `use_namespace_insert` | Boolean | `false` | Use the Lance Namespace insert API for eligible append writes to namespace-backed tables. | | ||
| | `namespace_insert_parallelism` | Integer | `0` | Number of writer tasks to request from Spark for namespace insert writes. `0` preserves Spark's plan. For sharded tables Spark uses the sharding distribution; for unsharded tables Spark repartitions by the first output column. | | ||
| | `batch_size` | Integer | `8192` | Maximum rows per Arrow batch/request before flushing. | | ||
| | `max_batch_bytes` | Long | `268435456` | Maximum approximate bytes per Arrow batch/request before flushing. | | ||
|
|
||
| Namespace insert writes are intended for append ingestion through a namespace implementation, | ||
|
Collaborator
The docs currently say that rows may be visible after failures, but they do not say that namespace insert is currently an at-least-once path, nor that a Spark job can succeed with duplicate rows after task retry or speculation. Please update both user-facing docs to state that replay de-duplication can only be claimed when the connector verifies a first-class namespace idempotency capability with defined replay semantics. Otherwise users should treat this as at-least-once ingestion. Users who need Spark driver-side atomic commit semantics should use the default writer.
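The duplicate-row concern in this comment can be illustrated with a toy simulation. This is a minimal sketch, not the connector's behavior: `ToyNamespace`, its `insert` method, and the `idempotency_key` parameter are all hypothetical stand-ins, and the keyed variant assumes a replayed task produces the same batches in the same order.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Set


@dataclass
class ToyNamespace:
    """Hypothetical namespace that commits every insert request immediately."""
    rows: List[int] = field(default_factory=list)
    seen_keys: Set[str] = field(default_factory=set)

    def insert(self, batch: List[int], idempotency_key: Optional[str] = None) -> None:
        if idempotency_key is not None:
            if idempotency_key in self.seen_keys:
                return  # replay of an already committed request: ignore it
            self.seen_keys.add(idempotency_key)
        self.rows.extend(batch)


def run_task(ns, batches, fail_after=None, use_keys=False):
    """Send batches in order; raise once `fail_after` requests have committed."""
    for i, batch in enumerate(batches):
        if fail_after is not None and i == fail_after:
            raise RuntimeError("simulated task failure")
        key = f"task-0/batch-{i}" if use_keys else None
        ns.insert(batch, idempotency_key=key)


def write_with_retry(ns, batches, use_keys):
    """First attempt fails after one committed request; the retry replays everything."""
    try:
        run_task(ns, batches, fail_after=1, use_keys=use_keys)
    except RuntimeError:
        run_task(ns, batches, use_keys=use_keys)


batches = [[1, 2], [3, 4]]

plain = ToyNamespace()
write_with_retry(plain, batches, use_keys=False)  # at-least-once: batch 0 lands twice

keyed = ToyNamespace()
write_with_retry(keyed, batches, use_keys=True)   # replayed batch 0 is ignored
```

In the unkeyed run the job "succeeds" with rows `[1, 2]` written twice, which is the at-least-once outcome the comment asks the docs to spell out. The keyed run only avoids this because the toy namespace itself defines replay semantics for the key.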
||
| including REST namespaces. Each insert request is committed by the namespace as it runs, so this mode | ||
| does not provide the same Spark driver-side atomic commit behavior as the default writer. If a Spark | ||
| task or driver fails after some requests complete, those rows may already be visible. | ||
|
|
||
| ## Example Namespace Implementations | ||
|
|
||
| ### Directory Namespace | ||
|
|
||
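The `batch_size` and `max_batch_bytes` rows in the Write Options table describe two flush thresholds. The sketch below shows one plausible reading, assuming a batch is flushed as soon as either threshold is reached; the actual writer may flush before a limit would be exceeded instead, and `flush_batches` is a hypothetical helper, not an API from this PR.

```python
from typing import Iterable, Iterator, List, Tuple


def flush_batches(
    rows: Iterable[Tuple[object, int]],
    batch_size: int = 8192,
    max_batch_bytes: int = 268435456,
) -> Iterator[List[object]]:
    """Group (row, approx_bytes) pairs into batches.

    A batch is emitted when it holds `batch_size` rows or when its
    approximate size reaches `max_batch_bytes`, whichever comes first.
    """
    batch: List[object] = []
    batch_bytes = 0
    for row, approx_bytes in rows:
        batch.append(row)
        batch_bytes += approx_bytes
        if len(batch) >= batch_size or batch_bytes >= max_batch_bytes:
            yield batch
            batch, batch_bytes = [], 0
    if batch:
        yield batch  # flush the final partial batch


rows = [(i, 10) for i in range(12)]  # 12 rows of roughly 10 bytes each

by_rows = [len(b) for b in flush_batches(rows, batch_size=5, max_batch_bytes=1000)]
by_bytes = [len(b) for b in flush_batches(rows, batch_size=100, max_batch_bytes=30)]
```

Under this reading, each flushed batch corresponds to one insert request, so a small `batch_size` increases the number of requests the namespace commits independently.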
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,7 +16,16 @@ | |
| import time | ||
| import pytest | ||
| from packaging.version import Version | ||
| from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, BinaryType | ||
| from pyspark.sql.types import ( | ||
| ArrayType, | ||
| BinaryType, | ||
| DoubleType, | ||
| FloatType, | ||
| IntegerType, | ||
| StringType, | ||
| StructField, | ||
| StructType, | ||
| ) | ||
|
|
||
| SPARK_VERSION = Version(os.environ.get("SPARK_VERSION", "3.5")) | ||
|
|
||
|
|
@@ -123,6 +132,12 @@ def _require_sql_search_backend(spark): | |
| pytest.skip("SQL search table functions are covered on local dir and rest-dir backends") | ||
|
|
||
|
|
||
| def _require_namespace_insert_backend(spark): | ||
| backend = getattr(spark, "_lance_backend", None) | ||
| if backend not in ("local", "rest-dir"): | ||
| pytest.skip("Namespace insert writes are covered on local dir and rest-dir backends") | ||
|
|
||
|
|
||
| # ============================================================================= | ||
| # DDL (Data Definition Language) Tests | ||
| # ============================================================================= | ||
|
|
@@ -1989,6 +2004,91 @@ def test_insert_append_data(self, spark): | |
| assert count == 4 | ||
|
|
||
|
|
||
| @pytest.mark.rest_dir_compatible | ||
| class TestDMLNamespaceInsert: | ||
|
|
||
| """Test append writes through the Lance namespace insert API.""" | ||
|
|
||
| def test_namespace_insert_append_data(self, spark): | ||
| """Test DataFrame append through namespace insert with multiple writer tasks.""" | ||
| _require_namespace_insert_backend(spark) | ||
|
|
||
| spark.sql(""" | ||
| CREATE TABLE default.test_table ( | ||
| id INT, | ||
| name STRING, | ||
| value DOUBLE | ||
| ) | ||
| """) | ||
|
|
||
| df = ( | ||
| spark.range(0, 24) | ||
| .repartition(6) | ||
| .selectExpr( | ||
| "CAST(id AS INT) AS id", | ||
| "concat('name-', id) AS name", | ||
| "CAST(id * 1.5 AS DOUBLE) AS value", | ||
| ) | ||
| ) | ||
|
|
||
| ( | ||
| df.writeTo("default.test_table") | ||
| .option("use_namespace_insert", "true") | ||
| .option("namespace_insert_parallelism", "3") | ||
| .option("batch_size", "5") | ||
| .append() | ||
| ) | ||
|
|
||
| rows = spark.sql(""" | ||
| SELECT COUNT(*) AS count_rows, SUM(id) AS sum_id, SUM(value) AS sum_value | ||
| FROM default.test_table | ||
| """).collect() | ||
|
|
||
| assert rows[0].count_rows == 24 | ||
| assert rows[0].sum_id == sum(range(24)) | ||
| assert rows[0].sum_value == pytest.approx(sum(i * 1.5 for i in range(24))) | ||
|
|
||
| def test_namespace_insert_append_vector_data(self, spark): | ||
| """Test namespace insert preserves fixed-size-list vector writes.""" | ||
| _require_namespace_insert_backend(spark) | ||
|
|
||
| spark.sql(""" | ||
| CREATE TABLE default.test_table ( | ||
| id INT, | ||
| vector ARRAY<FLOAT> | ||
| ) USING lance | ||
| TBLPROPERTIES ('vector.arrow.fixed-size-list.size' = '4') | ||
| """) | ||
|
|
||
| schema = StructType([ | ||
| StructField("id", IntegerType(), True), | ||
| StructField("vector", ArrayType(FloatType()), True), | ||
| ]) | ||
| df = spark.createDataFrame( | ||
| [ | ||
| (1, [1.0, 0.0, 0.0, 0.0]), | ||
| (2, [0.0, 1.0, 0.0, 0.0]), | ||
| (3, [0.0, 0.0, 1.0, 0.0]), | ||
| (4, [0.0, 0.0, 0.0, 1.0]), | ||
| ], | ||
| schema, | ||
| ).repartition(2) | ||
|
|
||
| ( | ||
| df.writeTo("default.test_table") | ||
| .option("use_namespace_insert", "true") | ||
| .option("namespace_insert_parallelism", "2") | ||
| .append() | ||
| ) | ||
|
|
||
| rows = spark.sql(""" | ||
| SELECT id, vector FROM default.test_table ORDER BY id | ||
| """).collect() | ||
|
|
||
| assert [row.id for row in rows] == [1, 2, 3, 4] | ||
| assert rows[0].vector == [1.0, 0.0, 0.0, 0.0] | ||
| assert rows[3].vector == [0.0, 0.0, 0.0, 1.0] | ||
|
|
||
|
|
||
| @requires_update_or_merge | ||
| class TestDMLUpdate: | ||
| """Test DML UPDATE SET operations.""" | ||
|
|
||
This workflow is intended to cover namespace insert, but the current `paths` mostly cover write/options/integration files. Namespace insert also depends on the Arrow IPC writer, blob-reference read/write path, source-context optimizer rule, session-extension injection, sharding utilities, namespace catalog construction, and runtime namespace wiring. Prefer source-area filters instead of enumerating individual dependencies, for example `lance-spark-base_2.12/src/main/**`, `lance-spark-base_2.12/src/test/**`, `lance-spark-*/src/main/**`, and `lance-spark-*/src/test/**`, so key path changes still run the local/rest-dir namespace insert tests.
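The coverage gap can be checked with a rough matcher for workflow path filters. This is a sketch that approximates only two glob rules (`*` stops at `/`, `**` crosses directories) and ignores the rest of the filter syntax; `filter_to_regex`, `triggers`, and the changed file path are hypothetical and used purely for illustration.

```python
import re


def filter_to_regex(pattern: str) -> "re.Pattern":
    """Translate a path filter into a regex: `**` matches anything,
    `*` matches anything except `/`, everything else is literal."""
    parts = []
    i = 0
    while i < len(pattern):
        if pattern.startswith("**", i):
            parts.append(".*")
            i += 2
        elif pattern[i] == "*":
            parts.append("[^/]*")
            i += 1
        else:
            parts.append(re.escape(pattern[i]))
            i += 1
    return re.compile("".join(parts))


def triggers(paths_filter, changed_file: str) -> bool:
    """True if any filter entry fully matches the changed file path."""
    return any(filter_to_regex(p).fullmatch(changed_file) for p in paths_filter)


# Source-tree entries from the workflow as written in this PR.
CURRENT = [
    "lance-spark-base_2.12/src/main/java/org/lance/spark/LanceSparkWriteOptions.java",
    "lance-spark-base_2.12/src/main/java/org/lance/spark/write/**",
    "lance-spark-base_2.12/src/test/java/org/lance/spark/LanceSparkWriteOptionsTest.java",
]

# Source-area filters suggested in the review comment.
PROPOSED = [
    "lance-spark-base_2.12/src/main/**",
    "lance-spark-base_2.12/src/test/**",
    "lance-spark-*/src/main/**",
    "lance-spark-*/src/test/**",
]

# Hypothetical file outside the write package, for illustration only.
changed = "lance-spark-base_2.12/src/main/java/org/lance/spark/read/Example.java"

covered_now = triggers(CURRENT, changed)
covered_proposed = triggers(PROPOSED, changed)
```

A change to a file outside the `write` package would not start the workflow under the enumerated entries, while the broader source-area filters would pick it up.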