diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index 5d0598c952f7..69f759d8463d 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "pr": "36271", - "modification": 39 -} + "pr": "38069", + "modification": 40 +} diff --git a/sdks/python/apache_beam/examples/inference/sklearn_examples_requirements.txt b/sdks/python/apache_beam/examples/inference/sklearn_examples_requirements.txt index 30dbdb2f3715..d2793a47cddc 100644 --- a/sdks/python/apache_beam/examples/inference/sklearn_examples_requirements.txt +++ b/sdks/python/apache_beam/examples/inference/sklearn_examples_requirements.txt @@ -20,4 +20,5 @@ # However, newer sklearn is needed for testing on newer Python version scikit-learn==1.0.2; python_version < '3.11' # bump sklearn version when new Python version is supported -scikit-learn==1.7.1; python_version >= '3.11' +scikit-learn==1.7.1; python_version >= '3.11' and python_version < '3.14' +scikit-learn==1.7.2; python_version >= '3.14' diff --git a/sdks/python/apache_beam/ml/gcp/recommendations_ai.py b/sdks/python/apache_beam/ml/gcp/recommendations_ai.py index 077fc83bbd07..9730a6b2b1d9 100644 --- a/sdks/python/apache_beam/ml/gcp/recommendations_ai.py +++ b/sdks/python/apache_beam/ml/gcp/recommendations_ai.py @@ -79,15 +79,19 @@ def get_recommendation_user_event_client(): class CreateCatalogItem(PTransform): - """Creates catalogitem information. - The ``PTransform`` returns a PCollectionTuple with a PCollections of - successfully and failed created CatalogItems. + """Creates catalog item records. + + The ``PTransform`` returns a ``PCollectionTuple`` of successfully created + catalog items (``created_catalog_items``) and failures + (``failed_catalog_items``). Example usage:: - pipeline | CreateCatalogItem( - project='example-gcp-project', - catalog_name='my-catalog') + result = ( + pipeline + | CreateCatalogItem( + project='example-gcp-project', catalog_name='my-catalog')) + created = result.created_catalog_items """ def __init__( self, @@ -123,13 +127,15 @@ def expand(self, pcoll): raise ValueError( """GCP project name needs to be specified in "project" pipeline option""") - return pcoll | ParDo( + pardo = ParDo( _CreateCatalogItemFn( self.project, self.retry, self.timeout, self.metadata, self.catalog_name)) + return pcoll | pardo.with_outputs( + FAILED_CATALOG_ITEMS, main='created_catalog_items') class _CreateCatalogItemFn(DoFn): diff --git a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test.py b/sdks/python/apache_beam/ml/gcp/recommendations_ai_test.py index d2844f8ac08c..a8da1cd38f45 100644 --- a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test.py +++ b/sdks/python/apache_beam/ml/gcp/recommendations_ai_test.py @@ -65,10 +65,16 @@ def test_CreateCatalogItem(self): return_value=self._mock_client): p = beam.Pipeline() - _ = ( + create_outputs = ( p | "Create data" >> beam.Create([self._catalog_item]) | "Create CatalogItem" >> recommendations_ai.CreateCatalogItem(project="test")) + _ = ( + create_outputs.created_catalog_items + | 'CountCreated' >> beam.combiners.Count.Globally()) + _ = ( + create_outputs.failed_catalog_items + | 'CountFailed' >> beam.combiners.Count.Globally()) result = p.run() result.wait_until_finish() diff --git a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py b/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py index ad2d45a8e539..33d776e54a50 100644 --- a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py +++ b/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py @@ -77,11 +77,14 @@ def test_create_catalog_item(self): with TestPipeline(is_integration_test=True) as p: RecommendationAIIT.test_ran = True - output = ( + create_outputs = ( p | 'Create data' >> beam.Create([CATALOG_ITEM]) | 'Create CatalogItem' >> - recommendations_ai.CreateCatalogItem(project=GCP_TEST_PROJECT) + recommendations_ai.CreateCatalogItem(project=GCP_TEST_PROJECT)) + output = ( + create_outputs.created_catalog_items | beam.ParDo(extract_id) | beam.combiners.ToList()) + _ = create_outputs.failed_catalog_items | beam.combiners.Count.Globally() assert_that(output, equal_to([[CATALOG_ITEM["id"]]]))