diff --git a/base/service/src/main/java/org/eclipse/ditto/base/service/DittoService.java b/base/service/src/main/java/org/eclipse/ditto/base/service/DittoService.java index 0f515afecfa..58f01fbe3ba 100644 --- a/base/service/src/main/java/org/eclipse/ditto/base/service/DittoService.java +++ b/base/service/src/main/java/org/eclipse/ditto/base/service/DittoService.java @@ -41,6 +41,7 @@ import org.eclipse.ditto.base.service.config.json.JsonConfig; import org.eclipse.ditto.base.service.devops.DevOpsCommandsActor; import org.eclipse.ditto.base.service.devops.LogbackLoggingFacade; +import org.eclipse.ditto.internal.utils.pekko.config.DynamicConfigWatcherExtension; import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig; import org.eclipse.ditto.internal.utils.config.DittoConfigError; import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier; @@ -271,6 +272,7 @@ private void startPrometheusReporter() { private void initializeActorSystem(final ActorSystem actorSystem) { startPekkoManagement(actorSystem); startClusterBootstrap(actorSystem); + DynamicConfigWatcherExtension.get(actorSystem); startStatusSupplierActor(actorSystem); startDevOpsCommandsActor(actorSystem); diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/ConnectivityRootActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/ConnectivityRootActor.java index a3d83cb4d39..a8928869b82 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/ConnectivityRootActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/ConnectivityRootActor.java @@ -129,7 +129,8 @@ private ConnectivityRootActor(final ConnectivityConfig connectivityConfig, final var cleanupConfig = connectivityConfig.getConnectionConfig().getCleanupConfig(); - final var cleanupActorProps = PersistenceCleanupActor.props(cleanupConfig, mongoReadJournal, CLUSTER_ROLE); + final var cleanupActorProps = PersistenceCleanupActor.props(cleanupConfig, mongoReadJournal, CLUSTER_ROLE, + "ditto.connectivity.connection.cleanup"); startChildActor(PersistenceCleanupActor.ACTOR_NAME, cleanupActorProps); final ActorRef healthCheckingActor = getHealthCheckingActor(connectivityConfig); diff --git a/deployment/helm/ditto/Chart.yaml b/deployment/helm/ditto/Chart.yaml index 503a3161499..d3bfa2f7b6f 100644 --- a/deployment/helm/ditto/Chart.yaml +++ b/deployment/helm/ditto/Chart.yaml @@ -16,7 +16,7 @@ description: | A digital twin is a virtual, cloud based, representation of his real world counterpart (real world “Things”, e.g. devices like sensors, smart heating, connected cars, smart grids, EV charging stations etc). type: application -version: 3.8.16 # chart version is effectively set by release-job +version: 3.8.17 # chart version is effectively set by release-job appVersion: 3.8.12 keywords: - iot-chart diff --git a/deployment/helm/ditto/templates/connectivity-deployment.yaml b/deployment/helm/ditto/templates/connectivity-deployment.yaml index 86fd3cf8393..f69e870855c 100644 --- a/deployment/helm/ditto/templates/connectivity-deployment.yaml +++ b/deployment/helm/ditto/templates/connectivity-deployment.yaml @@ -374,6 +374,14 @@ spec: value: "{{ .Values.connectivity.config.connections.encryption.migration.batchSize }}" - name: CONNECTIVITY_ENCRYPTION_MIGRATION_MAX_DOCS_PER_MINUTE value: "{{ .Values.connectivity.config.connections.encryption.migration.maxDocumentsPerMinute }}" + {{- if .Values.connectivity.dynamicConfig.enabled }} + - name: DITTO_DYNAMIC_CONFIG_ENABLED + value: "true" + - name: DITTO_DYNAMIC_CONFIG_FILE_PATH + value: "{{ .Values.connectivity.dynamicConfig.filePath }}" + - name: DITTO_DYNAMIC_CONFIG_POLL_INTERVAL + value: "{{ .Values.connectivity.dynamicConfig.pollInterval }}" + {{- end }} {{- if .Values.connectivity.extraEnv }} {{- toYaml .Values.connectivity.extraEnv | nindent 12 }} {{- end }} @@ -447,6 +455,11 @@ spec: {{- end }} - name: ditto-heap-dumps mountPath: /opt/ditto/dumps + {{- if .Values.connectivity.dynamicConfig.enabled }} + - name: ditto-dynamic-config + mountPath: {{ dir .Values.connectivity.dynamicConfig.filePath }} + readOnly: true + {{- end }} resources: requests: cpu: {{ mulf .Values.connectivity.resources.cpu 1000 }}m @@ -503,4 +516,9 @@ spec: {{- end }} - name: ditto-heap-dumps emptyDir: {} + {{- if .Values.connectivity.dynamicConfig.enabled }} + - name: ditto-dynamic-config + configMap: + name: {{ .Release.Name }}-connectivity-dynamic-config + {{- end }} {{- end }} diff --git a/deployment/helm/ditto/templates/dynamic-configmap.yaml b/deployment/helm/ditto/templates/dynamic-configmap.yaml new file mode 100644 index 00000000000..c69284bb3b2 --- /dev/null +++ b/deployment/helm/ditto/templates/dynamic-configmap.yaml @@ -0,0 +1,32 @@ +# Copyright (c) 2026 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0 +# +# SPDX-License-Identifier: EPL-2.0 +{{- $releaseName := .Release.Name -}} +{{- $name := include "ditto.name" . -}} +{{- $labels := include "ditto.labels" . -}} +{{- $namespace := .Release.Namespace -}} +{{- range $serviceName, $serviceValues := dict "policies" .Values.policies "things" .Values.things "thingsSearch" .Values.thingsSearch "connectivity" .Values.connectivity "gateway" .Values.gateway }} +{{- if $serviceValues.dynamicConfig.enabled }} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ $releaseName }}-{{ $serviceName | kebabcase }}-dynamic-config + namespace: {{ $namespace }} + labels: + app.kubernetes.io/name: {{ $name }}-{{ $serviceName | kebabcase }}-dynamic-config +{{ $labels | indent 4 }} +data: + dynamic.conf: |- +{{- if $serviceValues.dynamicConfig.content }} +{{ $serviceValues.dynamicConfig.content | indent 4 }} +{{- end }} +{{- end }} +{{- end }} diff --git a/deployment/helm/ditto/templates/gateway-deployment.yaml b/deployment/helm/ditto/templates/gateway-deployment.yaml index 865b8d4a399..38d730876b7 100644 --- a/deployment/helm/ditto/templates/gateway-deployment.yaml +++ b/deployment/helm/ditto/templates/gateway-deployment.yaml @@ -279,6 +279,14 @@ spec: value: "{{ .Values.gateway.config.wotDirectory.basePrefix }}" - name: GATEWAY_WOT_DIRECTORY_AUTHENTICATION_REQUIRED value: "{{ .Values.gateway.config.wotDirectory.authenticationRequired }}" + {{- if .Values.gateway.dynamicConfig.enabled }} + - name: DITTO_DYNAMIC_CONFIG_ENABLED + value: "true" + - name: DITTO_DYNAMIC_CONFIG_FILE_PATH + value: "{{ .Values.gateway.dynamicConfig.filePath }}" + - name: DITTO_DYNAMIC_CONFIG_POLL_INTERVAL + value: "{{ .Values.gateway.dynamicConfig.pollInterval }}" + {{- end }} {{- if .Values.gateway.extraEnv }} {{- toYaml .Values.gateway.extraEnv | nindent 12 }} {{- end }} @@ -341,6 +349,11 @@ spec: {{- end }} - name: ditto-heap-dumps mountPath: /opt/ditto/dumps + {{- if .Values.gateway.dynamicConfig.enabled }} + - name: ditto-dynamic-config + mountPath: {{ dir .Values.gateway.dynamicConfig.filePath }} + readOnly: true + {{- end }} resources: requests: cpu: {{ mulf .Values.gateway.resources.cpu 1000 }}m @@ -397,4 +410,9 @@ spec: {{- end }} - name: ditto-heap-dumps emptyDir: {} + {{- if .Values.gateway.dynamicConfig.enabled }} + - name: ditto-dynamic-config + configMap: + name: {{ .Release.Name }}-gateway-dynamic-config + {{- end }} {{- end }} diff --git a/deployment/helm/ditto/templates/policies-deployment.yaml b/deployment/helm/ditto/templates/policies-deployment.yaml index f8a46b371dc..e0796eed17f 100644 --- a/deployment/helm/ditto/templates/policies-deployment.yaml +++ b/deployment/helm/ditto/templates/policies-deployment.yaml @@ -314,6 +314,14 @@ spec: value: "{{ .Values.policies.config.policiesEnforcer.cache.expireAfterWrite }}" - name: DITTO_POLICIES_ENFORCER_CACHE_EXPIRE_AFTER_ACCESS value: "{{ .Values.policies.config.policiesEnforcer.cache.expireAfterAccess }}" + {{- if .Values.policies.dynamicConfig.enabled }} + - name: DITTO_DYNAMIC_CONFIG_ENABLED + value: "true" + - name: DITTO_DYNAMIC_CONFIG_FILE_PATH + value: "{{ .Values.policies.dynamicConfig.filePath }}" + - name: DITTO_DYNAMIC_CONFIG_POLL_INTERVAL + value: "{{ .Values.policies.dynamicConfig.pollInterval }}" + {{- end }} {{- if .Values.policies.extraEnv }} {{- toYaml .Values.policies.extraEnv | nindent 12 }} {{- end }} @@ -390,6 +398,11 @@ spec: {{- end }} - name: ditto-heap-dumps mountPath: /opt/ditto/dumps + {{- if .Values.policies.dynamicConfig.enabled }} + - name: ditto-dynamic-config + mountPath: {{ dir .Values.policies.dynamicConfig.filePath }} + readOnly: true + {{- end }} resources: requests: cpu: {{ mulf .Values.policies.resources.cpu 1000 }}m @@ -446,4 +459,9 @@ spec: {{- end }} - name: ditto-heap-dumps emptyDir: {} + {{- if .Values.policies.dynamicConfig.enabled }} + - name: ditto-dynamic-config + configMap: + name: {{ .Release.Name }}-policies-dynamic-config + {{- end }} {{- end }} diff --git a/deployment/helm/ditto/templates/things-deployment.yaml b/deployment/helm/ditto/templates/things-deployment.yaml index 0c60c408480..b64c804469e 100644 --- a/deployment/helm/ditto/templates/things-deployment.yaml +++ b/deployment/helm/ditto/templates/things-deployment.yaml @@ -386,6 +386,14 @@ spec: value: "{{ index .Values.things.config.wot.tmValidation.feature.forbid "non-modeled-desired-properties" }}" - name: THINGS_WOT_TM_MODEL_VALIDATION_FEATURE_FORBID_NON_MODELED_OUTBOX_MESSAGES value: "{{ index .Values.things.config.wot.tmValidation.feature.forbid "non-modeled-outbox-messages" }}" + {{- if .Values.things.dynamicConfig.enabled }} + - name: DITTO_DYNAMIC_CONFIG_ENABLED + value: "true" + - name: DITTO_DYNAMIC_CONFIG_FILE_PATH + value: "{{ .Values.things.dynamicConfig.filePath }}" + - name: DITTO_DYNAMIC_CONFIG_POLL_INTERVAL + value: "{{ .Values.things.dynamicConfig.pollInterval }}" + {{- end }} {{- if .Values.things.extraEnv }} {{- toYaml .Values.things.extraEnv | nindent 12 }} {{- end }} @@ -459,6 +467,11 @@ spec: {{- end }} - name: ditto-heap-dumps mountPath: /opt/ditto/dumps + {{- if .Values.things.dynamicConfig.enabled }} + - name: ditto-dynamic-config + mountPath: {{ dir .Values.things.dynamicConfig.filePath }} + readOnly: true + {{- end }} resources: requests: cpu: {{ mulf .Values.things.resources.cpu 1000 }}m @@ -515,4 +528,9 @@ spec: {{- end }} - name: ditto-heap-dumps emptyDir: {} + {{- if .Values.things.dynamicConfig.enabled }} + - name: ditto-dynamic-config + configMap: + name: {{ .Release.Name }}-things-dynamic-config + {{- end }} {{- end }} diff --git a/deployment/helm/ditto/templates/thingssearch-deployment.yaml b/deployment/helm/ditto/templates/thingssearch-deployment.yaml index 4e009996f7b..5605ade76f8 100644 --- a/deployment/helm/ditto/templates/thingssearch-deployment.yaml +++ b/deployment/helm/ditto/templates/thingssearch-deployment.yaml @@ -286,6 +286,14 @@ spec: value: "{{ .Values.thingsSearch.config.updater.backgroundSync.throttle.throughput }}" - name: BACKGROUND_SYCN_THROTTLE_PERIOD value: "{{ .Values.thingsSearch.config.updater.backgroundSync.throttle.period }}" + {{- if .Values.thingsSearch.dynamicConfig.enabled }} + - name: DITTO_DYNAMIC_CONFIG_ENABLED + value: "true" + - name: DITTO_DYNAMIC_CONFIG_FILE_PATH + value: "{{ .Values.thingsSearch.dynamicConfig.filePath }}" + - name: DITTO_DYNAMIC_CONFIG_POLL_INTERVAL + value: "{{ .Values.thingsSearch.dynamicConfig.pollInterval }}" + {{- end }} {{- if .Values.thingsSearch.extraEnv }} {{- toYaml .Values.thingsSearch.extraEnv | nindent 12 }} {{- end }} @@ -359,6 +367,11 @@ spec: {{- end }} - name: ditto-heap-dumps mountPath: /opt/ditto/dumps + {{- if .Values.thingsSearch.dynamicConfig.enabled }} + - name: ditto-dynamic-config + mountPath: {{ dir .Values.thingsSearch.dynamicConfig.filePath }} + readOnly: true + {{- end }} resources: requests: cpu: {{ mulf .Values.thingsSearch.resources.cpu 1000 }}m @@ -415,4 +428,9 @@ spec: {{- end }} - name: ditto-heap-dumps emptyDir: {} + {{- if .Values.thingsSearch.dynamicConfig.enabled }} + - name: ditto-dynamic-config + configMap: + name: {{ .Release.Name }}-things-search-dynamic-config + {{- end }} {{- end }} diff --git a/deployment/helm/ditto/values.yaml b/deployment/helm/ditto/values.yaml index c93917e834b..757fecee0a1 100644 --- a/deployment/helm/ditto/values.yaml +++ b/deployment/helm/ditto/values.yaml @@ -763,6 +763,24 @@ policies: enabled: false # interval: 30s # scrapeTimeout: 15s + # dynamicConfig enables hot-reloading of HOCON configuration from a ConfigMap without pod restart. + # When enabled, a ConfigMap is mounted at the specified file path and polled at the given interval. + # Changes to the ConfigMap are detected and propagated to actors via Pekko EventStream. + # content: | + # ditto { + # policies { + # policy { + # activity-check { + # inactive-interval = 2h + # } + # } + # } + # } + dynamicConfig: + enabled: false + pollInterval: 30s + filePath: "/opt/ditto/dynamic-config/dynamic.conf" + content: "" # config holds policies specific configuration config: # cluster contains policies specific clustering config @@ -1102,6 +1120,22 @@ things: enabled: false # interval: 30s # scrapeTimeout: 15s + # dynamicConfig enables hot-reloading of HOCON configuration from a ConfigMap without pod restart. + # content: | + # ditto { + # things { + # thing { + # activity-check { + # inactive-interval = 2h + # } + # } + # } + # } + dynamicConfig: + enabled: false + pollInterval: 30s + filePath: "/opt/ditto/dynamic-config/dynamic.conf" + content: "" # config holds things specific configuration config: # cluster contains things specific clustering config @@ -1601,6 +1635,22 @@ thingsSearch: enabled: false # interval: 30s # scrapeTimeout: 15s + # dynamicConfig enables hot-reloading of HOCON configuration from a ConfigMap without pod restart. + # content: | + # ditto { + # search { + # operator-metrics { + # custom-metrics { + # my_metric { ... } + # } + # } + # } + # } + dynamicConfig: + enabled: false + pollInterval: 30s + filePath: "/opt/ditto/dynamic-config/dynamic.conf" + content: "" # config holds things-search specific configuration config: # cluster contains things-search specific clustering config @@ -1941,6 +1991,20 @@ connectivity: enabled: false # interval: 30s # scrapeTimeout: 15s + # dynamicConfig enables hot-reloading of HOCON configuration from a ConfigMap without pod restart. + # content: | + # ditto { + # connectivity { + # connection { + # ... + # } + # } + # } + dynamicConfig: + enabled: false + pollInterval: 30s + filePath: "/opt/ditto/dynamic-config/dynamic.conf" + content: "" # config holds connectivity specific configuration config: # cluster contains connectivity specific clustering config @@ -2368,6 +2432,20 @@ gateway: enabled: false # interval: 30s # scrapeTimeout: 15s + # dynamicConfig enables hot-reloading of HOCON configuration from a ConfigMap without pod restart. + # content: | + # ditto { + # gateway { + # streaming { + # ... + # } + # } + # } + dynamicConfig: + enabled: false + pollInterval: 30s + filePath: "/opt/ditto/dynamic-config/dynamic.conf" + content: "" # config holds gateway specific configuration config: # cluster contains gateway specific clustering config diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/auth/DevopsAuthenticationDirectiveFactory.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/auth/DevopsAuthenticationDirectiveFactory.java index 2ea6f86a991..d27f102ae26 100644 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/auth/DevopsAuthenticationDirectiveFactory.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/auth/DevopsAuthenticationDirectiveFactory.java @@ -21,7 +21,7 @@ public final class DevopsAuthenticationDirectiveFactory { private final JwtAuthenticationProvider jwtAuthenticationProvider; - private final DevOpsConfig devOpsConfig; + private volatile DevOpsConfig devOpsConfig; private DevopsAuthenticationDirectiveFactory(final JwtAuthenticationProvider jwtAuthenticationProvider, final DevOpsConfig devOpsConfig) { @@ -43,24 +43,54 @@ public static DevopsAuthenticationDirectiveFactory newInstance( return new DevopsAuthenticationDirectiveFactory(jwtAuthenticationProvider, devOpsConfig); } + /** + * Updates the DevOps configuration. Called when dynamic config changes are detected. + * + * @param devOpsConfig the new DevOps config. + */ + public void updateDevOpsConfig(final DevOpsConfig devOpsConfig) { + this.devOpsConfig = devOpsConfig; + } + + /** + * Returns a lazy devops authentication directive that re-evaluates the current config on each request. + * + * @return the devops authentication directive. + */ public DevopsAuthenticationDirective status() { - if (!devOpsConfig.isSecured() || !devOpsConfig.isStatusSecured()) { + return (realm, dittoHeaders, inner) -> createStatusDirective() + .authenticateDevOps(realm, dittoHeaders, inner); + } + + /** + * Returns a lazy devops authentication directive that re-evaluates the current config on each request. + * + * @return the devops authentication directive. + */ + public DevopsAuthenticationDirective devops() { + return (realm, dittoHeaders, inner) -> createDevopsDirective() + .authenticateDevOps(realm, dittoHeaders, inner); + } + + private DevopsAuthenticationDirective createStatusDirective() { + final DevOpsConfig currentConfig = devOpsConfig; + if (!currentConfig.isSecured() || !currentConfig.isStatusSecured()) { return DevOpsInsecureAuthenticationDirective.getInstance(); } - return switch (devOpsConfig.getStatusAuthenticationMethod()) { - case BASIC -> DevOpsBasicAuthenticationDirective.status(devOpsConfig); - case OAUTH2 -> DevOpsOAuth2AuthenticationDirective.status(devOpsConfig, jwtAuthenticationProvider); + return switch (currentConfig.getStatusAuthenticationMethod()) { + case BASIC -> DevOpsBasicAuthenticationDirective.status(currentConfig); + case OAUTH2 -> DevOpsOAuth2AuthenticationDirective.status(currentConfig, jwtAuthenticationProvider); }; - } - public DevopsAuthenticationDirective devops() { - if (!devOpsConfig.isSecured()) { + private DevopsAuthenticationDirective createDevopsDirective() { + final DevOpsConfig currentConfig = devOpsConfig; + if (!currentConfig.isSecured()) { return DevOpsInsecureAuthenticationDirective.getInstance(); } - return switch (devOpsConfig.getDevopsAuthenticationMethod()) { - case BASIC -> DevOpsBasicAuthenticationDirective.devops(devOpsConfig); - case OAUTH2 -> DevOpsOAuth2AuthenticationDirective.devops(devOpsConfig, jwtAuthenticationProvider); + return switch (currentConfig.getDevopsAuthenticationMethod()) { + case BASIC -> DevOpsBasicAuthenticationDirective.devops(currentConfig); + case OAUTH2 -> DevOpsOAuth2AuthenticationDirective.devops(currentConfig, jwtAuthenticationProvider); }; } } diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/auth/DittoGatewayAuthenticationDirectiveFactory.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/auth/DittoGatewayAuthenticationDirectiveFactory.java index 4c9652e348f..f7fe4d623bf 100644 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/auth/DittoGatewayAuthenticationDirectiveFactory.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/auth/DittoGatewayAuthenticationDirectiveFactory.java @@ -44,11 +44,13 @@ public final class DittoGatewayAuthenticationDirectiveFactory implements Gateway private static final Logger LOGGER = LoggerFactory.getLogger(DittoGatewayAuthenticationDirectiveFactory.class); private static final String AUTHENTICATION_DISPATCHER_NAME = "authentication-dispatcher"; - private final AuthenticationConfig authConfig; + private volatile AuthenticationConfig authConfig; private final Executor authenticationDispatcher; private final Config dittoExtensionConfig; @Nullable private GatewayAuthenticationDirective gatewayHttpAuthenticationDirective; @Nullable private GatewayAuthenticationDirective gatewayWsAuthenticationDirective; + @Nullable private JwtAuthenticationProvider httpJwtProvider; + @Nullable private JwtAuthenticationProvider wsJwtProvider; public DittoGatewayAuthenticationDirectiveFactory(final ActorSystem actorSystem, final Config config) { authConfig = DittoGatewayConfig.of(DefaultScopedConfig.dittoScoped(actorSystem.settings().config())) @@ -62,15 +64,14 @@ public GatewayAuthenticationDirective buildHttpAuthentication( final JwtAuthenticationFactory jwtAuthenticationFactory) { if (null == gatewayHttpAuthenticationDirective) { - final JwtAuthenticationProvider jwtHttpAuthenticationProvider = - JwtAuthenticationProvider.newInstance( - jwtAuthenticationFactory.newJwtAuthenticationResultProvider( - dittoExtensionConfig, null - ), - jwtAuthenticationFactory.getJwtValidator() - ); + httpJwtProvider = JwtAuthenticationProvider.newInstance( + jwtAuthenticationFactory.newJwtAuthenticationResultProvider( + dittoExtensionConfig, null + ), + jwtAuthenticationFactory.getJwtValidator() + ); gatewayHttpAuthenticationDirective = - generateGatewayAuthenticationDirective(authConfig, jwtHttpAuthenticationProvider, + generateGatewayAuthenticationDirective(authConfig, httpJwtProvider, authenticationDispatcher); } return gatewayHttpAuthenticationDirective; @@ -81,21 +82,45 @@ public GatewayAuthenticationDirective buildWsAuthentication( final JwtAuthenticationFactory jwtAuthenticationFactory) { if (null == gatewayWsAuthenticationDirective) { - final JwtAuthenticationProvider jwtWsAuthenticationProvider = - JwtAuthenticationProvider.newWsInstance( - jwtAuthenticationFactory.newJwtAuthenticationResultProvider( - dittoExtensionConfig, null - ), - jwtAuthenticationFactory.getJwtValidator() - ); + wsJwtProvider = JwtAuthenticationProvider.newWsInstance( + jwtAuthenticationFactory.newJwtAuthenticationResultProvider( + dittoExtensionConfig, null + ), + jwtAuthenticationFactory.getJwtValidator() + ); gatewayWsAuthenticationDirective = - generateGatewayAuthenticationDirective(authConfig, jwtWsAuthenticationProvider, + generateGatewayAuthenticationDirective(authConfig, wsJwtProvider, authenticationDispatcher); } return gatewayWsAuthenticationDirective; } - private static GatewayAuthenticationDirective generateGatewayAuthenticationDirective( + /** + * Updates the authentication configuration. Called when dynamic config changes are detected. + * Rebuilds the authentication chains for both HTTP and WebSocket directives if they have been built. + * + * @param authenticationConfig the new authentication config. + */ + public void updateAuthConfig(final AuthenticationConfig authenticationConfig) { + final boolean preAuthChanged = this.authConfig.isPreAuthenticationEnabled() != + authenticationConfig.isPreAuthenticationEnabled(); + this.authConfig = authenticationConfig; + + if (preAuthChanged) { + LOGGER.info("Pre-authentication enabled changed to <{}>. Rebuilding authentication chains.", + authenticationConfig.isPreAuthenticationEnabled()); + if (gatewayHttpAuthenticationDirective != null && httpJwtProvider != null) { + gatewayHttpAuthenticationDirective.updateAuthenticationChain( + buildAuthenticationChain(authenticationConfig, httpJwtProvider, authenticationDispatcher)); + } + if (gatewayWsAuthenticationDirective != null && wsJwtProvider != null) { + gatewayWsAuthenticationDirective.updateAuthenticationChain( + buildAuthenticationChain(authenticationConfig, wsJwtProvider, authenticationDispatcher)); + } + } + } + + private static AuthenticationChain buildAuthenticationChain( final AuthenticationConfig authConfig, final AuthenticationProvider jwtAuthenticationProvider, final Executor authenticationDispatcher) { @@ -111,11 +136,17 @@ private static GatewayAuthenticationDirective generateGatewayAuthenticationDirec final AuthenticationFailureAggregator authenticationFailureAggregator = AuthenticationFailureAggregators.getDefault(); - final AuthenticationChain authenticationChain = - AuthenticationChain.getInstance(authenticationProviders, authenticationFailureAggregator, - authenticationDispatcher); + return AuthenticationChain.getInstance(authenticationProviders, authenticationFailureAggregator, + authenticationDispatcher); + } + + private static GatewayAuthenticationDirective generateGatewayAuthenticationDirective( + final AuthenticationConfig authConfig, + final AuthenticationProvider jwtAuthenticationProvider, + final Executor authenticationDispatcher) { - return new GatewayAuthenticationDirective(authenticationChain); + return new GatewayAuthenticationDirective( + buildAuthenticationChain(authConfig, jwtAuthenticationProvider, authenticationDispatcher)); } } diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/auth/GatewayAuthenticationDirective.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/auth/GatewayAuthenticationDirective.java index 16acfb19d6a..dc01c12625d 100755 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/auth/GatewayAuthenticationDirective.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/directives/auth/GatewayAuthenticationDirective.java @@ -43,7 +43,7 @@ public final class GatewayAuthenticationDirective { private static final DittoLogger LOGGER = DittoLoggerFactory.getLogger(GatewayAuthenticationDirective.class); - private final AuthenticationChain authenticationChain; + private volatile AuthenticationChain authenticationChain; private final Function defaultUnauthorizedExceptionFactory; /** @@ -74,6 +74,15 @@ public GatewayAuthenticationDirective(final AuthenticationChain authenticationCh checkNotNull(defaultUnauthorizedExceptionFactory, "defaultUnauthorizedExceptionFactory"); } + /** + * Updates the authentication chain used by this directive. Called when dynamic config changes are detected. + * + * @param authenticationChain the new authentication chain. + */ + void updateAuthenticationChain(final AuthenticationChain authenticationChain) { + this.authenticationChain = checkNotNull(authenticationChain, "authenticationChain"); + } + /** * Depending on the request headers, one of the supported authentication mechanisms is applied. * diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/security/authentication/jwt/DittoPublicKeyProvider.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/security/authentication/jwt/DittoPublicKeyProvider.java index df09b92bd48..6b8313d1682 100644 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/security/authentication/jwt/DittoPublicKeyProvider.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/security/authentication/jwt/DittoPublicKeyProvider.java @@ -93,10 +93,10 @@ public final class DittoPublicKeyProvider implements PublicKeyProvider { "P-521", "secp521r1" ); - private final JwtSubjectIssuersConfig jwtSubjectIssuersConfig; + private volatile JwtSubjectIssuersConfig jwtSubjectIssuersConfig; private final HttpClientFacade httpClient; private final Materializer materializer; - private final OAuthConfig oAuthConfig; + private volatile OAuthConfig oAuthConfig; private final Cache publicKeyCache; private DittoPublicKeyProvider(final JwtSubjectIssuersConfig jwtSubjectIssuersConfig, @@ -146,7 +146,7 @@ private DittoPublicKeyProvider(final JwtSubjectIssuersConfig jwtSubjectIssuersCo * @return the PublicKeyProvider. * @throws NullPointerException if any argument is {@code null}. */ - public static PublicKeyProvider of(final JwtSubjectIssuersConfig jwtSubjectIssuersConfig, + public static DittoPublicKeyProvider of(final JwtSubjectIssuersConfig jwtSubjectIssuersConfig, final HttpClientFacade httpClient, final CacheConfig publicKeysCacheConfig, final String cacheName, @@ -156,6 +156,31 @@ public static PublicKeyProvider of(final JwtSubjectIssuersConfig jwtSubjectIssue oAuthConfig); } + /** + * Updates the OAuth and subject issuer configuration. Called when dynamic config changes are detected. + * New/removed issuers take effect immediately for subsequent JWT validations. Cached public keys with + * potentially stale clock skew settings will expire naturally based on their TTL. + * + * @param newIssuersConfig the new subject issuers config. + * @param newOAuthConfig the new OAuth config. + */ + void updateConfig(final JwtSubjectIssuersConfig newIssuersConfig, final OAuthConfig newOAuthConfig) { + final JwtSubjectIssuersConfig previousConfig = this.jwtSubjectIssuersConfig; + this.jwtSubjectIssuersConfig = newIssuersConfig; + this.oAuthConfig = newOAuthConfig; + + // Invalidate cached public keys for issuers that were removed + publicKeyCache.asMap().keySet().removeIf(cacheKey -> { + if (newIssuersConfig.getConfigItem(cacheKey.getIssuer()).isEmpty()) { + LOGGER.info("Invalidating cached public key for removed issuer <{}>.", cacheKey.getIssuer()); + return true; + } + return false; + }); + + LOGGER.info("Updated OAuth config. Known issuers: {}", newIssuersConfig); + } + @Override public CompletableFuture> getPublicKeyWithParser(final String issuer, final String keyId) { diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/security/authentication/jwt/JwtAuthenticationFactory.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/security/authentication/jwt/JwtAuthenticationFactory.java index 9f0916ac60d..1fec040e1cb 100644 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/security/authentication/jwt/JwtAuthenticationFactory.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/security/authentication/jwt/JwtAuthenticationFactory.java @@ -30,14 +30,14 @@ public final class JwtAuthenticationFactory { private static final String PUBLIC_KEY_CACHE_NAME = "ditto_authorization_jwt_publicKeys_cache"; - private final OAuthConfig oAuthConfig; + private volatile OAuthConfig oAuthConfig; private final CacheConfig publicKeyCacheConfig; private final HttpClientFacade httpClientFacade; private final ActorSystem actorSystem; @Nullable private JwtValidator jwtValidator; @Nullable private JwtSubjectIssuersConfig jwtSubjectIssuersConfig; - @Nullable private PublicKeyProvider publicKeyProvider; + @Nullable private DittoPublicKeyProvider dittoPublicKeyProvider; private JwtAuthenticationFactory(final OAuthConfig oAuthConfig, final CacheConfig publicKeyCacheConfig, @@ -75,9 +75,9 @@ public JwtValidator getJwtValidator() { return jwtValidator; } - private PublicKeyProvider getPublicKeyProvider() { - if (null == publicKeyProvider) { - publicKeyProvider = DittoPublicKeyProvider.of( + private DittoPublicKeyProvider getPublicKeyProvider() { + if (null == dittoPublicKeyProvider) { + dittoPublicKeyProvider = DittoPublicKeyProvider.of( getJwtSubjectIssuersConfig(), httpClientFacade, publicKeyCacheConfig, @@ -85,7 +85,7 @@ private PublicKeyProvider getPublicKeyProvider() { oAuthConfig); } - return publicKeyProvider; + return dittoPublicKeyProvider; } private JwtSubjectIssuersConfig getJwtSubjectIssuersConfig() { @@ -101,4 +101,20 @@ public JwtAuthenticationResultProvider newJwtAuthenticationResultProvider(final return JwtAuthenticationResultProvider.get(actorSystem, extensionConfig, role); } + /** + * Updates the OAuth configuration. Called when dynamic config changes are detected. + * Rebuilds the subject issuers config and updates the public key provider so that + * new/removed JWT issuers take effect without restart. + * + * @param newOAuthConfig the new OAuth config. + */ + public void updateOAuthConfig(final OAuthConfig newOAuthConfig) { + this.oAuthConfig = newOAuthConfig; + final JwtSubjectIssuersConfig newIssuersConfig = JwtSubjectIssuersConfig.fromOAuthConfig(newOAuthConfig); + this.jwtSubjectIssuersConfig = newIssuersConfig; + if (dittoPublicKeyProvider != null) { + dittoPublicKeyProvider.updateConfig(newIssuersConfig, newOAuthConfig); + } + } + } diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/security/authorization/NamespaceAccessValidatorFactory.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/security/authorization/NamespaceAccessValidatorFactory.java index 352de9e4a4d..25a1e8211bd 100644 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/security/authorization/NamespaceAccessValidatorFactory.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/security/authorization/NamespaceAccessValidatorFactory.java @@ -16,8 +16,6 @@ import java.util.Optional; import javax.annotation.Nullable; -import javax.annotation.concurrent.Immutable; - import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.gateway.service.util.config.security.NamespaceAccessConfig; @@ -27,14 +25,22 @@ /** * Factory for creating {@link NamespaceAccessValidator} instances with per-request context. */ -@Immutable @AllValuesAreNonnullByDefault public final class NamespaceAccessValidatorFactory { - private final List namespaceAccessConfigs; + private volatile List namespaceAccessConfigs; public NamespaceAccessValidatorFactory(final List namespaceAccessConfigs) { - this.namespaceAccessConfigs = namespaceAccessConfigs; + this.namespaceAccessConfigs = List.copyOf(namespaceAccessConfigs); + } + + /** + * Updates the namespace access configurations. Called when dynamic config changes are detected. + * + * @param namespaceAccessConfigs the new namespace access configs. + */ + public void updateNamespaceAccessConfigs(final List namespaceAccessConfigs) { + this.namespaceAccessConfigs = List.copyOf(namespaceAccessConfigs); } /** diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/starter/GatewayRootActor.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/starter/GatewayRootActor.java index 14670dd3e16..4caf8633a0d 100755 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/starter/GatewayRootActor.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/starter/GatewayRootActor.java @@ -34,6 +34,7 @@ import org.eclipse.ditto.edge.service.dispatching.ShardRegions; import org.eclipse.ditto.edge.service.headers.DittoHeadersValidator; import org.eclipse.ditto.gateway.service.endpoints.directives.auth.DevopsAuthenticationDirectiveFactory; +import org.eclipse.ditto.gateway.service.endpoints.directives.auth.DittoGatewayAuthenticationDirectiveFactory; import org.eclipse.ditto.gateway.service.endpoints.directives.auth.GatewayAuthenticationDirectiveFactory; import org.eclipse.ditto.gateway.service.endpoints.directives.auth.NamespaceAccessEnforcementDirective; import org.eclipse.ditto.gateway.service.security.authorization.NamespaceAccessValidatorFactory; @@ -63,6 +64,7 @@ import org.eclipse.ditto.gateway.service.security.authentication.jwt.JwtAuthenticationFactory; import org.eclipse.ditto.gateway.service.security.authentication.jwt.JwtAuthenticationResultProvider; import org.eclipse.ditto.gateway.service.streaming.actors.StreamingActor; +import org.eclipse.ditto.gateway.service.util.config.DittoGatewayConfig; import org.eclipse.ditto.gateway.service.util.config.GatewayConfig; import org.eclipse.ditto.gateway.service.util.config.endpoints.HttpConfig; import org.eclipse.ditto.gateway.service.util.config.health.HealthCheckConfig; @@ -78,7 +80,8 @@ import org.eclipse.ditto.internal.utils.health.HealthCheckingActorOptions; import org.eclipse.ditto.internal.utils.health.routes.StatusRoute; import org.eclipse.ditto.internal.utils.http.DefaultHttpClientFacade; -import org.eclipse.ditto.internal.utils.http.HttpClientFacade; +import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig; +import org.eclipse.ditto.internal.utils.pekko.config.DynamicConfigChanged; import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory; import org.eclipse.ditto.internal.utils.protocol.ProtocolAdapterProvider; import org.eclipse.ditto.internal.utils.pubsubthings.DittoProtocolSub; @@ -98,6 +101,11 @@ public final class GatewayRootActor extends DittoRootActor { private final DiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this); private final CompletionStage httpBinding; + private final JwtAuthenticationFactory jwtAuthenticationFactory; + private final JwtAuthenticationFactory devopsJwtAuthenticationFactory; + private final DevopsAuthenticationDirectiveFactory devopsAuthenticationDirectiveFactory; + @Nullable private final DittoGatewayAuthenticationDirectiveFactory dittoGatewayAuthDirectiveFactory; + private final NamespaceAccessValidatorFactory namespaceAccessValidatorFactory; @SuppressWarnings("unused") private GatewayRootActor(final GatewayConfig gatewayConfig, final ActorRef pubSubMediator) { @@ -125,29 +133,32 @@ private GatewayRootActor(final GatewayConfig gatewayConfig, final ActorRef pubSu DefaultHttpClientFacade.getInstance(actorSystem, authenticationConfig.getHttpProxyConfig()); final OAuthConfig oAuthConfig = authenticationConfig.getOAuthConfig(); - final JwtAuthenticationFactory jwtAuthenticationFactory = + final JwtAuthenticationFactory mainJwtAuthFactory = JwtAuthenticationFactory.newInstance(oAuthConfig, publicKeysConfig, httpClient, actorSystem); + this.jwtAuthenticationFactory = mainJwtAuthFactory; final JwtAuthenticationResultProvider jwtAuthenticationResultProvider = - jwtAuthenticationFactory.newJwtAuthenticationResultProvider(dittoExtensionConfig, null); + mainJwtAuthFactory.newJwtAuthenticationResultProvider(dittoExtensionConfig, null); final DevOpsConfig devOpsConfig = authenticationConfig.getDevOpsConfig(); - final DevopsAuthenticationDirectiveFactory devopsAuthenticationDirectiveFactory = - getDevopsAuthenticationDirectiveFactory(httpClient, publicKeysConfig, devOpsConfig, actorSystem, - dittoExtensionConfig); + final JwtAuthenticationFactory devopsJwtAuthFactory = + JwtAuthenticationFactory.newInstance(devOpsConfig.getOAuthConfig(), publicKeysConfig, httpClient, + actorSystem); + this.devopsJwtAuthenticationFactory = devopsJwtAuthFactory; + devopsAuthenticationDirectiveFactory = DevopsAuthenticationDirectiveFactory.newInstance( + devopsJwtAuthFactory, devOpsConfig, dittoExtensionConfig); final ProtocolAdapterProvider protocolAdapterProvider = ProtocolAdapterProvider.load(gatewayConfig.getProtocolConfig(), actorSystem); final HeaderTranslator headerTranslator = protocolAdapterProvider.getHttpHeaderTranslator(); final var namespaceAccessConfigs = authenticationConfig.getNamespaceAccessConfigs(); - final var namespaceAccessValidatorFactory = namespaceAccessConfigs.isEmpty() ? null : - new NamespaceAccessValidatorFactory(namespaceAccessConfigs); + namespaceAccessValidatorFactory = new NamespaceAccessValidatorFactory(namespaceAccessConfigs); final ActorRef streamingActor = startChildActor(StreamingActor.ACTOR_NAME, StreamingActor.props(dittoProtocolSub, proxyActor, - jwtAuthenticationFactory.getJwtValidator(), + mainJwtAuthFactory.getJwtValidator(), jwtAuthenticationResultProvider, gatewayConfig.getStreamingConfig(), headerTranslator, @@ -160,10 +171,17 @@ private GatewayRootActor(final GatewayConfig gatewayConfig, final ActorRef pubSu final ActorRef healthCheckActor = createHealthCheckActor(healthCheckConfig); final var hostname = getHostname(httpConfig); + final var gatewayAuthDirectiveFactory = + GatewayAuthenticationDirectiveFactory.get(actorSystem, dittoExtensionConfig); + dittoGatewayAuthDirectiveFactory = gatewayAuthDirectiveFactory + instanceof DittoGatewayAuthenticationDirectiveFactory dittoFactory ? dittoFactory : null; + final Route rootRoute = createRoute(actorSystem, gatewayConfig, proxyActor, streamingActor, - healthCheckActor, pubSubMediator, healthCheckConfig, jwtAuthenticationFactory, - devopsAuthenticationDirectiveFactory, protocolAdapterProvider, headerTranslator, - namespaceAccessValidatorFactory); + healthCheckActor, pubSubMediator, healthCheckConfig, mainJwtAuthFactory, + devopsAuthenticationDirectiveFactory, gatewayAuthDirectiveFactory, protocolAdapterProvider, + headerTranslator, namespaceAccessValidatorFactory); + + actorSystem.eventStream().subscribe(getSelf(), DynamicConfigChanged.class); httpBinding = Http.get(actorSystem) .newServerAt(hostname, httpConfig.getPort()) @@ -202,10 +220,41 @@ public Receive createReceive() { binding -> sender.tell(GatewayHttpReadinessCheck.READINESS_ASK_MESSAGE_RESPONSE, ActorRef.noSender())); }) + .match(DynamicConfigChanged.class, this::handleDynamicConfigChanged) .build() .orElse(super.createReceive()); } + private void handleDynamicConfigChanged(final DynamicConfigChanged configChanged) { + try { + log.info("Received DynamicConfigChanged (version <{}>), refreshing gateway authentication config.", + configChanged.version()); + final var dittoScopedConfig = DefaultScopedConfig.dittoScoped(configChanged.dittoConfig()); + final var gatewayConfig = DittoGatewayConfig.of(dittoScopedConfig); + final var authenticationConfig = gatewayConfig.getAuthenticationConfig(); + + // Update DevOps authentication config + devopsAuthenticationDirectiveFactory.updateDevOpsConfig(authenticationConfig.getDevOpsConfig()); + + // Update gateway authentication directive factory (pre-auth toggle) + if (dittoGatewayAuthDirectiveFactory != null) { + dittoGatewayAuthDirectiveFactory.updateAuthConfig(authenticationConfig); + } + + // Update namespace access configs + namespaceAccessValidatorFactory.updateNamespaceAccessConfigs( + authenticationConfig.getNamespaceAccessConfigs()); + + // Update JWT authentication factories (OAuth issuer config) + jwtAuthenticationFactory.updateOAuthConfig(authenticationConfig.getOAuthConfig()); + devopsJwtAuthenticationFactory.updateOAuthConfig( + authenticationConfig.getDevOpsConfig().getOAuthConfig()); + } catch (final Exception e) { + log.warning("Failed to apply DynamicConfigChanged (version <{}>), keeping previous config: {}", + configChanged.version(), e.getMessage()); + } + } + private static Route createRoute(final ActorSystem actorSystem, final GatewayConfig gatewayConfig, final ActorRef proxyActor, @@ -215,6 +264,7 @@ private static Route createRoute(final ActorSystem actorSystem, final HealthCheckConfig healthCheckConfig, final JwtAuthenticationFactory jwtAuthenticationFactory, final DevopsAuthenticationDirectiveFactory devopsAuthenticationDirectiveFactory, + final GatewayAuthenticationDirectiveFactory authenticationDirectiveFactory, final ProtocolAdapterProvider protocolAdapterProvider, final HeaderTranslator headerTranslator, @Nullable final NamespaceAccessValidatorFactory namespaceAccessValidatorFactory) { @@ -223,9 +273,6 @@ private static Route createRoute(final ActorSystem actorSystem, final var authConfig = gatewayConfig.getAuthenticationConfig(); final var materializer = SystemMaterializer.get(actorSystem).materializer(); - final var authenticationDirectiveFactory = - GatewayAuthenticationDirectiveFactory.get(actorSystem, dittoExtensionConfig); - final var devopsAuthenticationDirective = devopsAuthenticationDirectiveFactory.devops(); final var statusAuthenticationDirective = @@ -320,20 +367,6 @@ private ActorRef startGatewayProxyActor(final ActorRefFactory actorSystem, final GatewayProxyActor.props(pubSubMediator, devOpsCommandsActor, edgeCommandForwarder, httpConfig)); } - private static DevopsAuthenticationDirectiveFactory getDevopsAuthenticationDirectiveFactory( - final HttpClientFacade httpClient, - final CacheConfig publicKeysConfig, - final DevOpsConfig devOpsConfig, - final ActorSystem actorSystem, - final Config dittoExtensionConfig) { - final var devopsOauthConfig = devOpsConfig.getOAuthConfig(); - final var devopsJwtAuthenticationFactory = - JwtAuthenticationFactory.newInstance(devopsOauthConfig, publicKeysConfig, httpClient, actorSystem); - - return DevopsAuthenticationDirectiveFactory.newInstance(devopsJwtAuthenticationFactory, devOpsConfig, - dittoExtensionConfig); - } - private String getHostname(final HttpConfig httpConfig) { String hostname = httpConfig.getHostname(); if (hostname.isEmpty()) { diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingActor.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingActor.java index 6eb7985c451..fb3d992bd15 100755 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingActor.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingActor.java @@ -26,8 +26,10 @@ import org.eclipse.ditto.gateway.service.streaming.signals.Connect; import org.eclipse.ditto.gateway.service.util.config.streaming.DefaultStreamingConfig; import org.eclipse.ditto.gateway.service.util.config.streaming.StreamingConfig; +import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig; import org.eclipse.ditto.internal.utils.pekko.actors.ModifyConfigBehavior; import org.eclipse.ditto.internal.utils.pekko.actors.RetrieveConfigBehavior; +import org.eclipse.ditto.internal.utils.pekko.config.DynamicConfigChanged; import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter; import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory; import org.eclipse.ditto.internal.utils.metrics.DittoMetrics; @@ -64,8 +66,11 @@ public final class StreamingActor extends AbstractActorWithTimers implements Ret private final Gauge streamingSessionsCounter; private final JwtValidator jwtValidator; private final JwtAuthenticationResultProvider jwtAuthenticationResultProvider; - private final Props subscriptionManagerProps; - private final Props streamingSubscriptionManagerProps; + private final ActorRef pubSubMediator; + private final ActorSelection commandForwarderSelection; + private final Materializer materializer; + private Props subscriptionManagerProps; + private Props streamingSubscriptionManagerProps; private final DittoDiagnosticLoggingAdapter logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this); private final HeaderTranslator headerTranslator; @Nullable @@ -102,15 +107,13 @@ private StreamingActor(final DittoProtocolSub dittoProtocolSub, this.headerTranslator = headerTranslator; this.namespaceAccessValidatorFactory = namespaceAccessValidatorFactory; streamingSessionsCounter = DittoMetrics.gauge("streaming_sessions_count"); - final ActorSelection commandForwarderSelection = ActorSelection.apply(commandForwarder, ""); - final Materializer materializer = Materializer.createMaterializer(getContext()); - subscriptionManagerProps = - SubscriptionManager.props(streamingConfig.getSearchIdleTimeout(), pubSubMediator, - commandForwarderSelection, materializer); - streamingSubscriptionManagerProps = - StreamingSubscriptionManager.props(streamingConfig.getSearchIdleTimeout(), - commandForwarderSelection, materializer); + this.pubSubMediator = pubSubMediator; + this.commandForwarderSelection = ActorSelection.apply(commandForwarder, ""); + this.materializer = Materializer.createMaterializer(getContext()); + regenerateSubscriptionManagerProps(); scheduleScrapeStreamSessionsCounter(); + + getContext().getSystem().eventStream().subscribe(getSelf(), DynamicConfigChanged.class); } /** @@ -149,7 +152,10 @@ public SupervisorStrategy supervisorStrategy() { @Override public Receive createReceive() { - return retrieveConfigBehavior() + return ReceiveBuilder.create() + .match(DynamicConfigChanged.class, this::handleDynamicConfigChanged) + .build() + .orElse(retrieveConfigBehavior()) .orElse(modifyConfigBehavior()) .orElse(createConnectAndMetricsBehavior()) .orElse(ReceiveBuilder.create() @@ -157,6 +163,20 @@ public Receive createReceive() { .build()); } + private void handleDynamicConfigChanged(final DynamicConfigChanged configChanged) { + try { + final String gatewayStreamingPath = "ditto.gateway." + StreamingConfig.CONFIG_PATH; + if (configChanged.dittoConfig().hasPath(gatewayStreamingPath)) { + logger.info("Received DynamicConfigChanged (version <{}>), refreshing StreamingConfig.", + configChanged.version()); + setConfig(configChanged.dittoConfig().getConfig(gatewayStreamingPath)); + } + } catch (final Exception e) { + logger.warning("Failed to apply DynamicConfigChanged (version <{}>), keeping previous config: {}", + configChanged.version(), e.getMessage()); + } + } + private Receive createConnectAndMetricsBehavior() { return ReceiveBuilder.create() .match(Connect.class, connect -> { @@ -183,11 +203,21 @@ public Config getConfig() { public Config setConfig(final Config config) { streamingConfig = DefaultStreamingConfig.of( config.atKey(StreamingConfig.CONFIG_PATH).withFallback(streamingConfig.render())); + regenerateSubscriptionManagerProps(); // reschedule scrapes: interval may have changed. scheduleScrapeStreamSessionsCounter(); return streamingConfig.render(); } + private void regenerateSubscriptionManagerProps() { + subscriptionManagerProps = + SubscriptionManager.props(streamingConfig.getSearchIdleTimeout(), pubSubMediator, + commandForwarderSelection, materializer); + streamingSubscriptionManagerProps = + StreamingSubscriptionManager.props(streamingConfig.getSearchIdleTimeout(), + commandForwarderSelection, materializer); + } + private String getUniqueChildActorName(final String suffix) { final int counter = ++childCounter; return String.format("%x-%s", counter, URLEncoder.encode(suffix, StandardCharsets.UTF_8)); diff --git a/internal/utils/config/src/main/resources/ditto-dynamic-config.conf b/internal/utils/config/src/main/resources/ditto-dynamic-config.conf new file mode 100644 index 00000000000..ad9ea78bc14 --- /dev/null +++ b/internal/utils/config/src/main/resources/ditto-dynamic-config.conf @@ -0,0 +1,10 @@ +ditto { + dynamic-config-watcher { + enabled = false + enabled = ${?DITTO_DYNAMIC_CONFIG_ENABLED} + file-path = "/opt/ditto/dynamic-config/dynamic.conf" + file-path = ${?DITTO_DYNAMIC_CONFIG_FILE_PATH} + poll-interval = 30s + poll-interval = ${?DITTO_DYNAMIC_CONFIG_POLL_INTERVAL} + } +} diff --git a/internal/utils/config/src/main/resources/ditto-service-base.conf b/internal/utils/config/src/main/resources/ditto-service-base.conf index 8fee34a5361..158bb709a48 100644 --- a/internal/utils/config/src/main/resources/ditto-service-base.conf +++ b/internal/utils/config/src/main/resources/ditto-service-base.conf @@ -17,6 +17,7 @@ include "ditto-mongo.conf" include "ditto-enforcement.conf" include "ditto-entity-creation.conf" include "ditto-things-aggregator.conf" +include "ditto-dynamic-config.conf" # extension point include "ditto-service-extension.conf" diff --git a/internal/utils/health/src/main/java/org/eclipse/ditto/internal/utils/health/AbstractBackgroundStreamingActorWithConfigWithStatusReport.java b/internal/utils/health/src/main/java/org/eclipse/ditto/internal/utils/health/AbstractBackgroundStreamingActorWithConfigWithStatusReport.java index abcd9bcba22..364298f245b 100644 --- a/internal/utils/health/src/main/java/org/eclipse/ditto/internal/utils/health/AbstractBackgroundStreamingActorWithConfigWithStatusReport.java +++ b/internal/utils/health/src/main/java/org/eclipse/ditto/internal/utils/health/AbstractBackgroundStreamingActorWithConfigWithStatusReport.java @@ -17,6 +17,7 @@ import java.util.ArrayDeque; import java.util.Collections; import java.util.Deque; +import java.util.Optional; import java.util.concurrent.CompletionStage; import java.util.stream.Stream; @@ -24,6 +25,7 @@ import org.eclipse.ditto.base.api.common.ShutdownResponse; import org.eclipse.ditto.internal.utils.pekko.actors.ModifyConfigBehavior; import org.eclipse.ditto.internal.utils.pekko.actors.RetrieveConfigBehavior; +import org.eclipse.ditto.internal.utils.pekko.config.DynamicConfigChanged; import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter; import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory; import org.eclipse.ditto.internal.utils.config.DittoConfigError; @@ -92,6 +94,22 @@ protected AbstractBackgroundStreamingActorWithConfigWithStatusReport(final C con if (config.isEnabled()) { scheduleWakeUp(); } + + if (getDittoConfigPath().isPresent()) { + getContext().getSystem().eventStream().subscribe(getSelf(), DynamicConfigChanged.class); + } + } + + /** + * Returns the full ditto config path for this actor's config section + * (e.g., {@code "ditto.things-search.updater.background-sync"}). + * When present, the actor subscribes to {@link DynamicConfigChanged} events + * and refreshes its config automatically. + * + * @return the ditto config path, or empty to disable dynamic config reload. + */ + protected Optional getDittoConfigPath() { + return Optional.empty(); } /** @@ -182,6 +200,7 @@ public Config setConfig(final Config config) { private Receive sleeping() { final var sleepingReceiveBuilder = ReceiveBuilder.create(); + sleepingReceiveBuilder.match(DynamicConfigChanged.class, this::handleDynamicConfigChanged); preEnhanceSleepingBehavior(sleepingReceiveBuilder); return sleepingReceiveBuilder.match(WokeUp.class, this::wokeUp) @@ -196,6 +215,7 @@ private Receive sleeping() { private Receive streaming() { final var streamingReceiveBuilder = ReceiveBuilder.create(); + streamingReceiveBuilder.match(DynamicConfigChanged.class, this::handleDynamicConfigChanged); preEnhanceStreamingBehavior(streamingReceiveBuilder); return streamingReceiveBuilder @@ -209,6 +229,21 @@ private Receive streaming() { .orElse(modifyConfigBehavior()); } + private void handleDynamicConfigChanged(final DynamicConfigChanged configChanged) { + try { + getDittoConfigPath().ifPresent(dittoConfigPath -> { + if (configChanged.dittoConfig().hasPath(dittoConfigPath)) { + log.info("Received DynamicConfigChanged (version <{}>), refreshing config.", + configChanged.version()); + setConfig(configChanged.dittoConfig().getConfig(dittoConfigPath)); + } + }); + } catch (final Exception e) { + log.warning("Failed to apply DynamicConfigChanged (version <{}>), keeping previous config: {}", + configChanged.version(), e.getMessage()); + } + } + private void wokeUp(final WokeUp wokeUp) { log.info("Woke up."); enqueue(events, wokeUp.enable(config.isEnabled()), config.getKeptEvents()); diff --git a/internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/config/DefaultDynamicConfigWatcherConfig.java b/internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/config/DefaultDynamicConfigWatcherConfig.java new file mode 100644 index 00000000000..3b0eeae7e74 --- /dev/null +++ b/internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/config/DefaultDynamicConfigWatcherConfig.java @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.internal.utils.pekko.config; + +import java.time.Duration; +import java.util.Objects; + +import com.typesafe.config.Config; + +/** + * Default implementation of {@link DynamicConfigWatcherConfig}. + */ +final class DefaultDynamicConfigWatcherConfig implements DynamicConfigWatcherConfig { + + private static final String CONFIG_PATH = "ditto.dynamic-config-watcher"; + private static final boolean DEFAULT_ENABLED = false; + private static final String DEFAULT_FILE_PATH = "/opt/ditto/dynamic-config/dynamic.conf"; + private static final Duration DEFAULT_POLL_INTERVAL = Duration.ofSeconds(30); + + private final boolean enabled; + private final String filePath; + private final Duration pollInterval; + + private DefaultDynamicConfigWatcherConfig(final boolean enabled, final String filePath, + final Duration pollInterval) { + this.enabled = enabled; + this.filePath = filePath; + this.pollInterval = pollInterval; + } + + /** + * Creates a new {@code DefaultDynamicConfigWatcherConfig} from the given raw config. + * + * @param rawConfig the raw config (typically the full ActorSystem config). + * @return the new instance. + */ + static DefaultDynamicConfigWatcherConfig of(final Config rawConfig) { + if (rawConfig.hasPath(CONFIG_PATH)) { + final Config watcherConfig = rawConfig.getConfig(CONFIG_PATH); + return new DefaultDynamicConfigWatcherConfig( + watcherConfig.hasPath("enabled") ? watcherConfig.getBoolean("enabled") : DEFAULT_ENABLED, + watcherConfig.hasPath("file-path") ? watcherConfig.getString("file-path") : DEFAULT_FILE_PATH, + watcherConfig.hasPath("poll-interval") ? watcherConfig.getDuration("poll-interval") : DEFAULT_POLL_INTERVAL + ); + } + return new DefaultDynamicConfigWatcherConfig(DEFAULT_ENABLED, DEFAULT_FILE_PATH, DEFAULT_POLL_INTERVAL); + } + + @Override + public boolean isEnabled() { + return enabled; + } + + @Override + public String getFilePath() { + return filePath; + } + + @Override + public Duration getPollInterval() { + return pollInterval; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final DefaultDynamicConfigWatcherConfig that = (DefaultDynamicConfigWatcherConfig) o; + return enabled == that.enabled && + Objects.equals(filePath, that.filePath) && + Objects.equals(pollInterval, that.pollInterval); + } + + @Override + public int hashCode() { + return Objects.hash(enabled, filePath, pollInterval); + } + + @Override + public String toString() { + return getClass().getSimpleName() + " [" + + "enabled=" + enabled + + ", filePath=" + filePath + + ", pollInterval=" + pollInterval + + "]"; + } +} diff --git a/internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/config/DynamicConfigChanged.java b/internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/config/DynamicConfigChanged.java new file mode 100644 index 00000000000..979da1ef0c3 --- /dev/null +++ b/internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/config/DynamicConfigChanged.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.internal.utils.pekko.config; + +import java.time.Instant; + +import com.typesafe.config.Config; + +/** + * Event published on the Pekko EventStream when the dynamic configuration file has changed. + * + * @param dittoConfig the new merged {@code ditto.*} scoped config. + * @param previousDittoConfig the previous {@code ditto.*} scoped config. + * @param version the monotonically increasing config version. + * @param timestamp the instant when the change was detected. + */ +public record DynamicConfigChanged(Config dittoConfig, Config previousDittoConfig, + long version, Instant timestamp) { +} diff --git a/internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/config/DynamicConfigPoller.java b/internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/config/DynamicConfigPoller.java new file mode 100644 index 00000000000..26a92e584cc --- /dev/null +++ b/internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/config/DynamicConfigPoller.java @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.internal.utils.pekko.config; + +import java.util.function.Function; + +import org.apache.pekko.actor.ActorSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.typesafe.config.Config; + +/** + * Utility for lazily polling dynamic config changes via {@link DynamicConfigWatcherExtension}. + * Encapsulates version tracking, cached config parsing, and error handling. + * + *

Designed for sharded actors where subscribing to EventStream is not feasible due to the + * large number of instances. Instead, the config version is checked on demand (e.g., before + * accessing a config value), and the parsed result is shared across all pollers with the same + * cache key via {@link DynamicConfigWatcherExtension#getParsedConfig(String, Function)}.

+ * + *

Thread-safe: version and value are stored together in an immutable {@code Snapshot} record + * behind a single volatile reference, so this class can also be used from non-actor contexts + * (e.g., {@code PreEnforcer} extensions called from multiple threads).

+ * + * @param the parsed config type. + */ +public final class DynamicConfigPoller { + + private static final Logger LOGGER = LoggerFactory.getLogger(DynamicConfigPoller.class); + + private final DynamicConfigWatcherExtension extension; + private final String cacheKey; + private final Function parser; + + @SuppressWarnings("java:S3077") // volatile reference to immutable record is safe publication + private volatile Snapshot snapshot; + + private record Snapshot(long version, T value) {} + + private DynamicConfigPoller(final DynamicConfigWatcherExtension extension, final String cacheKey, + final Function parser, final T initialValue) { + this.extension = extension; + this.cacheKey = cacheKey; + this.parser = parser; + this.snapshot = new Snapshot<>(extension.getVersion(), initialValue); + } + + /** + * Creates a new poller. + * + * @param system the actor system. + * @param cacheKey a unique key for the parsed config cache (e.g., {@code "ThingConfigBundle"}). + * @param parser a function that parses the merged ditto config into the desired type. + * @param initialValue the initial config value (typically parsed at construction time). + * @param the parsed config type. + * @return the new poller. + */ + public static DynamicConfigPoller of(final ActorSystem system, final String cacheKey, + final Function parser, final T initialValue) { + return new DynamicConfigPoller<>(DynamicConfigWatcherExtension.get(system), cacheKey, parser, initialValue); + } + + /** + * Returns the current config value, refreshing from the extension if the version has changed. + * On parse failure, the previous value is retained and a warning is logged. + * + * @return the current (possibly refreshed) config value. + */ + public T get() { + final long currentVersion = extension.getVersion(); + final Snapshot current = this.snapshot; + if (currentVersion != current.version) { + try { + final T refreshed = extension.getParsedConfig(cacheKey, parser); + this.snapshot = new Snapshot<>(currentVersion, refreshed); + LOGGER.info("Refreshed config '{}' from dynamic config version <{}>.", cacheKey, currentVersion); + } catch (final Exception e) { + // update version even on failure to avoid retrying on every call + this.snapshot = new Snapshot<>(currentVersion, current.value); + LOGGER.warn("Failed to apply dynamic config version <{}> for '{}', keeping previous config: {}", + currentVersion, cacheKey, e.getMessage()); + } + } + return this.snapshot.value; + } +} diff --git a/internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/config/DynamicConfigWatcherActor.java b/internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/config/DynamicConfigWatcherActor.java new file mode 100644 index 00000000000..a317071ea09 --- /dev/null +++ b/internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/config/DynamicConfigWatcherActor.java @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.internal.utils.pekko.config; + +import java.io.File; +import java.time.Instant; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.pekko.actor.AbstractActorWithTimers; +import org.apache.pekko.actor.Props; +import org.apache.pekko.event.Logging; +import org.apache.pekko.event.LoggingAdapter; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigObject; +import com.typesafe.config.ConfigValue; +import com.typesafe.config.ConfigValueType; + +/** + * Actor that periodically polls a dynamic config file on disk and updates the + * {@link DynamicConfigWatcherExtension} when changes are detected. + */ +final class DynamicConfigWatcherActor extends AbstractActorWithTimers { + + /** + * The name of this actor. + */ + static final String ACTOR_NAME = "dynamicConfigWatcher"; + + private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); + + private final DynamicConfigWatcherExtension extension; + private final File configFile; + private final Config staticConfig; + + private long lastModified = 0L; + private Config previousDynamicConfig; + + @SuppressWarnings("unused") + private DynamicConfigWatcherActor(final DynamicConfigWatcherExtension extension, + final DynamicConfigWatcherConfig watcherConfig, + final Config staticConfig) { + this.extension = extension; + this.configFile = new File(watcherConfig.getFilePath()); + this.staticConfig = staticConfig; + this.previousDynamicConfig = ConfigFactory.empty(); + + getTimers().startTimerWithFixedDelay("poll", Poll.INSTANCE, watcherConfig.getPollInterval()); + } + + static Props props(final DynamicConfigWatcherExtension extension, + final DynamicConfigWatcherConfig watcherConfig, + final Config staticConfig) { + return Props.create(DynamicConfigWatcherActor.class, extension, watcherConfig, staticConfig); + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .matchEquals(Poll.INSTANCE, this::onPoll) + .build(); + } + + private void onPoll(final Poll poll) { + if (!configFile.exists()) { + if (lastModified != 0L) { + log.info("Dynamic config file <{}> no longer exists, keeping current config.", configFile); + } + return; + } + + final long currentModified = configFile.lastModified(); + if (currentModified == lastModified) { + return; + } + + final Config dynamicConfig; + try { + dynamicConfig = ConfigFactory.parseFile(configFile); + } catch (final Exception e) { + log.warning("Failed to parse dynamic config file <{}>: {}. Keeping current config.", + configFile, e.getMessage()); + return; + } + + lastModified = currentModified; + + if (dynamicConfig.equals(previousDynamicConfig)) { + return; + } + + // Merge: dynamic overrides static, then scope to ditto.* + final Config merged = dynamicConfig.withFallback(staticConfig); + final Config previousMerged = extension.getDittoConfig(); + + if (merged.hasPath("ditto")) { + final Config newDittoConfig = merged.getConfig("ditto").atKey("ditto"); + + if (!newDittoConfig.equals(previousMerged)) { + final long newVersion = extension.updateConfig(newDittoConfig); + + logChangedKeys(previousMerged, newDittoConfig); + + final DynamicConfigChanged event = new DynamicConfigChanged( + newDittoConfig, previousMerged, newVersion, Instant.now()); + getContext().getSystem().eventStream().publish(event); + + log.info("Dynamic config updated to version <{}>.", newVersion); + } + } + + previousDynamicConfig = dynamicConfig; + } + + private void logChangedKeys(final Config previousConfig, final Config newConfig) { + final ConfigObject previousRoot = previousConfig.root(); + final ConfigObject newRoot = newConfig.root(); + final Set changedPaths = new TreeSet<>(); + collectChangedPaths(previousRoot, newRoot, "", changedPaths); + + if (!changedPaths.isEmpty()) { + log.info("Changed config paths: <{}>", changedPaths); + } + } + + private static void collectChangedPaths(final ConfigObject previous, final ConfigObject current, + final String prefix, final Set changedPaths) { + + final Set allKeys = new HashSet<>(); + allKeys.addAll(previous.keySet()); + allKeys.addAll(current.keySet()); + + for (final String key : allKeys) { + final String fullPath = prefix.isEmpty() ? key : prefix + "." + key; + final ConfigValue prevValue = previous.get(key); + final ConfigValue currValue = current.get(key); + + if (prevValue == null) { + changedPaths.add(fullPath + " (added)"); + } else if (currValue == null) { + changedPaths.add(fullPath + " (removed)"); + } else if (prevValue.valueType() == ConfigValueType.OBJECT && + currValue.valueType() == ConfigValueType.OBJECT) { + collectChangedPaths((ConfigObject) prevValue, (ConfigObject) currValue, + fullPath, changedPaths); + } else if (!prevValue.equals(currValue)) { + changedPaths.add(fullPath); + } + } + } + + private enum Poll { + INSTANCE + } +} diff --git a/internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/config/DynamicConfigWatcherConfig.java b/internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/config/DynamicConfigWatcherConfig.java new file mode 100644 index 00000000000..2641bd67485 --- /dev/null +++ b/internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/config/DynamicConfigWatcherConfig.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.internal.utils.pekko.config; + +import java.time.Duration; + +/** + * Configuration for the dynamic config file watcher. + */ +public interface DynamicConfigWatcherConfig { + + /** + * @return whether dynamic config watching is enabled. + */ + boolean isEnabled(); + + /** + * @return the path to the dynamic config file on disk. + */ + String getFilePath(); + + /** + * @return the interval at which the config file is polled for changes. + */ + Duration getPollInterval(); +} diff --git a/internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/config/DynamicConfigWatcherExtension.java b/internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/config/DynamicConfigWatcherExtension.java new file mode 100644 index 00000000000..a230f7fd70e --- /dev/null +++ b/internal/utils/pekko/src/main/java/org/eclipse/ditto/internal/utils/pekko/config/DynamicConfigWatcherExtension.java @@ -0,0 +1,161 @@ +/* + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.internal.utils.pekko.config; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import org.apache.pekko.actor.AbstractExtensionId; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.actor.ExtendedActorSystem; +import org.apache.pekko.actor.Extension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.typesafe.config.Config; + +/** + * Pekko Extension that holds the current merged Ditto configuration and manages the + * {@link DynamicConfigWatcherActor} for hot-reloading configuration from a file on disk. + * + *

Actors can read the current config via {@link #getDittoConfig()} at any time. + * The returned config always reflects the latest merged state of static + dynamic config. + * Use {@link #getVersion()} for cheap cache invalidation checks.

+ */ +public final class DynamicConfigWatcherExtension implements Extension { + + private static final Logger LOGGER = LoggerFactory.getLogger(DynamicConfigWatcherExtension.class); + private static final ExtensionId EXTENSION_ID = new ExtensionId(); + + private final AtomicReference mergedDittoConfig; + private final AtomicLong version; + private final ConcurrentHashMap> parsedConfigCache; + private final boolean enabled; + + private DynamicConfigWatcherExtension(final ExtendedActorSystem system) { + final Config rawConfig = system.settings().config(); + + // Initialize with static ditto.* config + final Config staticDittoConfig; + if (rawConfig.hasPath("ditto")) { + staticDittoConfig = rawConfig.getConfig("ditto").atKey("ditto"); + } else { + staticDittoConfig = com.typesafe.config.ConfigFactory.empty(); + } + this.mergedDittoConfig = new AtomicReference<>(staticDittoConfig); + this.version = new AtomicLong(0L); + this.parsedConfigCache = new ConcurrentHashMap<>(); + + final DynamicConfigWatcherConfig watcherConfig = DefaultDynamicConfigWatcherConfig.of(rawConfig); + this.enabled = watcherConfig.isEnabled(); + + if (enabled) { + LOGGER.info("Dynamic config watcher is enabled, watching file <{}> every <{}>.", + watcherConfig.getFilePath(), watcherConfig.getPollInterval()); + system.actorOf( + DynamicConfigWatcherActor.props(this, watcherConfig, rawConfig), + DynamicConfigWatcherActor.ACTOR_NAME + ); + } else { + LOGGER.info("Dynamic config watcher is disabled."); + } + } + + /** + * Get this extension from the given actor system. + * + * @param system the actor system. + * @return the extension instance. + */ + public static DynamicConfigWatcherExtension get(final ActorSystem system) { + return EXTENSION_ID.get(system); + } + + /** + * Returns the current merged {@code ditto.*} configuration. This always returns the latest + * merged config (static config overridden by dynamic config file). + * + * @return the current ditto-scoped config. + */ + public Config getDittoConfig() { + return mergedDittoConfig.get(); + } + + /** + * Returns the current config version. Starts at 0 and is incremented on each reload. + * Useful for cheap cache invalidation: compare with a locally stored version number. + * + * @return the current version. + */ + public long getVersion() { + return version.get(); + } + + /** + * @return whether dynamic config file watching is active. + */ + public boolean isEnabled() { + return enabled; + } + + /** + * Returns a parsed config object for the given key, cached per config version. Only the first caller + * after a version change actually invokes the parser; all subsequent callers sharing the same key + * get the cached result. This avoids duplicating parsed config objects across hundreds of thousands + * of sharded actors. + * + * @param key a unique key identifying the parsed config type (e.g., {@code "ThingConfig"}). + * @param parser a function that parses the merged ditto config into the desired type. + * @param the parsed config type. + * @return the parsed config, shared across all callers with the same key and version. + */ + @SuppressWarnings("unchecked") + public T getParsedConfig(final String key, final Function parser) { + final long currentVersion = version.get(); + final VersionedValue cached = parsedConfigCache.get(key); + if (cached != null && cached.version == currentVersion) { + return (T) cached.value; + } + // On version change, compute atomically per key — only one thread parses + final VersionedValue computed = parsedConfigCache.compute(key, (k, existing) -> { + if (existing != null && existing.version == currentVersion) { + return existing; + } + return new VersionedValue<>(currentVersion, parser.apply(getDittoConfig())); + }); + return (T) computed.value; + } + + /** + * Updates the merged config and increments the version. Called by {@link DynamicConfigWatcherActor}. + * + * @param newDittoConfig the new merged ditto config. + * @return the new version number. + */ + long updateConfig(final Config newDittoConfig) { + mergedDittoConfig.set(newDittoConfig); + return version.incrementAndGet(); + } + + private record VersionedValue(long version, T value) {} + + private static final class ExtensionId extends AbstractExtensionId { + + @Override + public DynamicConfigWatcherExtension createExtension(final ExtendedActorSystem system) { + return new DynamicConfigWatcherExtension(system); + } + } +} diff --git a/internal/utils/pekko/src/test/java/org/eclipse/ditto/internal/utils/pekko/config/DynamicConfigPollerTest.java b/internal/utils/pekko/src/test/java/org/eclipse/ditto/internal/utils/pekko/config/DynamicConfigPollerTest.java new file mode 100644 index 00000000000..09e294f88a0 --- /dev/null +++ b/internal/utils/pekko/src/test/java/org/eclipse/ditto/internal/utils/pekko/config/DynamicConfigPollerTest.java @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.internal.utils.pekko.config; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.testkit.javadsl.TestKit; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.typesafe.config.ConfigFactory; + +/** + * Tests {@link DynamicConfigPoller}. + */ +public final class DynamicConfigPollerTest { + + private ActorSystem system; + private DynamicConfigWatcherExtension extension; + + @Before + public void setup() { + system = ActorSystem.create("DynamicConfigPollerTest", + ConfigFactory.parseString("ditto.things.thing.activity-check.inactive-interval = 2h")); + extension = DynamicConfigWatcherExtension.get(system); + } + + @After + public void tearDown() { + if (system != null) { + TestKit.shutdownActorSystem(system); + system = null; + } + } + + @Test + public void getReturnsInitialValue() { + final DynamicConfigPoller poller = + DynamicConfigPoller.of(system, "test", config -> "parsed", "initial"); + + assertThat(poller.get()).isEqualTo("initial"); + } + + @Test + public void getReturnsSameInstanceWhenVersionUnchanged() { + final var initial = new Object(); + final DynamicConfigPoller poller = + DynamicConfigPoller.of(system, "test", config -> new Object(), initial); + + assertThat(poller.get()).isSameAs(initial); + assertThat(poller.get()).isSameAs(initial); + } + + @Test + public void getRefreshesAfterVersionChange() { + final DynamicConfigPoller poller = + DynamicConfigPoller.of(system, "testRefresh", config -> "refreshed", "initial"); + + assertThat(poller.get()).isEqualTo("initial"); + + // trigger version change + final var newConfig = ConfigFactory.parseString("ditto.things.thing.activity-check.inactive-interval = 5m") + .withFallback(system.settings().config()); + extension.updateConfig(newConfig.getConfig("ditto").atKey("ditto")); + + assertThat(poller.get()).isEqualTo("refreshed"); + } + + @Test + public void getKeepsPreviousValueOnParseFailure() { + final AtomicInteger callCount = new AtomicInteger(0); + final DynamicConfigPoller poller = + DynamicConfigPoller.of(system, "testFailure", config -> { + if (callCount.incrementAndGet() > 0) { + throw new RuntimeException("parse error"); + } + return "should-not-reach"; + }, "initial"); + + assertThat(poller.get()).isEqualTo("initial"); + + // trigger version change — parser will throw + final var newConfig = ConfigFactory.parseString("ditto.things.thing.activity-check.inactive-interval = 5m") + .withFallback(system.settings().config()); + extension.updateConfig(newConfig.getConfig("ditto").atKey("ditto")); + + // should keep initial value + assertThat(poller.get()).isEqualTo("initial"); + } + + @Test + public void getDoesNotRetryAfterFailureUntilNextVersionChange() { + final AtomicInteger callCount = new AtomicInteger(0); + final DynamicConfigPoller poller = + DynamicConfigPoller.of(system, "testNoRetry", config -> { + callCount.incrementAndGet(); + throw new RuntimeException("parse error"); + }, "initial"); + + // trigger version change + final var newConfig = ConfigFactory.parseString("ditto.things.thing.activity-check.inactive-interval = 5m") + .withFallback(system.settings().config()); + extension.updateConfig(newConfig.getConfig("ditto").atKey("ditto")); + + poller.get(); // first call after version change — triggers parse attempt + final int countAfterFirst = callCount.get(); + + poller.get(); // second call — same version, should NOT retry + assertThat(callCount.get()).isEqualTo(countAfterFirst); + } + + @Test + public void multiplePollersWithSameCacheKeyShareParsedResult() { + final AtomicInteger parseCount = new AtomicInteger(0); + final var parser = new java.util.function.Function() { + @Override + public String apply(final com.typesafe.config.Config config) { + parseCount.incrementAndGet(); + return "shared-" + extension.getVersion(); + } + }; + + final DynamicConfigPoller poller1 = + DynamicConfigPoller.of(system, "sharedKey", parser, "initial1"); + final DynamicConfigPoller poller2 = + DynamicConfigPoller.of(system, "sharedKey", parser, "initial2"); + + // trigger version change + final var newConfig = ConfigFactory.parseString("ditto.things.thing.activity-check.inactive-interval = 5m") + .withFallback(system.settings().config()); + extension.updateConfig(newConfig.getConfig("ditto").atKey("ditto")); + + final String result1 = poller1.get(); + final String result2 = poller2.get(); + + assertThat(result1).isEqualTo("shared-1"); + assertThat(result2).isEqualTo("shared-1"); + // parser should only be called once thanks to the extension's cache + assertThat(parseCount.get()).isEqualTo(1); + } +} diff --git a/internal/utils/pekko/src/test/java/org/eclipse/ditto/internal/utils/pekko/config/DynamicConfigWatcherActorTest.java b/internal/utils/pekko/src/test/java/org/eclipse/ditto/internal/utils/pekko/config/DynamicConfigWatcherActorTest.java new file mode 100644 index 00000000000..182efc083b8 --- /dev/null +++ b/internal/utils/pekko/src/test/java/org/eclipse/ditto/internal/utils/pekko/config/DynamicConfigWatcherActorTest.java @@ -0,0 +1,260 @@ +/* + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.internal.utils.pekko.config; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.time.Duration; + +import org.apache.pekko.actor.ActorRef; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.testkit.javadsl.TestKit; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +/** + * Tests {@link DynamicConfigWatcherActor}. + */ +public final class DynamicConfigWatcherActorTest { + + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); + + private ActorSystem system; + private DynamicConfigWatcherExtension extension; + + @Before + public void setup() { + system = ActorSystem.create("DynamicConfigWatcherActorTest", + ConfigFactory.parseString("ditto.things.thing.activity-check.inactive-interval = 2h") + .withFallback(ConfigFactory.empty())); + extension = DynamicConfigWatcherExtension.get(system); + } + + @After + public void tearDown() { + if (system != null) { + TestKit.shutdownActorSystem(system); + system = null; + } + } + + @Test + public void fileNotFoundDoesNotCauseError() { + new TestKit(system) {{ + final File nonExistentFile = new File(tempFolder.getRoot(), "nonexistent.conf"); + final DynamicConfigWatcherConfig config = createConfig(nonExistentFile, Duration.ofMillis(100)); + + system.actorOf(DynamicConfigWatcherActor.props(extension, config, + system.settings().config())); + + // Subscribe to config change events + system.eventStream().subscribe(getRef(), DynamicConfigChanged.class); + + // Wait and verify no event is published + expectNoMessage(Duration.ofMillis(500)); + assertThat(extension.getVersion()).isEqualTo(0L); + }}; + } + + @Test + public void firstDetectionPublishesEvent() throws IOException { + new TestKit(system) {{ + final File configFile = tempFolder.newFile("dynamic.conf"); + Files.writeString(configFile.toPath(), + "ditto.things.thing.activity-check.inactive-interval = 5m"); + + final DynamicConfigWatcherConfig config = createConfig(configFile, Duration.ofMillis(100)); + + system.eventStream().subscribe(getRef(), DynamicConfigChanged.class); + system.actorOf(DynamicConfigWatcherActor.props(extension, config, + system.settings().config())); + + final DynamicConfigChanged event = expectMsgClass(Duration.ofSeconds(5), + DynamicConfigChanged.class); + assertThat(event.version()).isEqualTo(1L); + assertThat(event.dittoConfig().hasPath("ditto.things.thing.activity-check.inactive-interval")) + .isTrue(); + assertThat(extension.getVersion()).isEqualTo(1L); + }}; + } + + @Test + public void unchangedFileDoesNotProduceDuplicateEvent() throws IOException { + new TestKit(system) {{ + final File configFile = tempFolder.newFile("dynamic.conf"); + Files.writeString(configFile.toPath(), + "ditto.things.thing.activity-check.inactive-interval = 5m"); + + final DynamicConfigWatcherConfig config = createConfig(configFile, Duration.ofMillis(100)); + + system.eventStream().subscribe(getRef(), DynamicConfigChanged.class); + system.actorOf(DynamicConfigWatcherActor.props(extension, config, + system.settings().config())); + + // First event + expectMsgClass(Duration.ofSeconds(5), DynamicConfigChanged.class); + + // No more events since file hasn't changed + expectNoMessage(Duration.ofMillis(500)); + assertThat(extension.getVersion()).isEqualTo(1L); + }}; + } + + @Test + public void fileChangedProducesNewEvent() throws IOException, InterruptedException { + new TestKit(system) {{ + final File configFile = tempFolder.newFile("dynamic.conf"); + Files.writeString(configFile.toPath(), + "ditto.things.thing.activity-check.inactive-interval = 5m"); + + final DynamicConfigWatcherConfig config = createConfig(configFile, Duration.ofMillis(100)); + + system.eventStream().subscribe(getRef(), DynamicConfigChanged.class); + system.actorOf(DynamicConfigWatcherActor.props(extension, config, + system.settings().config())); + + // First event + expectMsgClass(Duration.ofSeconds(5), DynamicConfigChanged.class); + + // Ensure file modification time changes (some filesystems have 1s granularity) + Thread.sleep(1100); + Files.writeString(configFile.toPath(), + "ditto.things.thing.activity-check.inactive-interval = 10m"); + + // Second event + final DynamicConfigChanged event2 = expectMsgClass(Duration.ofSeconds(5), + DynamicConfigChanged.class); + assertThat(event2.version()).isEqualTo(2L); + assertThat(extension.getVersion()).isEqualTo(2L); + }}; + } + + @Test + public void invalidHoconKeepsOldConfig() throws IOException, InterruptedException { + new TestKit(system) {{ + final File configFile = tempFolder.newFile("dynamic.conf"); + Files.writeString(configFile.toPath(), + "ditto.things.thing.activity-check.inactive-interval = 5m"); + + final DynamicConfigWatcherConfig config = createConfig(configFile, Duration.ofMillis(100)); + + system.eventStream().subscribe(getRef(), DynamicConfigChanged.class); + system.actorOf(DynamicConfigWatcherActor.props(extension, config, + system.settings().config())); + + // First valid event + expectMsgClass(Duration.ofSeconds(5), DynamicConfigChanged.class); + final Config configAfterFirst = extension.getDittoConfig(); + + // Write invalid HOCON + Thread.sleep(1100); + Files.writeString(configFile.toPath(), "this is { not valid hocon"); + + // No new event, config unchanged + expectNoMessage(Duration.ofMillis(500)); + assertThat(extension.getDittoConfig()).isEqualTo(configAfterFirst); + assertThat(extension.getVersion()).isEqualTo(1L); + }}; + } + + @Test + public void invalidHoconIsRetriedOnNextPollAfterBeingFixed() throws IOException, InterruptedException { + new TestKit(system) {{ + final File configFile = tempFolder.newFile("dynamic.conf"); + Files.writeString(configFile.toPath(), + "ditto.things.thing.activity-check.inactive-interval = 5m"); + + final DynamicConfigWatcherConfig config = createConfig(configFile, Duration.ofMillis(100)); + + system.eventStream().subscribe(getRef(), DynamicConfigChanged.class); + system.actorOf(DynamicConfigWatcherActor.props(extension, config, + system.settings().config())); + + // First valid event + expectMsgClass(Duration.ofSeconds(5), DynamicConfigChanged.class); + + // Write invalid HOCON + Thread.sleep(1100); + Files.writeString(configFile.toPath(), "this is { not valid hocon"); + + // No event for invalid content + expectNoMessage(Duration.ofMillis(500)); + + // Fix the file (without changing modification time — the retry should still work + // because lastModified was NOT updated after the parse failure) + Files.writeString(configFile.toPath(), + "ditto.things.thing.activity-check.inactive-interval = 10m"); + + // The fixed file should be picked up on the next poll + final DynamicConfigChanged event = expectMsgClass(Duration.ofSeconds(5), + DynamicConfigChanged.class); + assertThat(event.version()).isEqualTo(2L); + }}; + } + + @Test + public void fileDeletedAfterBeingPresentDoesNotCrash() throws IOException, InterruptedException { + new TestKit(system) {{ + final File configFile = tempFolder.newFile("dynamic.conf"); + Files.writeString(configFile.toPath(), + "ditto.things.thing.activity-check.inactive-interval = 5m"); + + final DynamicConfigWatcherConfig config = createConfig(configFile, Duration.ofMillis(100)); + + system.eventStream().subscribe(getRef(), DynamicConfigChanged.class); + final ActorRef watcher = system.actorOf(DynamicConfigWatcherActor.props(extension, config, + system.settings().config())); + + // First event + expectMsgClass(Duration.ofSeconds(5), DynamicConfigChanged.class); + + // Delete file + configFile.delete(); + + // No crash, no event + expectNoMessage(Duration.ofMillis(500)); + + // Actor is still alive + assertThat(watcher.isTerminated()).isFalse(); + }}; + } + + private DynamicConfigWatcherConfig createConfig(final File file, final Duration pollInterval) { + return new DynamicConfigWatcherConfig() { + @Override + public boolean isEnabled() { + return true; + } + + @Override + public String getFilePath() { + return file.getAbsolutePath(); + } + + @Override + public Duration getPollInterval() { + return pollInterval; + } + }; + } +} diff --git a/internal/utils/pekko/src/test/java/org/eclipse/ditto/internal/utils/pekko/config/DynamicConfigWatcherExtensionTest.java b/internal/utils/pekko/src/test/java/org/eclipse/ditto/internal/utils/pekko/config/DynamicConfigWatcherExtensionTest.java new file mode 100644 index 00000000000..15520e101b7 --- /dev/null +++ b/internal/utils/pekko/src/test/java/org/eclipse/ditto/internal/utils/pekko/config/DynamicConfigWatcherExtensionTest.java @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.internal.utils.pekko.config; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.testkit.javadsl.TestKit; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.typesafe.config.ConfigFactory; + +/** + * Tests {@link DynamicConfigWatcherExtension}. + */ +public final class DynamicConfigWatcherExtensionTest { + + private ActorSystem system; + + @Before + public void setup() { + system = ActorSystem.create("DynamicConfigWatcherExtensionTest", + ConfigFactory.parseString("ditto.things.thing.activity-check.inactive-interval = 2h")); + } + + @After + public void tearDown() { + if (system != null) { + TestKit.shutdownActorSystem(system); + system = null; + } + } + + @Test + public void getDittoConfigReturnsStaticConfigWhenDisabled() { + final DynamicConfigWatcherExtension extension = DynamicConfigWatcherExtension.get(system); + + assertThat(extension.isEnabled()).isFalse(); + assertThat(extension.getDittoConfig().hasPath("ditto")).isTrue(); + assertThat(extension.getDittoConfig().hasPath("ditto.things")).isTrue(); + } + + @Test + public void getVersionStartsAtZero() { + final DynamicConfigWatcherExtension extension = DynamicConfigWatcherExtension.get(system); + + assertThat(extension.getVersion()).isEqualTo(0L); + } + + @Test + public void extensionIsSingleton() { + final DynamicConfigWatcherExtension extension1 = DynamicConfigWatcherExtension.get(system); + final DynamicConfigWatcherExtension extension2 = DynamicConfigWatcherExtension.get(system); + + assertThat(extension1).isSameAs(extension2); + } + + @Test + public void updateConfigIncrementsVersion() { + final DynamicConfigWatcherExtension extension = DynamicConfigWatcherExtension.get(system); + + final var newConfig = ConfigFactory.parseString("ditto.things.thing.activity-check.inactive-interval = 5m") + .withFallback(system.settings().config()); + final var dittoConfig = newConfig.getConfig("ditto").atKey("ditto"); + + final long newVersion = extension.updateConfig(dittoConfig); + assertThat(newVersion).isEqualTo(1L); + assertThat(extension.getVersion()).isEqualTo(1L); + assertThat(extension.getDittoConfig()).isEqualTo(dittoConfig); + } + + @Test + public void getParsedConfigReturnsCachedValueForSameVersion() { + final DynamicConfigWatcherExtension extension = DynamicConfigWatcherExtension.get(system); + final AtomicInteger parseCount = new AtomicInteger(0); + + final String result1 = extension.getParsedConfig("testKey", config -> { + parseCount.incrementAndGet(); + return "parsed-" + extension.getVersion(); + }); + final String result2 = extension.getParsedConfig("testKey", config -> { + parseCount.incrementAndGet(); + return "parsed-" + extension.getVersion(); + }); + + assertThat(result1).isEqualTo("parsed-0"); + assertThat(result2).isSameAs(result1); + assertThat(parseCount.get()).isEqualTo(1); + } + + @Test + public void getParsedConfigInvalidatesCacheOnVersionChange() { + final DynamicConfigWatcherExtension extension = DynamicConfigWatcherExtension.get(system); + final AtomicInteger parseCount = new AtomicInteger(0); + + final String result1 = extension.getParsedConfig("testKey", config -> { + parseCount.incrementAndGet(); + return "parsed-" + extension.getVersion(); + }); + assertThat(result1).isEqualTo("parsed-0"); + + // Trigger version change + final var newConfig = ConfigFactory.parseString("ditto.things.thing.activity-check.inactive-interval = 5m") + .withFallback(system.settings().config()); + extension.updateConfig(newConfig.getConfig("ditto").atKey("ditto")); + + final String result2 = extension.getParsedConfig("testKey", config -> { + parseCount.incrementAndGet(); + return "parsed-" + extension.getVersion(); + }); + + assertThat(result2).isEqualTo("parsed-1"); + assertThat(result2).isNotSameAs(result1); + assertThat(parseCount.get()).isEqualTo(2); + } + + @Test + public void getParsedConfigDifferentKeysAreCachedIndependently() { + final DynamicConfigWatcherExtension extension = DynamicConfigWatcherExtension.get(system); + + final String resultA = extension.getParsedConfig("keyA", config -> "valueA"); + final String resultB = extension.getParsedConfig("keyB", config -> "valueB"); + + assertThat(resultA).isEqualTo("valueA"); + assertThat(resultB).isEqualTo("valueB"); + } +} diff --git a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/PersistenceCleanupActor.java b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/PersistenceCleanupActor.java index e65331d8d01..a7e4c0b857a 100644 --- a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/PersistenceCleanupActor.java +++ b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/cleanup/PersistenceCleanupActor.java @@ -44,6 +44,7 @@ import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter; import org.eclipse.ditto.internal.utils.pekko.actors.ModifyConfigBehavior; import org.eclipse.ditto.internal.utils.pekko.actors.RetrieveConfigBehavior; +import org.eclipse.ditto.internal.utils.pekko.config.DynamicConfigChanged; import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory; import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter; import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; @@ -84,6 +85,7 @@ public final class PersistenceCleanupActor extends AbstractFSM> responsibilitySupplier; + @Nullable private final String dittoConfigCleanupPath; private CleanupConfig config; private Cleanup cleanup; @@ -99,18 +101,24 @@ public final class PersistenceCleanupActor extends AbstractFSM running() { private FSMStateFunctionBuilder inAnyState() { return matchEvent(RetrieveHealth.class, this::retrieveHealth) + .event(DynamicConfigChanged.class, (configChanged, lastPid) -> { + try { + if (dittoConfigCleanupPath != null && + configChanged.dittoConfig().hasPath(dittoConfigCleanupPath)) { + logger.info("Received DynamicConfigChanged (version <{}>), refreshing CleanupConfig.", + configChanged.version()); + setConfig(configChanged.dittoConfig().getConfig(dittoConfigCleanupPath)); + } + } catch (final Exception e) { + logger.warning("Failed to apply DynamicConfigChanged (version <{}>), " + + "keeping previous config: {}", + configChanged.version(), e.getMessage()); + } + return stay(); + }) .event(RetrieveConfig.class, (retrieveConfig, lastPid) -> { retrieveConfigBehavior().onMessage().apply(retrieveConfig); diff --git a/policies/enforcement/src/main/java/org/eclipse/ditto/policies/enforcement/pre/CreationRestrictionPreEnforcer.java b/policies/enforcement/src/main/java/org/eclipse/ditto/policies/enforcement/pre/CreationRestrictionPreEnforcer.java index 8566415a1e5..c074f6d37ce 100644 --- a/policies/enforcement/src/main/java/org/eclipse/ditto/policies/enforcement/pre/CreationRestrictionPreEnforcer.java +++ b/policies/enforcement/src/main/java/org/eclipse/ditto/policies/enforcement/pre/CreationRestrictionPreEnforcer.java @@ -18,8 +18,6 @@ import java.util.concurrent.CompletionStage; import java.util.regex.Pattern; -import javax.annotation.concurrent.Immutable; - import org.apache.pekko.actor.ActorSystem; import org.eclipse.ditto.base.model.entity.id.NamespacedEntityId; import org.eclipse.ditto.base.model.entity.id.WithEntityId; @@ -27,6 +25,7 @@ import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.signals.Signal; import org.eclipse.ditto.base.model.signals.commands.Command; +import org.eclipse.ditto.internal.utils.pekko.config.DynamicConfigPoller; import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory; import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLogger; import org.eclipse.ditto.policies.enforcement.config.CreationRestrictionConfig; @@ -37,14 +36,14 @@ /** * Pre-Enforcer for evaluating if creation of new entities should be restricted. + * Supports dynamic configuration reload via {@link DynamicConfigPoller}. */ -@Immutable public class CreationRestrictionPreEnforcer implements PreEnforcer { protected static final ThreadSafeDittoLogger LOG = DittoLoggerFactory.getThreadSafeLogger(CreationRestrictionPreEnforcer.class); - private final EntityCreationConfig config; + private final DynamicConfigPoller configPoller; /** * Constructs a new instance of CreationRestrictionPreEnforcer extension. @@ -56,12 +55,19 @@ public class CreationRestrictionPreEnforcer list, final C context) { @@ -149,7 +155,7 @@ protected String getEntityNotCreatableDescription(final C context) { @Override public String toString() { return getClass().getSimpleName() + " [" + - "config=" + config + + "config=" + getConfig() + ']'; } @@ -179,7 +185,7 @@ private Signal handleCreatingCommand(final Signal signal) { } else { LOG.withCorrelationId(context.headers()) .info("Create command with context <{}> is not allowed to pass - entity-creation config was: " + - "{}", context, this.config); + "{}", context, getConfig()); throw EntityNotCreatableException.newBuilder(withEntityId.getEntityId()) .description(getEntityNotCreatableDescription(context)) .dittoHeaders(signal.getDittoHeaders()) diff --git a/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/PolicyPersistenceActor.java b/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/PolicyPersistenceActor.java index 6414fb5efdc..538f84d2a52 100755 --- a/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/PolicyPersistenceActor.java +++ b/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/PolicyPersistenceActor.java @@ -28,6 +28,7 @@ import org.eclipse.ditto.base.model.signals.commands.Command; import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess; import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig; +import org.eclipse.ditto.internal.utils.pekko.config.DynamicConfigPoller; import org.eclipse.ditto.internal.utils.persistence.mongo.config.ActivityCheckConfig; import org.eclipse.ditto.internal.utils.persistence.mongo.config.NamespaceActivityCheckConfigProvider; import org.eclipse.ditto.internal.utils.persistence.mongo.config.SnapshotConfig; @@ -72,8 +73,7 @@ public final class PolicyPersistenceActor static final String SNAPSHOT_PLUGIN_ID = "pekko-contrib-mongodb-persistence-policies-snapshots"; private final ActorRef pubSubMediator; - private final PolicyConfig policyConfig; - private final NamespaceActivityCheckConfigProvider activityCheckConfigProvider; + private final DynamicConfigPoller configPoller; private final ActorRef announcementManager; private final ActorRef supervisor; @@ -87,12 +87,16 @@ private PolicyPersistenceActor(final PolicyId policyId, super(policyId, mongoReadJournal); this.pubSubMediator = pubSubMediator; this.announcementManager = announcementManager; - this.policyConfig = policyConfig; - this.activityCheckConfigProvider = NamespaceActivityCheckConfigProvider.of( - policyConfig.getNamespaceActivityCheckConfigs(), - policyConfig.getActivityCheckConfig() - ); this.supervisor = getContext().getParent(); + this.configPoller = DynamicConfigPoller.of(getContext().getSystem(), "PolicyConfigBundle", + dittoConfig -> { + final var scoped = DefaultScopedConfig.dittoScoped(dittoConfig); + final var cfg = DittoPoliciesConfig.of(scoped).getPolicyConfig(); + return new PolicyConfigBundle(cfg, NamespaceActivityCheckConfigProvider.of( + cfg.getNamespaceActivityCheckConfigs(), cfg.getActivityCheckConfig())); + }, + new PolicyConfigBundle(policyConfig, NamespaceActivityCheckConfigProvider.of( + policyConfig.getNamespaceActivityCheckConfigs(), policyConfig.getActivityCheckConfig()))); } private PolicyPersistenceActor(final PolicyId policyId, @@ -109,11 +113,16 @@ private PolicyPersistenceActor(final PolicyId policyId, final DittoPoliciesConfig policiesConfig = DittoPoliciesConfig.of( DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config()) ); - this.policyConfig = policiesConfig.getPolicyConfig(); - this.activityCheckConfigProvider = NamespaceActivityCheckConfigProvider.of( - policyConfig.getNamespaceActivityCheckConfigs(), - policyConfig.getActivityCheckConfig() - ); + final PolicyConfig policyConfig = policiesConfig.getPolicyConfig(); + this.configPoller = DynamicConfigPoller.of(getContext().getSystem(), "PolicyConfigBundle", + dittoConfig -> { + final var scoped = DefaultScopedConfig.dittoScoped(dittoConfig); + final var cfg = DittoPoliciesConfig.of(scoped).getPolicyConfig(); + return new PolicyConfigBundle(cfg, NamespaceActivityCheckConfigProvider.of( + cfg.getNamespaceActivityCheckConfigs(), cfg.getActivityCheckConfig())); + }, + new PolicyConfigBundle(policyConfig, NamespaceActivityCheckConfigProvider.of( + policyConfig.getNamespaceActivityCheckConfigs(), policyConfig.getActivityCheckConfig()))); } /** @@ -174,12 +183,12 @@ protected CommandStrategy.Context getStrategyContext() { @Override protected PolicyCommandStrategies getCreatedStrategy() { - return PolicyCommandStrategies.getInstance(policyConfig, getContext().getSystem()); + return PolicyCommandStrategies.getInstance(configPoller.get().policyConfig, getContext().getSystem()); } @Override protected CommandStrategy, Policy, PolicyId, PolicyEvent> getDeletedStrategy() { - return PolicyCommandStrategies.getCreatePolicyStrategy(policyConfig); + return PolicyCommandStrategies.getCreatePolicyStrategy(configPoller.get().policyConfig); } @Override @@ -187,14 +196,9 @@ protected EventStrategy, Policy> getEventStrategy() { return PolicyEventStrategies.getInstance(); } - @Override - protected ActivityCheckConfig getActivityCheckConfig() { - return activityCheckConfigProvider.getConfigForNamespace(entityId.getNamespace()); - } - @Override protected SnapshotConfig getSnapshotConfig() { - return policyConfig.getSnapshotConfig(); + return configPoller.get().policyConfig.getSnapshotConfig(); } @Override @@ -217,6 +221,14 @@ protected DittoRuntimeExceptionBuilder newHistoryNotAccessibleExceptionBuilde return PolicyHistoryNotAccessibleException.newBuilder(entityId, timestamp); } + @Override + protected ActivityCheckConfig getActivityCheckConfig() { + return configPoller.get().activityCheckProvider.getConfigForNamespace(entityId.getNamespace()); + } + + private record PolicyConfigBundle(PolicyConfig policyConfig, + NamespaceActivityCheckConfigProvider activityCheckProvider) {} + @Override protected void publishEvent(@Nullable final Policy previousEntity, final PolicyEvent event) { diff --git a/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/PolicySupervisorActor.java b/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/PolicySupervisorActor.java index 546f0866a21..57ebce5c321 100755 --- a/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/PolicySupervisorActor.java +++ b/policies/service/src/main/java/org/eclipse/ditto/policies/service/persistence/actors/PolicySupervisorActor.java @@ -73,6 +73,9 @@ private PolicySupervisorActor(final ActorRef pubSubMediator, } else { announcementManager = getContext().getSystem().deadLetters(); } + + // PoliciesConfig is read at actor creation; updated config is picked up on entity re-activation + // after passivation by cluster sharding. } /** @@ -136,6 +139,7 @@ protected DittoRuntimeExceptionBuilder getUnavailableExceptionBuilder(@Nullab return PolicyUnavailableException.newBuilder(policyId); } + @Nullable private Props getAnnouncementManagerProps(final DistributedPub> pub, final PolicyAnnouncementConfig policyAnnouncementConfig) { diff --git a/policies/service/src/main/java/org/eclipse/ditto/policies/service/starter/PoliciesRootActor.java b/policies/service/src/main/java/org/eclipse/ditto/policies/service/starter/PoliciesRootActor.java index ce708bfefd2..ec0ab33eeb9 100755 --- a/policies/service/src/main/java/org/eclipse/ditto/policies/service/starter/PoliciesRootActor.java +++ b/policies/service/src/main/java/org/eclipse/ditto/policies/service/starter/PoliciesRootActor.java @@ -124,7 +124,8 @@ private PoliciesRootActor(final PoliciesConfig policiesConfig, final ActorRef pu } final var cleanupConfig = policiesConfig.getPolicyConfig().getCleanupConfig(); - final var cleanupActorProps = PersistenceCleanupActor.props(cleanupConfig, mongoReadJournal, CLUSTER_ROLE); + final var cleanupActorProps = PersistenceCleanupActor.props(cleanupConfig, mongoReadJournal, CLUSTER_ROLE, + "ditto.policies.policy.cleanup"); startChildActor(PersistenceCleanupActor.ACTOR_NAME, cleanupActorProps); final var healthCheckConfig = policiesConfig.getHealthCheckConfig(); diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/ThingEnforcerActor.java b/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/ThingEnforcerActor.java index 26c3b7515c7..9a9c2a61e4c 100644 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/ThingEnforcerActor.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/ThingEnforcerActor.java @@ -146,6 +146,9 @@ private ThingEnforcerActor(final ThingId thingId, final DittoWotIntegration wotIntegration = DittoWotIntegration.get(system); thingModelValidator = wotIntegration.getWotThingModelValidator(); wotValidationExecutor = getContext().getSystem().dispatchers().lookup(DittoWotIntegration.WOT_DISPATCHER); + + // ThingsConfig is read at actor creation; updated config is picked up on entity re-activation + // after passivation by cluster sharding. } /** @@ -175,6 +178,7 @@ public static Props props(final ThingId thingId, ).withDispatcher(ENFORCEMENT_DISPATCHER); } + @Override protected CompletionStage> loadPolicyEnforcer(final Signal signal) { if (signal instanceof CreateThing createThing && !Signal.isChannelLive(createThing)) { diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActor.java b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActor.java index cfc9216149f..51c8e0c435c 100755 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActor.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActor.java @@ -60,6 +60,9 @@ import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThing; import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommandResponse; import org.eclipse.ditto.things.model.signals.events.ThingEvent; +import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig; +import org.eclipse.ditto.internal.utils.pekko.config.DynamicConfigPoller; +import org.eclipse.ditto.things.service.common.config.DittoThingsConfig; import org.eclipse.ditto.things.service.common.config.ThingConfig; import org.eclipse.ditto.things.service.persistence.actors.enrichment.EnrichSignalWithPreDefinedExtraFields; import org.eclipse.ditto.things.service.persistence.actors.enrichment.EnrichSignalWithPreDefinedExtraFieldsResponse; @@ -91,8 +94,7 @@ public final class ThingPersistenceActor private static final AckExtractor> ACK_EXTRACTOR = AckExtractor.of(ThingEvent::getEntityId, ThingEvent::getDittoHeaders); - private final ThingConfig thingConfig; - private final NamespaceActivityCheckConfigProvider activityCheckConfigProvider; + private final DynamicConfigPoller configPoller; private final DistributedPub> distributedPub; @Nullable private final ActorRef searchShardRegionProxy; private final ThingEventEnricher thingEventEnricher; @@ -106,17 +108,21 @@ private ThingPersistenceActor(final ThingId thingId, final PolicyEnforcerProvider policyEnforcerProvider) { super(thingId, mongoReadJournal); - this.thingConfig = thingConfig; - this.activityCheckConfigProvider = NamespaceActivityCheckConfigProvider.of( - thingConfig.getNamespaceActivityCheckConfigs(), - thingConfig.getActivityCheckConfig() - ); this.distributedPub = distributedPub; this.searchShardRegionProxy = searchShardRegionProxy; this.thingEventEnricher = new ThingEventEnricher( policyEnforcerProvider, thingConfig.getEventConfig().isPartialAccessEventsEnabled() ); + configPoller = DynamicConfigPoller.of(getContext().getSystem(), "ThingConfigBundle", + dittoConfig -> { + final var scoped = DefaultScopedConfig.dittoScoped(dittoConfig); + final var cfg = DittoThingsConfig.of(scoped).getThingConfig(); + return new ThingConfigBundle(cfg, NamespaceActivityCheckConfigProvider.of( + cfg.getNamespaceActivityCheckConfigs(), cfg.getActivityCheckConfig())); + }, + new ThingConfigBundle(thingConfig, NamespaceActivityCheckConfigProvider.of( + thingConfig.getNamespaceActivityCheckConfigs(), thingConfig.getActivityCheckConfig()))); } /** @@ -223,14 +229,9 @@ protected EventStrategy, Thing> getEventStrategy() { return ThingEventStrategies.getInstance(); } - @Override - protected ActivityCheckConfig getActivityCheckConfig() { - return activityCheckConfigProvider.getConfigForNamespace(entityId.getNamespace()); - } - @Override protected SnapshotConfig getSnapshotConfig() { - return thingConfig.getSnapshotConfig(); + return configPoller.get().thingConfig.getSnapshotConfig(); } @Override @@ -246,6 +247,14 @@ protected Receive matchAnyAfterInitialization() { .orElse(super.matchAnyAfterInitialization()); } + @Override + protected ActivityCheckConfig getActivityCheckConfig() { + return configPoller.get().activityCheckProvider.getConfigForNamespace(entityId.getNamespace()); + } + + private record ThingConfigBundle(ThingConfig thingConfig, + NamespaceActivityCheckConfigProvider activityCheckProvider) {} + @Override protected Receive matchAnyWhenDeleted() { return ReceiveBuilder.create() @@ -281,7 +290,7 @@ protected void recoveryCompleted(final RecoveryCompleted event) { @Override protected void publishEvent(@Nullable final Thing previousEntity, final ThingEvent event) { final CompletionStage> stage = thingEventEnricher.enrichWithPredefinedExtraFields( - thingConfig.getEventConfig().getPredefinedExtraFieldsConfigs(), + configPoller.get().thingConfig.getEventConfig().getPredefinedExtraFieldsConfigs(), entityId, entity, Optional.ofNullable(entity).flatMap(Thing::getPolicyId) @@ -341,7 +350,7 @@ private void enrichSignalWithPreDefinedExtraFields( switch (signal) { case MessageCommand messageCommand -> stage = thingEventEnricher.enrichWithPredefinedExtraFields( - thingConfig.getEventConfig().getPredefinedExtraFieldsConfigs(), + configPoller.get().thingConfig.getEventConfig().getPredefinedExtraFieldsConfigs(), entityId, entity, Optional.ofNullable(entity).flatMap(Thing::getPolicyId).orElse(null), @@ -349,7 +358,7 @@ private void enrichSignalWithPreDefinedExtraFields( ); case ThingEvent thingEvent -> stage = thingEventEnricher.enrichWithPredefinedExtraFields( - thingConfig.getEventConfig().getPredefinedExtraFieldsConfigs(), + configPoller.get().thingConfig.getEventConfig().getPredefinedExtraFieldsConfigs(), entityId, entity, Optional.ofNullable(entity).flatMap(Thing::getPolicyId).orElse(null), diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java index 09c91745793..8f9a723b1a8 100755 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java @@ -179,6 +179,9 @@ private ThingSupervisorActor(final ActorRef pubSubMediator, liveSignalPub, getContext(), thingPersistenceActorSelection, system); smartChannelDispatching = new SupervisorSmartChannelDispatching(log, thingPersistenceActorSelection, liveChannelDispatching); + + // ThingsConfig is read at actor creation; updated config is picked up on entity re-activation + // after passivation by cluster sharding. } /** diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/starter/ThingsRootActor.java b/things/service/src/main/java/org/eclipse/ditto/things/service/starter/ThingsRootActor.java index 734a5949c17..b450348e74f 100755 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/starter/ThingsRootActor.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/starter/ThingsRootActor.java @@ -191,7 +191,8 @@ private ThingsRootActor(final ThingsConfig thingsConfig, final ActorRef pubSubMe ThingsPersistenceStreamingActorCreator.startPersistenceStreamingActor(this::startChildActor); final var cleanupConfig = thingsConfig.getThingConfig().getCleanupConfig(); - final Props cleanupActorProps = PersistenceCleanupActor.props(cleanupConfig, mongoReadJournal, CLUSTER_ROLE); + final Props cleanupActorProps = PersistenceCleanupActor.props(cleanupConfig, mongoReadJournal, CLUSTER_ROLE, + "ditto.things.thing.cleanup"); startChildActor(PersistenceCleanupActor.ACTOR_NAME, cleanupActorProps); pubSubMediator.tell(DistPubSubAccess.put(getSelf()), getSelf()); diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/OperatorAggregateMetricsProviderActor.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/OperatorAggregateMetricsProviderActor.java index e3e208035ab..a6e22e7c427 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/OperatorAggregateMetricsProviderActor.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/OperatorAggregateMetricsProviderActor.java @@ -33,12 +33,14 @@ import org.apache.pekko.actor.Status; import org.apache.pekko.japi.pf.ReceiveBuilder; import org.eclipse.ditto.base.model.headers.DittoHeaders; +import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig; import org.eclipse.ditto.internal.utils.cluster.ClusterUtil; import org.eclipse.ditto.internal.utils.metrics.instruments.gauge.Gauge; import org.eclipse.ditto.internal.utils.metrics.instruments.gauge.KamonGauge; import org.eclipse.ditto.internal.utils.metrics.instruments.tag.KamonTagSetConverter; import org.eclipse.ditto.internal.utils.metrics.instruments.tag.Tag; import org.eclipse.ditto.internal.utils.metrics.instruments.tag.TagSet; +import org.eclipse.ditto.internal.utils.pekko.config.DynamicConfigChanged; import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter; import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory; import org.eclipse.ditto.internal.utils.persistence.mongo.DittoMongoClient; @@ -48,6 +50,7 @@ import org.eclipse.ditto.thingsearch.model.signals.commands.query.AggregateThingsMetricsResponse; import org.eclipse.ditto.thingsearch.service.common.config.CustomAggregationMetricConfig; import org.eclipse.ditto.thingsearch.service.common.config.OperatorMetricsConfig; +import org.eclipse.ditto.thingsearch.service.common.config.DittoSearchConfig; import org.eclipse.ditto.thingsearch.service.common.config.SearchConfig; import org.eclipse.ditto.thingsearch.service.persistence.read.MongoThingsAggregationPersistence; import org.eclipse.ditto.thingsearch.service.placeholders.GroupByPlaceholderResolver; @@ -71,21 +74,26 @@ public final class OperatorAggregateMetricsProviderActor extends AbstractActorWi private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this); private final ActorRef aggregateThingsMetricsActorSingletonProxy; - private final Map customSearchMetricConfigMap; private final Map metricsGauges; private final Gauge customSearchMetricsGauge; + private Map customSearchMetricConfigMap; + private OperatorMetricsConfig operatorMetricsConfig; + @SuppressWarnings("unused") private OperatorAggregateMetricsProviderActor(final SearchConfig searchConfig) { this.aggregateThingsMetricsActorSingletonProxy = initializeAggregationThingsMetricsActor(searchConfig); - this.customSearchMetricConfigMap = searchConfig.getOperatorMetricsConfig().getCustomAggregationMetricConfigs(); + this.operatorMetricsConfig = searchConfig.getOperatorMetricsConfig(); + this.customSearchMetricConfigMap = operatorMetricsConfig.getCustomAggregationMetricConfigs(); this.metricsGauges = new HashMap<>(); this.customSearchMetricsGauge = KamonGauge.newGauge("custom-aggregation-metrics-count-of-instruments"); this.customSearchMetricConfigMap.forEach( (metricName, customSearchMetricConfig) -> initializeCustomMetricTimer(metricName, customSearchMetricConfig, - searchConfig.getOperatorMetricsConfig().getScrapeInterval())); - initializeCustomMetricsCleanupTimer(searchConfig.getOperatorMetricsConfig()); + operatorMetricsConfig.getScrapeInterval())); + initializeCustomMetricsCleanupTimer(operatorMetricsConfig); + + getContext().getSystem().eventStream().subscribe(getSelf(), DynamicConfigChanged.class); } /** @@ -101,6 +109,7 @@ public static Props props(final SearchConfig searchConfig) { @Override public Receive createReceive() { return ReceiveBuilder.create() + .match(DynamicConfigChanged.class, this::handleDynamicConfigChanged) .match(GatherMetricsCommand.class, this::handleGatheringMetrics) .match(AggregateThingsMetricsResponse.class, this::handleAggregateThingsResponse) .match(CleanupUnusedMetricsCommand.class, this::handleCleanupUnusedMetrics) @@ -125,6 +134,50 @@ private ActorRef initializeAggregationThingsMetricsActor(final SearchConfig sear return aggregationThingsMetricsActorProxy; } + private void handleDynamicConfigChanged(final DynamicConfigChanged configChanged) { + try { + log.info("Received DynamicConfigChanged (version <{}>), reconciling custom aggregation metrics.", + configChanged.version()); + final var dittoScopedConfig = DefaultScopedConfig.dittoScoped(configChanged.dittoConfig()); + final var searchConfig = DittoSearchConfig.of(dittoScopedConfig); + final var newMetricsConfig = searchConfig.getOperatorMetricsConfig(); + final var newConfigs = newMetricsConfig.getCustomAggregationMetricConfigs(); + final var oldConfigs = this.customSearchMetricConfigMap; + + // Cancel timers for removed or disabled metrics + for (final String metricName : oldConfigs.keySet()) { + if (!newConfigs.containsKey(metricName) || !newConfigs.get(metricName).isEnabled()) { + log.info("Removing custom aggregation metric <{}> (removed or disabled in dynamic config).", + metricName); + getTimers().cancel(metricName); + } + } + + // Add or update metrics + newConfigs.forEach((metricName, config) -> { + if (config.isEnabled()) { + final CustomAggregationMetricConfig oldConfig = oldConfigs.get(metricName); + if (oldConfig == null || !oldConfig.equals(config) || !oldConfig.isEnabled()) { + log.info("Initializing/updating custom aggregation metric <{}> from dynamic config.", + metricName); + getTimers().cancel(metricName); + initializeCustomMetricTimer(metricName, config, + newMetricsConfig.getScrapeInterval()); + } + } + }); + + this.operatorMetricsConfig = newMetricsConfig; + this.customSearchMetricConfigMap = newConfigs; + + // Re-initialize cleanup timer with potentially new interval + initializeCustomMetricsCleanupTimer(newMetricsConfig); + } catch (final Exception e) { + log.warning("Failed to apply DynamicConfigChanged (version <{}>), keeping previous config: {}", + configChanged.version(), e.getMessage()); + } + } + private void handleGatheringMetrics(final GatherMetricsCommand gatherMetricsCommand) { final CustomAggregationMetricConfig config = gatherMetricsCommand.config(); final String metricName = config.getMetricName(); @@ -146,6 +199,11 @@ private void handleAggregateThingsResponse(final AggregateThingsMetricsResponse result.ifPresentOrElse(value -> { final CustomAggregationMetricConfig customAggregationMetricConfig = customSearchMetricConfigMap.get(metricName); + if (customAggregationMetricConfig == null) { + log.withCorrelationId(response) + .debug("Ignoring response for metric <{}> which was removed from config.", metricName); + return; + } final TagSet tagSet = resolveTags(customAggregationMetricConfig, response); log.withCorrelationId(response) .debug("Received aggregate things response for metric name <{} : {}>: {}, " + diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/OperatorMetricsProviderActor.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/OperatorMetricsProviderActor.java index a14046f6646..61c3eebbfaf 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/OperatorMetricsProviderActor.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/starter/actors/OperatorMetricsProviderActor.java @@ -31,11 +31,14 @@ import org.eclipse.ditto.internal.utils.metrics.instruments.gauge.KamonGauge; import org.eclipse.ditto.internal.utils.metrics.instruments.tag.Tag; import org.eclipse.ditto.internal.utils.metrics.instruments.tag.TagSet; +import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig; +import org.eclipse.ditto.internal.utils.pekko.config.DynamicConfigChanged; import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter; import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory; import org.eclipse.ditto.thingsearch.api.commands.sudo.SudoCountThings; import org.eclipse.ditto.thingsearch.model.signals.commands.query.CountThingsResponse; import org.eclipse.ditto.thingsearch.service.common.config.CustomMetricConfig; +import org.eclipse.ditto.thingsearch.service.common.config.DittoSearchConfig; import org.eclipse.ditto.thingsearch.service.common.config.OperatorMetricsConfig; /** @@ -57,20 +60,24 @@ public final class OperatorMetricsProviderActor extends AbstractActorWithTimers private final ActorRef searchActor; private final Map metricsGauges; + private OperatorMetricsConfig operatorMetricsConfig; @SuppressWarnings("unused") private OperatorMetricsProviderActor(final OperatorMetricsConfig operatorMetricsConfig, final ActorRef searchActor) { this.searchActor = searchActor; + this.operatorMetricsConfig = operatorMetricsConfig; metricsGauges = new HashMap<>(); operatorMetricsConfig.getCustomMetricConfigurations().forEach((metricName, config) -> { if (config.isEnabled()) { - initializeCustomMetric(operatorMetricsConfig, metricName, config); + initializeCustomMetric(operatorMetricsConfig.getScrapeInterval(), metricName, config); } else { log.info("Initializing custom metric Gauge for metric <{}> is DISABLED", metricName); } }); + + getContext().getSystem().eventStream().subscribe(getSelf(), DynamicConfigChanged.class); } /** @@ -87,7 +94,9 @@ public static Props props(final OperatorMetricsConfig operatorMetricsConfig, fin @Override public Receive createReceive() { return ReceiveBuilder.create() + .match(DynamicConfigChanged.class, this::handleDynamicConfigChanged) .match(GatherMetrics.class, this::handleGatheringMetrics) + .match(MetricCountResult.class, this::handleMetricCountResult) .match(Status.Failure.class, f -> log.error(f.cause(), "Got failure: {}", f)) .matchAny(m -> { log.warning("Unknown message: {}", m); @@ -96,14 +105,52 @@ public Receive createReceive() { .build(); } - private void initializeCustomMetric(final OperatorMetricsConfig operatorMetricsConfig, final String metricName, + private void handleDynamicConfigChanged(final DynamicConfigChanged configChanged) { + try { + log.info("Received DynamicConfigChanged (version <{}>), reconciling custom metrics.", + configChanged.version()); + final var dittoScopedConfig = DefaultScopedConfig.dittoScoped(configChanged.dittoConfig()); + final var searchConfig = DittoSearchConfig.of(dittoScopedConfig); + final var newMetricsConfig = searchConfig.getOperatorMetricsConfig(); + final var newConfigs = newMetricsConfig.getCustomMetricConfigurations(); + final var oldConfigs = operatorMetricsConfig.getCustomMetricConfigurations(); + + // Cancel timers for removed metrics and remove their gauges + for (final String metricName : oldConfigs.keySet()) { + if (!newConfigs.containsKey(metricName) || !newConfigs.get(metricName).isEnabled()) { + log.info("Removing custom metric <{}> (removed or disabled in dynamic config).", metricName); + getTimers().cancel(metricName); + metricsGauges.remove(metricName); + } + } + + // Add or update metrics + newConfigs.forEach((metricName, config) -> { + if (config.isEnabled()) { + final CustomMetricConfig oldConfig = oldConfigs.get(metricName); + if (oldConfig == null || !oldConfig.equals(config) || !oldConfig.isEnabled()) { + log.info("Initializing/updating custom metric <{}> from dynamic config.", metricName); + getTimers().cancel(metricName); + initializeCustomMetric(newMetricsConfig.getScrapeInterval(), metricName, config); + } + } + }); + + this.operatorMetricsConfig = newMetricsConfig; + } catch (final Exception e) { + log.warning("Failed to apply DynamicConfigChanged (version <{}>), keeping previous config: {}", + configChanged.version(), e.getMessage()); + } + } + + private void initializeCustomMetric(final Duration defaultScrapeInterval, final String metricName, final CustomMetricConfig config) { // start each custom metric provider with a random initialDelay final Duration initialDelay = Duration.ofSeconds( ThreadLocalRandom.current().nextInt(MIN_INITIAL_DELAY_SECONDS, MAX_INITIAL_DELAY_SECONDS) ); final Duration scrapeInterval = config.getScrapeInterval() - .orElse(operatorMetricsConfig.getScrapeInterval()); + .orElse(defaultScrapeInterval); getTimers().startTimerAtFixedRate( metricName, createGatherCustomMetric(metricName, config), initialDelay, scrapeInterval); @@ -137,29 +184,44 @@ private void handleGatheringMetrics(final GatherMetrics gatherMetrics) { log.withCorrelationId(dittoHeaders) .debug("Asking for count of custom metric <{}>..", metricName); - Patterns.ask(searchActor, sudoCountThings, Duration.ofSeconds(DEFAULT_COUNT_TIMEOUT_SECONDS)) - .whenComplete((response, throwable) -> { + final var askResult = Patterns.ask(searchActor, sudoCountThings, + Duration.ofSeconds(DEFAULT_COUNT_TIMEOUT_SECONDS)) + .thenApply(response -> { + final long durationMs = Duration.ofNanos(System.nanoTime() - startTs).toMillis(); if (response instanceof CountThingsResponse countThingsResponse) { - log.withCorrelationId(countThingsResponse) - .info("Received sudo CountThingsResponse for custom metric count <{}>: {} - " + - "duration: <{}ms>", - metricName, countThingsResponse.getCount(), - Duration.ofNanos(System.nanoTime() - startTs).toMillis() - ); - metricsGauges.get(metricName).set(countThingsResponse.getCount()); + return new MetricCountResult(metricName, countThingsResponse.getCount(), durationMs, + countThingsResponse.getDittoHeaders()); } else if (response instanceof DittoRuntimeException dre) { log.withCorrelationId(dittoHeaders).warning( "Received DittoRuntimeException when gathering count for " + "custom metric <{}>: {}", metricName, dre.getMessage(), dre ); + return new MetricCountResult(metricName, -1, durationMs, dittoHeaders); } else { - log.withCorrelationId(dittoHeaders).warning(throwable, - "Received unexpected result or throwable when gathering count for " + + log.withCorrelationId(dittoHeaders).warning( + "Received unexpected result when gathering count for " + "custom metric <{}>: {}", metricName, response ); + return new MetricCountResult(metricName, -1, durationMs, dittoHeaders); } }); + Patterns.pipe(askResult, getContext().getDispatcher()).to(getSelf()); + } + + private void handleMetricCountResult(final MetricCountResult result) { + if (result.count() >= 0) { + log.withCorrelationId(result.dittoHeaders()) + .info("Received sudo CountThingsResponse for custom metric count <{}>: {} - " + + "duration: <{}ms>", + result.metricName(), result.count(), result.durationMs()); + final Gauge gauge = metricsGauges.get(result.metricName()); + if (gauge != null) { + gauge.set(result.count()); + } + } } private record GatherMetrics(String metricName, CustomMetricConfig config) {} + + private record MetricCountResult(String metricName, long count, long durationMs, DittoHeaders dittoHeaders) {} } diff --git a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/updater/actors/BackgroundSyncActor.java b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/updater/actors/BackgroundSyncActor.java index ca3eaae394a..e658ab8071b 100644 --- a/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/updater/actors/BackgroundSyncActor.java +++ b/thingsearch/service/src/main/java/org/eclipse/ditto/thingsearch/service/updater/actors/BackgroundSyncActor.java @@ -155,6 +155,11 @@ protected void postEnhanceStatusReport(final JsonObjectBuilder statusReportBuild statusReportBuilder.set("progressIndexed", progressIndexed.toString()); } + @Override + protected Optional getDittoConfigPath() { + return Optional.of("ditto.things-search.updater.background-sync"); + } + @Override protected BackgroundSyncConfig parseConfig(final Config config) { return DefaultBackgroundSyncConfig.parse(config); diff --git a/wot/api/src/main/java/org/eclipse/ditto/wot/api/validator/DefaultWotThingModelValidator.java b/wot/api/src/main/java/org/eclipse/ditto/wot/api/validator/DefaultWotThingModelValidator.java index 814702f29b6..85149747b36 100644 --- a/wot/api/src/main/java/org/eclipse/ditto/wot/api/validator/DefaultWotThingModelValidator.java +++ b/wot/api/src/main/java/org/eclipse/ditto/wot/api/validator/DefaultWotThingModelValidator.java @@ -81,7 +81,7 @@ public final class DefaultWotThingModelValidator implements WotThingModelValidat private final Executor executor; @Nullable private final Cache jsonSchemaCache; - private TmValidationConfig dynamicConfig; + private volatile TmValidationConfig dynamicConfig; private DefaultWotThingModelValidator(final WotThingModelResolver thingModelResolver, final Executor executor,