From 436e0d512bc2672c599bea34821e42b7d4daffc3 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Sat, 4 Apr 2026 21:02:34 -0700 Subject: [PATCH 1/5] portable date python changes --- sdks/python/apache_beam/io/jdbc.py | 4 +++ .../apache_beam/portability/common_urns.py | 1 + .../transforms/managed_iceberg_it_test.py | 4 ++- sdks/python/apache_beam/typehints/schemas.py | 29 ++++++++++++++++++- .../apache_beam/typehints/schemas_test.py | 2 ++ 5 files changed, 38 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/jdbc.py b/sdks/python/apache_beam/io/jdbc.py index df5d7f21a343..27f7ce848e43 100644 --- a/sdks/python/apache_beam/io/jdbc.py +++ b/sdks/python/apache_beam/io/jdbc.py @@ -360,6 +360,10 @@ def __init__( of the output PCollection elements. This bypasses automatic schema inference during pipeline construction. """ + # override new portable Date type with the current Jdbc type + # TODO: switch JdbcIO to return portable Date type + LogicalType.register_logical_type(JdbcDateType) + classpath = classpath or DEFAULT_JDBC_CLASSPATH dataSchema = None diff --git a/sdks/python/apache_beam/portability/common_urns.py b/sdks/python/apache_beam/portability/common_urns.py index 74d9a39bb052..7777f63ffbe9 100644 --- a/sdks/python/apache_beam/portability/common_urns.py +++ b/sdks/python/apache_beam/portability/common_urns.py @@ -92,3 +92,4 @@ var_bytes = LogicalTypes.Enum.VAR_BYTES fixed_char = LogicalTypes.Enum.FIXED_CHAR var_char = LogicalTypes.Enum.VAR_CHAR +date = LogicalTypes.Enum.DATE diff --git a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py index b1e53a79bd41..516dd167acb2 100644 --- a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py +++ b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py @@ -15,6 +15,7 @@ # limitations under the License. # +import datetime import os import unittest import uuid @@ -49,7 +50,8 @@ def _create_row(self, num: int): bytes_=bytes(num), bool_=(num % 2 == 0), float_=(num + float(num) / 100), - arr_=[num, num, num]) + arr_=[num, num, num], + date_=datetime.date.today() - datetime.timedelta(days=num))) def test_write_read_pipeline(self): iceberg_config = { diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index d2c4db8cabca..9e337f080fbf 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -34,6 +34,7 @@ bytes <-----> BYTES ByteString ------> BYTES Timestamp <-----> LogicalType(urn="beam:logical_type:micros_instant:v1") + datetime.date <---> LogicalType(urn="beam:logical_type:date:v1") Decimal <-----> LogicalType(urn="beam:logical_type:fixed_decimal:v1") Mapping <-----> MapType Sequence <-----> ArrayType @@ -1004,6 +1005,33 @@ def to_language_type(self, value): return Timestamp(seconds=int(value.seconds), micros=int(value.micros)) +@LogicalType._register_internal +class Date(NoArgumentLogicalType[datetime.date, np.int64]): + """Date logical type that handles ``datetime.date``, days since epoch.""" + EPOCH = datetime.date(1970, 1, 1) + + @classmethod + def urn(cls): + return common_urns.date.urn + + @classmethod + def representation_type(cls): + # type: () -> type + return np.int64 + + @classmethod + def language_type(cls): + return datetime.date + + def to_representation_type(self, value): + # type: (datetime.date) -> np.int64 + return (value - self.EPOCH).days + + def to_language_type(self, value): + # type: (np.int64) -> datetime.date + return self.EPOCH + datetime.timedelta(days=value) + + @LogicalType._register_internal class PythonCallable(NoArgumentLogicalType[PythonCallableWithSource, str]): """A logical type for PythonCallableSource objects.""" @@ -1244,7 +1272,6 @@ def argument(self): # TODO: A temporary fix for missing jdbc logical types. # See the discussion in https://github.com/apache/beam/issues/35738 for # more detail. -@LogicalType._register_internal class JdbcDateType(LogicalType[datetime.date, MillisInstant, str]): """ For internal use only; no backwards-compatibility guarantees. diff --git a/sdks/python/apache_beam/typehints/schemas_test.py b/sdks/python/apache_beam/typehints/schemas_test.py index 5a5d7396ab30..d70bf0c47d33 100644 --- a/sdks/python/apache_beam/typehints/schemas_test.py +++ b/sdks/python/apache_beam/typehints/schemas_test.py @@ -20,6 +20,7 @@ # pytype: skip-file import dataclasses +import datetime import itertools import pickle import unittest @@ -105,6 +106,7 @@ class ComplexSchema(NamedTuple): optional_array: Optional[Sequence[np.float32]] array_optional: Sequence[Optional[bool]] timestamp: Timestamp + date: datetime.date def get_test_beam_fieldtype_protos(): From befef067c63e8a2f45cf5b0d691af9783e65a936 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 6 Apr 2026 08:53:32 -0700 Subject: [PATCH 2/5] trigger ITs --- .../beam_PostCommit_Python_Xlang_Gcp_Dataflow.json | 2 +- .../trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json | 2 +- .../trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json | 2 +- .../trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json index bb5da04014ec..e3d6056a5de9 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 15 + "modification": 1 } diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json index 83346d34aee0..e3d6056a5de9 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 16 + "modification": 1 } diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json index b60f5c4cc3c8..e3d6056a5de9 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 0 + "modification": 1 } diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json index b26833333238..e3d6056a5de9 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2 + "modification": 1 } From 7fd4d17c0115a268ed954506b7ef179053b8eaaf Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 6 Apr 2026 09:45:30 -0700 Subject: [PATCH 3/5] typo --- sdks/python/apache_beam/transforms/managed_iceberg_it_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py index 516dd167acb2..2fe270596f25 100644 --- a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py +++ b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py @@ -51,7 +51,7 @@ def _create_row(self, num: int): bool_=(num % 2 == 0), float_=(num + float(num) / 100), arr_=[num, num, num], - date_=datetime.date.today() - datetime.timedelta(days=num))) + date_=datetime.date.today() - datetime.timedelta(days=num)) def test_write_read_pipeline(self): iceberg_config = { From dfab67648b6523f084ebe588008952de8e355419 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 7 Apr 2026 11:23:13 -0700 Subject: [PATCH 4/5] add todo link --- sdks/python/apache_beam/io/jdbc.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/jdbc.py b/sdks/python/apache_beam/io/jdbc.py index 27f7ce848e43..e6646443bfb0 100644 --- a/sdks/python/apache_beam/io/jdbc.py +++ b/sdks/python/apache_beam/io/jdbc.py @@ -361,7 +361,8 @@ def __init__( schema inference during pipeline construction. """ # override new portable Date type with the current Jdbc type - # TODO: switch JdbcIO to return portable Date type + # TODO(https://github.com/apache/beam/issues/28359): + # switch JdbcIO to return portable Date type LogicalType.register_logical_type(JdbcDateType) classpath = classpath or DEFAULT_JDBC_CLASSPATH From b0eaf69b72018ebd4c87d001450b46022bc38ff7 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 10 Apr 2026 13:37:52 -0700 Subject: [PATCH 5/5] disable managed transforms --- .../transforms/managed_iceberg_it_test.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py index 2fe270596f25..458855c4b966 100644 --- a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py +++ b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py @@ -34,13 +34,13 @@ "EXPANSION_JARS environment var is not provided, " "indicating that jars have not been built") class ManagedIcebergIT(unittest.TestCase): - WAREHOUSE = "gs://temp-storage-for-end-to-end-tests/xlang-python-using-java" + WAREHOUSE = "gs://temp-storage-for-end-to-end-tests" def setUp(self): self.test_pipeline = TestPipeline(is_integration_test=True) self.args = self.test_pipeline.get_full_options_as_args() self.args.extend([ - '--experiments=enable_managed_transforms', + # '--experiments=enable_managed_transforms', ]) def _create_row(self, num: int): @@ -54,13 +54,20 @@ def _create_row(self, num: int): date_=datetime.date.today() - datetime.timedelta(days=num)) def test_write_read_pipeline(self): + biglake_catalog_props = { + 'type': 'rest', + 'uri': 'https://biglake.googleapis.com/iceberg/v1/restcatalog', + 'warehouse': self.WAREHOUSE, + 'header.x-goog-user-project': 'apache-beam-testing', + 'rest.auth.type': 'google', + 'io-impl': 'org.apache.iceberg.gcp.gcs.GCSFileIO', + 'header.X-Iceberg-Access-Delegation': 'vended-credentials', + 'rest-metrics-reporting-enabled': 'false' + } iceberg_config = { "table": "test_iceberg_write_read.test_" + uuid.uuid4().hex, "catalog_name": "default", - "catalog_properties": { - "type": "hadoop", - "warehouse": self.WAREHOUSE, - } + "catalog_properties": biglake_catalog_props } rows = [self._create_row(i) for i in range(100)]