diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json index 83346d34aee0..e3d6056a5de9 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 16 + "modification": 1 } diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json index b60f5c4cc3c8..e3d6056a5de9 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 0 + "modification": 1 } diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json index b26833333238..c537844dc84a 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2 + "modification": 3 } diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto index 5250aaaa1a7a..58d16b83749d 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto @@ -174,6 +174,13 @@ message LogicalTypes { // A variable-length string with its maximum length as the argument. VAR_CHAR = 7 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:logical_type:var_char:v1"]; + + // A URN for Date type + // - Representation type: INT64 + // - A date without a timezone, represented by the number of days + // since the epoch. + DATE = 8 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:logical_type:date:v1"]; } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java index 8ad5bb5ff97f..d527b2949235 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.Schema.LogicalType; import org.apache.beam.sdk.schemas.Schema.TypeName; +import org.apache.beam.sdk.schemas.logicaltypes.Date; import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric; import org.apache.beam.sdk.schemas.logicaltypes.FixedString; @@ -113,6 +114,7 @@ private static String getLogicalTypeUrn(String identifier) { .put(VariableBytes.IDENTIFIER, VariableBytes.class) .put(FixedString.IDENTIFIER, FixedString.class) .put(VariableString.IDENTIFIER, VariableString.class) + .put(Date.IDENTIFIER, Date.class) .build(); public static SchemaApi.Schema schemaToProto(Schema schema, boolean serializeLogicalType) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java index 894b585fe660..09cc040b609c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java @@ -18,7 +18,10 @@ package org.apache.beam.sdk.schemas.logicaltypes; import java.time.LocalDate; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.SchemaApi; import org.apache.beam.sdk.schemas.Schema; +import org.checkerframework.checker.nullness.qual.Nullable; /** * A date without a time-zone. @@ -30,23 +33,20 @@ * incrementing count of days where day 0 is 1970-01-01 (ISO). */ public class Date implements Schema.LogicalType { - public static final String IDENTIFIER = "beam:logical_type:date:v1"; + public static final String IDENTIFIER = + SchemaApi.LogicalTypes.Enum.DATE + .getValueDescriptor() + .getOptions() + .getExtension(RunnerApi.beamUrn); @Override public String getIdentifier() { return IDENTIFIER; } - // unused @Override - public Schema.FieldType getArgumentType() { - return Schema.FieldType.STRING; - } - - // unused - @Override - public String getArgument() { - return ""; + public Schema.@Nullable FieldType getArgumentType() { + return null; } @Override diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java index fb9a8308fcff..8a382c4ad1b8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java @@ -25,6 +25,7 @@ import java.math.BigDecimal; import java.nio.charset.StandardCharsets; +import java.time.LocalDate; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.HashMap; @@ -142,6 +143,7 @@ public static Iterable data() { Field.of("decimal", FieldType.DECIMAL), Field.of("datetime", FieldType.DATETIME))) .add(Schema.of(Field.of("fixed_bytes", FieldType.logicalType(FixedBytes.of(24))))) .add(Schema.of(Field.of("micros_instant", FieldType.logicalType(new MicrosInstant())))) + .add(Schema.of(Field.of("date", FieldType.logicalType(SqlTypes.DATE)))) .add(Schema.of(Field.of("python_callable", FieldType.logicalType(new PythonCallable())))) .add( Schema.of( @@ -388,6 +390,7 @@ public static Iterable data() { .add(simpleRow(FieldType.DECIMAL, BigDecimal.valueOf(100000))) .add(simpleRow(FieldType.logicalType(new PortableNullArgLogicalType()), "str")) .add(simpleRow(FieldType.logicalType(new DateTime()), LocalDateTime.of(2000, 1, 3, 3, 1))) + .add(simpleRow(FieldType.logicalType(SqlTypes.DATE), LocalDate.of(2000, 1, 3))) .add(simpleNullRow(FieldType.STRING)) .add(simpleNullRow(FieldType.INT32)) .add(simpleNullRow(FieldType.map(FieldType.STRING, FieldType.INT32))) diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java index 4737e9a36e17..40731ae9ca31 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java @@ -38,6 +38,7 @@ import java.sql.SQLException; import java.sql.Time; import java.sql.Timestamp; +import java.time.LocalDate; import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; @@ -294,6 +295,13 @@ static JdbcIO.PreparedStatementSetCaller getPreparedStatementSetCaller( return (element, ps, i, fieldWithIndex) -> { ps.setBigDecimal(i + 1, element.getDecimal(fieldWithIndex.getIndex())); }; + } else if (logicalTypeName.equals( + org.apache.beam.sdk.schemas.logicaltypes.Date.IDENTIFIER)) { + return (element, ps, i, fieldWithIndex) -> { + LocalDate value = + element.getLogicalTypeValue(fieldWithIndex.getIndex(), LocalDate.class); + ps.setDate(i + 1, value == null ? null : Date.valueOf(value)); + }; } else if (logicalTypeName.equals("DATE")) { return (element, ps, i, fieldWithIndex) -> { ReadableDateTime value = element.getDateTime(fieldWithIndex.getIndex()); diff --git a/sdks/python/apache_beam/io/jdbc.py b/sdks/python/apache_beam/io/jdbc.py index df5d7f21a343..869717915031 100644 --- a/sdks/python/apache_beam/io/jdbc.py +++ b/sdks/python/apache_beam/io/jdbc.py @@ -360,6 +360,9 @@ def __init__( of the output PCollection elements. This bypasses automatic schema inference during pipeline construction. """ + # override new portable Date type with the current Jdbc type + # TODO: switch JdbcIO to return portable Date type + LogicalType.register_logical_type(JdbcDateType) classpath = classpath or DEFAULT_JDBC_CLASSPATH dataSchema = None diff --git a/sdks/python/apache_beam/portability/common_urns.py b/sdks/python/apache_beam/portability/common_urns.py index 74d9a39bb052..7777f63ffbe9 100644 --- a/sdks/python/apache_beam/portability/common_urns.py +++ b/sdks/python/apache_beam/portability/common_urns.py @@ -92,3 +92,4 @@ var_bytes = LogicalTypes.Enum.VAR_BYTES fixed_char = LogicalTypes.Enum.FIXED_CHAR var_char = LogicalTypes.Enum.VAR_CHAR +date = LogicalTypes.Enum.DATE diff --git a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py index b1e53a79bd41..c44e911c71a9 100644 --- a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py +++ b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # - +import datetime import os import unittest import uuid @@ -33,7 +33,7 @@ "EXPANSION_JARS environment var is not provided, " "indicating that jars have not been built") class ManagedIcebergIT(unittest.TestCase): - WAREHOUSE = "gs://temp-storage-for-end-to-end-tests/xlang-python-using-java" + WAREHOUSE = "gs://temp-storage-for-end-to-end-tests" def setUp(self): self.test_pipeline = TestPipeline(is_integration_test=True) @@ -41,6 +41,7 @@ def setUp(self): self.args.extend([ '--experiments=enable_managed_transforms', ]) + self.project = self.test_pipeline.get_option('project') def _create_row(self, num: int): return beam.Row( @@ -49,16 +50,24 @@ def _create_row(self, num: int): bytes_=bytes(num), bool_=(num % 2 == 0), float_=(num + float(num) / 100), - arr_=[num, num, num]) + arr_=[num, num, num], + date_=datetime.date.today() - datetime.timedelta(days=num)) def test_write_read_pipeline(self): + biglake_catalog_props = { + 'type': 'rest', + 'uri': 'https://biglake.googleapis.com/iceberg/v1/restcatalog', + 'warehouse': self.WAREHOUSE, + 'header.x-goog-user-project': self.project, + 'rest.auth.type': 'google', + 'io-impl': 'org.apache.iceberg.gcp.gcs.GCSFileIO', + 'header.X-Iceberg-Access-Delegation': 'vended-credentials' + } + iceberg_config = { "table": "test_iceberg_write_read.test_" + uuid.uuid4().hex, "catalog_name": "default", - "catalog_properties": { - "type": "hadoop", - "warehouse": self.WAREHOUSE, - } + "catalog_properties": biglake_catalog_props } rows = [self._create_row(i) for i in range(100)] diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index d2c4db8cabca..a0934b110933 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -34,6 +34,7 @@ bytes <-----> BYTES ByteString ------> BYTES Timestamp <-----> LogicalType(urn="beam:logical_type:micros_instant:v1") + datetime.date <---> LogicalType(urn="beam:logical_type:date:v1") Decimal <-----> LogicalType(urn="beam:logical_type:fixed_decimal:v1") Mapping <-----> MapType Sequence <-----> ArrayType @@ -1244,7 +1245,6 @@ def argument(self): # TODO: A temporary fix for missing jdbc logical types. # See the discussion in https://github.com/apache/beam/issues/35738 for # more detail. -@LogicalType._register_internal class JdbcDateType(LogicalType[datetime.date, MillisInstant, str]): """ For internal use only; no backwards-compatibility guarantees. @@ -1330,3 +1330,30 @@ def argument(self): @classmethod def _from_typing(cls, typ): return cls() + + +@LogicalType._register_internal +class Date(NoArgumentLogicalType[datetime.date, np.int64]): + """Date logical type that handles ``datetime.date``, days since epoch.""" + EPOCH = datetime.date(1970, 1, 1) + + @classmethod + def urn(cls): + return common_urns.date.urn + + @classmethod + def representation_type(cls): + # type: () -> type + return np.int64 + + @classmethod + def language_type(cls): + return datetime.date + + def to_representation_type(self, value): + # type: (datetime.date) -> np.int64 + return (value - self.EPOCH).days + + def to_language_type(self, value): + # type: (np.int64) -> datetime.date + return self.EPOCH + datetime.timedelta(days=value) diff --git a/sdks/python/apache_beam/typehints/schemas_test.py b/sdks/python/apache_beam/typehints/schemas_test.py index 5a5d7396ab30..d70bf0c47d33 100644 --- a/sdks/python/apache_beam/typehints/schemas_test.py +++ b/sdks/python/apache_beam/typehints/schemas_test.py @@ -20,6 +20,7 @@ # pytype: skip-file import dataclasses +import datetime import itertools import pickle import unittest @@ -105,6 +106,7 @@ class ComplexSchema(NamedTuple): optional_array: Optional[Sequence[np.float32]] array_optional: Sequence[Optional[bool]] timestamp: Timestamp + date: datetime.date def get_test_beam_fieldtype_protos(): diff --git a/sdks/python/test-suites/dataflow/build.gradle b/sdks/python/test-suites/dataflow/build.gradle index be8f61236874..224d09ec8bf5 100644 --- a/sdks/python/test-suites/dataflow/build.gradle +++ b/sdks/python/test-suites/dataflow/build.gradle @@ -74,12 +74,14 @@ task examplesPostCommit { task gcpCrossLanguagePostCommit { getVersionsAsList('cross_language_validates_py_versions').each { + dependsOn.add(":sdks:python:buildPython") dependsOn.add(":sdks:python:test-suites:dataflow:py${getVersionSuffix(it)}:gcpCrossLanguagePythonUsingJava") } } task ioCrossLanguagePostCommit { getVersionsAsList('cross_language_validates_py_versions').each { + dependsOn.add(":sdks:python:buildPython") dependsOn.add(":sdks:python:test-suites:dataflow:py${getVersionSuffix(it)}:ioCrossLanguagePythonUsingJava") } }