Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 51 additions & 3 deletions api/oss/src/apis/fastapi/applications/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from oss.src.utils.exceptions import intercept_exceptions, suppress_exceptions
from oss.src.utils.caching import invalidate_cache

from oss.src.core.events.utils import publish_revision_event

from oss.src.core.git.types import VariantForkError
from oss.src.core.shared.dtos import (
Reference,
Expand Down Expand Up @@ -104,6 +106,12 @@ def _build_rename_apps_disabled_detail(*, existing_name: Optional[str]) -> str:


class ApplicationsRouter:
# `applications.revisions.{retrieved,fetched,queried,logged}` READ events
# are emitted from this router after each handler materializes its
# response. `applications.revisions.committed` is a WRITE event and is
# emitted from `ApplicationsService.commit_application_revision`, not
# from this router. See core/events/utils.py module docstring for the
# read-vs-write split rationale.
def __init__(
self,
*,
Expand Down Expand Up @@ -1406,6 +1414,14 @@ async def retrieve_application_revision(
resolution_info=resolution_info,
)

await publish_revision_event(
request=request,
domain="application",
action="retrieve",
revision=application_revision_response.application_revision,
count=application_revision_response.count,
)

return application_revision_response

@intercept_exceptions()
Expand Down Expand Up @@ -1470,11 +1486,21 @@ async def fetch_application_revision(
)
)

return ApplicationRevisionResponse(
response = ApplicationRevisionResponse(
count=1 if application_revision else 0,
application_revision=application_revision,
)

await publish_revision_event(
request=request,
domain="application",
action="fetch",
revision=response.application_revision,
count=response.count,
)

return response

@intercept_exceptions()
async def edit_application_revision(
self,
Expand Down Expand Up @@ -1640,11 +1666,21 @@ async def query_application_revisions(
f"Failed to resolve embeds for revision {revision.id}: {e}"
)

return ApplicationRevisionsResponse(
response = ApplicationRevisionsResponse(
count=len(application_revisions),
application_revisions=application_revisions,
)

await publish_revision_event(
request=request,
domain="application",
action="query",
revisions=response.application_revisions or [],
count=response.count,
)

return response

@intercept_exceptions()
async def commit_application_revision(
self,
Expand Down Expand Up @@ -1674,11 +1710,15 @@ async def commit_application_revision(
application_revision_commit=application_revision_commit_request.application_revision_commit,
)

return ApplicationRevisionResponse(
response = ApplicationRevisionResponse(
count=1 if application_revision else 0,
application_revision=application_revision,
)

# commit emission lives in ApplicationsService.commit_application_revision

return response

@intercept_exceptions()
async def log_application_revisions(
self,
Expand Down Expand Up @@ -1714,6 +1754,14 @@ async def log_application_revisions(
application_revisions=application_revisions,
)

await publish_revision_event(
request=request,
domain="application",
action="log",
revisions=revisions_response.application_revisions or [],
count=revisions_response.count,
)

return revisions_response

@intercept_exceptions()
Expand Down
48 changes: 46 additions & 2 deletions api/oss/src/apis/fastapi/environments/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from oss.src.utils.logging import get_module_logger
from oss.src.utils.exceptions import intercept_exceptions, suppress_exceptions

from oss.src.core.events.utils import publish_revision_event

from oss.src.core.shared.dtos import (
Reference,
)
Expand Down Expand Up @@ -77,6 +79,12 @@


class EnvironmentsRouter:
# `environments.revisions.{retrieved,fetched,queried,logged}` READ events
# are emitted from this router after each handler materializes its
# response. `environments.revisions.committed` is a WRITE event and is
# emitted from `EnvironmentsService.commit_environment_revision`, not
# from this router. (This was the original precedent for the read-vs-write
# split.) See core/events/utils.py module docstring for the rationale.
def __init__(
self,
*,
Expand Down Expand Up @@ -766,6 +774,14 @@ async def retrieve_environment_revision(
resolution_info=resolution_info,
)

await publish_revision_event(
request=request,
domain="environment",
action="retrieve",
revision=environment_revision_response.environment_revision,
count=environment_revision_response.count,
)

return environment_revision_response

@intercept_exceptions()
Expand Down Expand Up @@ -878,11 +894,21 @@ async def fetch_environment_revision(
)
)

return EnvironmentRevisionResponse(
response = EnvironmentRevisionResponse(
count=1 if environment_revision else 0,
environment_revision=environment_revision,
)

await publish_revision_event(
request=request,
domain="environment",
action="fetch",
revision=response.environment_revision,
count=response.count,
)

return response

@intercept_exceptions()
async def edit_environment_revision(
self,
Expand Down Expand Up @@ -1051,11 +1077,21 @@ async def query_environment_revisions(
f"Failed to resolve embeds for revision {revision.id}: {e}"
)

return EnvironmentRevisionsResponse(
response = EnvironmentRevisionsResponse(
count=len(environment_revisions),
environment_revisions=environment_revisions,
)

await publish_revision_event(
request=request,
domain="environment",
action="query",
revisions=response.environment_revisions or [],
count=response.count,
)

return response

@intercept_exceptions()
async def commit_environment_revision(
self,
Expand Down Expand Up @@ -1134,6 +1170,14 @@ async def log_environment_revisions(
environment_revisions=environment_revisions,
)

await publish_revision_event(
request=request,
domain="environment",
action="log",
revisions=revisions_response.environment_revisions or [],
count=revisions_response.count,
)

return revisions_response


Expand Down
54 changes: 51 additions & 3 deletions api/oss/src/apis/fastapi/evaluators/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from oss.src.utils.exceptions import intercept_exceptions, suppress_exceptions
from oss.src.utils.caching import invalidate_cache

from oss.src.core.events.utils import publish_revision_event

from oss.src.core.git.types import VariantForkError
from oss.src.core.shared.dtos import (
Reference,
Expand Down Expand Up @@ -121,6 +123,12 @@ def _registry_entry_to_catalog_template(


class EvaluatorsRouter:
# `evaluators.revisions.{retrieved,fetched,queried,logged}` READ events
# are emitted from this router after each handler materializes its
# response. `evaluators.revisions.committed` is a WRITE event and is
# emitted from `EvaluatorsService.commit_evaluator_revision`, not from
# this router. See core/events/utils.py module docstring for the
# read-vs-write split rationale.
def __init__(
self,
*,
Expand Down Expand Up @@ -1385,6 +1393,14 @@ async def retrieve_evaluator_revision(
resolution_info=resolution_info,
)

await publish_revision_event(
request=request,
domain="evaluator",
action="retrieve",
revision=evaluator_revision_response.evaluator_revision,
count=evaluator_revision_response.count,
)

return evaluator_revision_response

@intercept_exceptions()
Expand Down Expand Up @@ -1449,11 +1465,21 @@ async def fetch_evaluator_revision(
evaluator_revision_ref=Reference(id=evaluator_revision_id),
)

return EvaluatorRevisionResponse(
response = EvaluatorRevisionResponse(
count=1 if evaluator_revision else 0,
evaluator_revision=evaluator_revision,
)

await publish_revision_event(
request=request,
domain="evaluator",
action="fetch",
revision=response.evaluator_revision,
count=response.count,
)

return response

@intercept_exceptions()
async def edit_evaluator_revision(
self,
Expand Down Expand Up @@ -1608,11 +1634,21 @@ async def query_evaluator_revisions(
f"Failed to resolve embeds for revision {revision.id}: {e}"
)

return EvaluatorRevisionsResponse(
response = EvaluatorRevisionsResponse(
count=len(evaluator_revisions),
evaluator_revisions=evaluator_revisions,
)

await publish_revision_event(
request=request,
domain="evaluator",
action="query",
revisions=response.evaluator_revisions or [],
count=response.count,
)

return response

@intercept_exceptions()
async def commit_evaluator_revision(
self,
Expand Down Expand Up @@ -1641,11 +1677,15 @@ async def commit_evaluator_revision(
evaluator_revision_commit=evaluator_revision_commit_request.evaluator_revision_commit,
)

return EvaluatorRevisionResponse(
response = EvaluatorRevisionResponse(
count=1 if evaluator_revision else 0,
evaluator_revision=evaluator_revision,
)

# commit emission lives in EvaluatorsService.commit_evaluator_revision

return response

@intercept_exceptions()
async def log_evaluator_revisions(
self,
Expand Down Expand Up @@ -1678,6 +1718,14 @@ async def log_evaluator_revisions(
evaluator_revisions=evaluator_revisions,
)

await publish_revision_event(
request=request,
domain="evaluator",
action="log",
revisions=revisions_response.evaluator_revisions or [],
count=revisions_response.count,
)

return revisions_response

@intercept_exceptions()
Expand Down
Loading
Loading