Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/trigger_files/beam_PostCommit_Python.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"pr": "36271",
"modification": 39
}
"pr": "38069",
"modification": 40
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@
# However, newer sklearn is needed for testing on newer Python version
scikit-learn==1.0.2; python_version < '3.11'
# bump sklearn version when new Python version is supported
scikit-learn==1.7.1; python_version >= '3.11'
scikit-learn==1.7.1; python_version >= '3.11' and python_version < '3.14'
scikit-learn==1.7.2; python_version >= '3.14'
20 changes: 13 additions & 7 deletions sdks/python/apache_beam/ml/gcp/recommendations_ai.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,19 @@ def get_recommendation_user_event_client():


class CreateCatalogItem(PTransform):
"""Creates catalogitem information.
The ``PTransform`` returns a PCollectionTuple with a PCollections of
successfully and failed created CatalogItems.
"""Creates catalog item records.

The ``PTransform`` returns a ``PCollectionTuple`` of successfully created
catalog items (``created_catalog_items``) and failures
(``failed_catalog_items``).

Example usage::

pipeline | CreateCatalogItem(
project='example-gcp-project',
catalog_name='my-catalog')
result = (
pipeline
| CreateCatalogItem(
project='example-gcp-project', catalog_name='my-catalog'))
created = result.created_catalog_items
"""
def __init__(
self,
Expand Down Expand Up @@ -123,13 +127,15 @@ def expand(self, pcoll):
raise ValueError(
"""GCP project name needs to be specified in "project" pipeline
option""")
return pcoll | ParDo(
pardo = ParDo(
_CreateCatalogItemFn(
self.project,
self.retry,
self.timeout,
self.metadata,
self.catalog_name))
return pcoll | pardo.with_outputs(
FAILED_CATALOG_ITEMS, main='created_catalog_items')


class _CreateCatalogItemFn(DoFn):
Expand Down
8 changes: 7 additions & 1 deletion sdks/python/apache_beam/ml/gcp/recommendations_ai_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,16 @@ def test_CreateCatalogItem(self):
return_value=self._mock_client):
p = beam.Pipeline()

_ = (
create_outputs = (
p | "Create data" >> beam.Create([self._catalog_item])
| "Create CatalogItem" >>
recommendations_ai.CreateCatalogItem(project="test"))
_ = (
create_outputs.created_catalog_items
| 'CountCreated' >> beam.combiners.Count.Globally())
_ = (
create_outputs.failed_catalog_items
| 'CountFailed' >> beam.combiners.Count.Globally())

result = p.run()
result.wait_until_finish()
Expand Down
7 changes: 5 additions & 2 deletions sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,14 @@ def test_create_catalog_item(self):

with TestPipeline(is_integration_test=True) as p:
RecommendationAIIT.test_ran = True
output = (
create_outputs = (
p | 'Create data' >> beam.Create([CATALOG_ITEM])
| 'Create CatalogItem' >>
recommendations_ai.CreateCatalogItem(project=GCP_TEST_PROJECT)
recommendations_ai.CreateCatalogItem(project=GCP_TEST_PROJECT))
output = (
create_outputs.created_catalog_items
| beam.ParDo(extract_id) | beam.combiners.ToList())
_ = create_outputs.failed_catalog_items | beam.combiners.Count.Globally()

assert_that(output, equal_to([[CATALOG_ITEM["id"]]]))

Expand Down
Loading