From f26bbafe0f021bdd226f539ca202eee50018decf Mon Sep 17 00:00:00 2001 From: Andrey Balarev Date: Wed, 1 Apr 2026 16:23:15 +0300 Subject: [PATCH] Multiple topics with extraFields handling fix Signed-off-by: Andrey Balarev --- .../OutboundMappingProcessorActor.java | 252 ++++++++------ .../OutboundMappingProcessorActorTest.java | 317 +++++++++++++++--- 2 files changed, 427 insertions(+), 142 deletions(-) diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActor.java index 2d07124367..0b047fe49f 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActor.java @@ -70,6 +70,7 @@ import org.eclipse.ditto.connectivity.api.OutboundSignalFactory; import org.eclipse.ditto.connectivity.model.Connection; import org.eclipse.ditto.connectivity.model.FilteredTopic; +import org.eclipse.ditto.connectivity.model.Topic; import org.eclipse.ditto.connectivity.model.LogCategory; import org.eclipse.ditto.connectivity.model.LogType; import org.eclipse.ditto.connectivity.model.MetricDirection; @@ -103,7 +104,6 @@ import org.eclipse.ditto.json.JsonPointer; import org.eclipse.ditto.json.JsonValue; import org.eclipse.ditto.placeholders.ExpressionResolver; -import org.eclipse.ditto.placeholders.HeadersPlaceholder; import org.eclipse.ditto.placeholders.PipelineElement; import org.eclipse.ditto.placeholders.PlaceholderFactory; import org.eclipse.ditto.placeholders.PlaceholderResolver; @@ -149,7 +149,6 @@ public final class OutboundMappingProcessorActor private static final ResourcePlaceholder RESOURCE_PLACEHOLDER = ResourcePlaceholder.getInstance(); private static final TimePlaceholder TIME_PLACEHOLDER = TimePlaceholder.getInstance(); private static final ThingJsonPlaceholder THING_JSON_PLACEHOLDER = ThingJsonPlaceholder.getInstance(); - private static final HeadersPlaceholder HEADERS_PLACEHOLDER = PlaceholderFactory.newHeadersPlaceholder(); private final ActorRef clientActor; private final Connection connection; @@ -411,61 +410,54 @@ protected void messageDiscarded(final OutboundSignal message, final QueueOfferRe * Create a flow that splits 1 outbound signal into many as follows. *
    *
  1. - * Targets with matching filtered topics without extra fields are grouped into 1 outbound signal, followed by + * For each target without extra fields, it produces a pair of outbound signal and empty set of topics. + * As these targets have already passed pre-filtering in an early stage, no more filtering is needed.
  2. + *
  3. + * For each target containing any extra field in its topics, it produces a pair of outbound signal and a set of its target topics. + * As the filter could incLude extra fields, an additional filtering must be performed after extracting the extra fields. + * This filtering should pass the first match only to not duplicate the outbound signal for the same target. + * The outbound signal should be enriched with only those extra fields, which are listed in the topic matched the filter. *
  4. - *
  5. one outbound signal for each target with a matching filtered topic with extra fields.
  6. - *
- * The matching filtered topic is attached in the latter case. - * Consequently, for each outbound signal leaving this flow, if it has a filtered topic attached, - * then it has 1 unique target with a matching topic with extra fields. + * * This satisfies the precondition of {@code this#enrichAndFilterSignal}. * * @return the flow. */ - private static Flow, NotUsed> splitByTargetExtraFieldsFlow() { + private static Flow>, NotUsed> splitByTargetExtraFieldsFlow() { return Flow.create() .mapConcat(outboundSignal -> { - final Pair, List>> splitTargets = - splitTargetsByExtraFields(outboundSignal); - - final boolean shouldSendSignalWithoutExtraFields = - !splitTargets.first().isEmpty() || - isCommandResponseWithReplyTarget(outboundSignal.getSource()) || - outboundSignal.getTargets().isEmpty(); // no target - this is an error response - final Stream> outboundSignalWithoutExtraFields = - shouldSendSignalWithoutExtraFields - ? Stream.of(Pair.create(outboundSignal.setTargets(splitTargets.first()), null)) - : Stream.empty(); - - final Stream> outboundSignalWithExtraFields = - splitTargets.second().stream() - .map(targetAndSelector -> Pair.create( - outboundSignal.setTargets( - Collections.singletonList(targetAndSelector.first())), - targetAndSelector.second())); - - return Stream.concat(outboundSignalWithoutExtraFields, outboundSignalWithExtraFields).toList(); + final boolean shouldSendSignalDirectly = + isCommandResponseWithReplyTarget(outboundSignal.getSource()) || + outboundSignal.getTargets().isEmpty(); // no target - this is an error response + return shouldSendSignalDirectly + ? List.of(Pair.create(outboundSignal, Collections.emptySet())) + : pairTargetsWithTopics(outboundSignal).stream() + .map(targetAndSelector -> Pair.create( + outboundSignal.setTargets(Collections.singletonList(targetAndSelector.first())), + targetAndSelector.second())) + .toList(); }); } - // Called inside stream; must be thread-safe // precondition: whenever filteredTopic != null, it contains an extra fields private CompletionStage> enrichAndFilterSignal( - final Pair outboundSignalWithExtraFields) { - + final Pair> outboundSignalWithExtraFields) { final OutboundSignalWithSender outboundSignal = outboundSignalWithExtraFields.first(); - final FilteredTopic filteredTopic = outboundSignalWithExtraFields.second(); - final ExpressionResolver expressionResolver = - Resolvers.forSignal(outboundSignal.getSource(), connection.getId()); - final Optional extraFieldsOptional = getExtraFields(expressionResolver, filteredTopic); - if (extraFieldsOptional.isEmpty()) { + final Set topics = outboundSignalWithExtraFields.second(); + + List allExtraFields = topics.stream() + .map(FilteredTopic::getExtraFields) + .flatMap(Optional::stream) + .toList(); + boolean topicWithNoFilterNoExtraFieldsExists = topics.stream().anyMatch(topic -> topic.getFilter().isEmpty() && topic.getExtraFields().isEmpty()); + if (allExtraFields.isEmpty() || topicWithNoFilterNoExtraFieldsExists) { + // Pre-filtering already did the job return CompletableFuture.completedFuture(Collections.singletonList(outboundSignal)); } - final JsonFieldSelector extraFields = extraFieldsOptional.get(); - final Target target = outboundSignal.getTargets().get(0); - + boolean topicWithNoFilterExists = topics.stream().anyMatch(topic -> topic.getFilter().isEmpty()); + final Target target = outboundSignal.getTargets().getFirst(); final DittoHeaders headers = DittoHeaders.newBuilder() .authorizationContext(target.getAuthorizationContext()) // the correlation-id MUST NOT be set! as the DittoHeaders are used as a caching key in the Caffeine @@ -474,21 +466,40 @@ private CompletionStage> enrichAndFilterSig .schemaVersion(JsonSchemaVersion.LATEST) .build(); - return extractEntityId(outboundSignal.delegate.getSource()) + final ExpressionResolver expressionResolver = + Resolvers.forSignal(outboundSignal.getSource(), connection.getId()); + Optional allExtraFieldsOptional = getExtraFields(expressionResolver, allExtraFields); + + // Avoid multiple calls to 'retrievePartialThing' (for each topic with extra fields) by combining extra fields from all topics + Optional> partialThingOptional = extractEntityId(outboundSignal.delegate.getSource()) .filter(ThingId.class::isInstance) .map(ThingId.class::cast) - .map(thingId -> - signalEnrichmentFacade.retrievePartialThing( - thingId, - extraFields, - headers, - outboundSignal.getSource()) - ) - .map(partialThingCompletionStage -> partialThingCompletionStage - .thenApply(outboundSignal::setExtra) - .thenApply(outboundSignalWithExtra -> applyFilter(outboundSignalWithExtra, filteredTopic))) - .orElse(CompletableFuture.completedFuture(outboundSignal) - .thenApply(outboundSignalWithExtra -> applyFilter(outboundSignalWithExtra, filteredTopic))) + .flatMap(thingId -> allExtraFieldsOptional + .map(resolvedExtraFields -> + signalEnrichmentFacade.retrievePartialThing( + thingId, + resolvedExtraFields, + headers, + outboundSignal.getSource()))); + + return partialThingOptional + .map(partialThing -> partialThing + .>thenApply(extra -> { + final Thing enrichedThing = ThingEventToThingConverter.mergeThingWithExtraFields( + outboundSignal.getSource(), + allExtraFieldsOptional.get(), + extra).orElse(null); + return topics.stream() + .filter(_ -> enrichedThing != null || topicWithNoFilterExists) + .flatMap(topic -> applyFilter(outboundSignal, enrichedThing, topic) + .map(signal -> setTrimmedExtra(signal, topic, expressionResolver, + extra, allExtraFieldsOptional.get())) + .stream()) + .findFirst() + .map(Collections::singletonList) + .orElse(Collections.emptyList()); + })) + .orElseGet(() -> CompletableFuture.completedFuture(Collections.singletonList(outboundSignal))) .exceptionally(error -> { logger.withCorrelationId(outboundSignal.getSource()) .warning("Could not retrieve extra data due to: {} {}", error.getClass().getSimpleName(), @@ -498,17 +509,35 @@ private CompletionStage> enrichAndFilterSig }); } + private static OutboundSignalWithSender setTrimmedExtra(final OutboundSignalWithSender signal, + final FilteredTopic topic, + final ExpressionResolver expressionResolver, + final JsonObject extra, + final JsonFieldSelector allExtraFields) { + + return topic.getExtraFields() + .flatMap(fields -> getExtraFields(expressionResolver, Collections.singletonList(fields))) + .map(neededFields -> { + final var builder = extra.toBuilder(); + allExtraFields.getPointers().stream() + .filter(pointer -> !neededFields.getPointers().contains(pointer)) + .forEach(pointer -> pointer.getRoot().ifPresent(builder::remove)); + return signal.setExtra(builder.build()); + }) + .orElse(signal); + } + private static Optional getExtraFields(final ExpressionResolver expressionResolver, - @Nullable final FilteredTopic filteredTopic) { - - return Optional.ofNullable(filteredTopic) - .flatMap(FilteredTopic::getExtraFields) - .map(extraFields -> extraFields.getPointers().stream() - .map(JsonPointer::toString) - .map(expressionResolver::resolve) - .flatMap(PipelineElement::toStream) - .map(JsonPointer::of) - .toList()) + final List extraFieldsSelectors) { + + return Optional.of( + extraFieldsSelectors.stream() + .flatMap(selector -> selector.getPointers().stream()) + .map(JsonPointer::toString) + .map(expressionResolver::resolve) + .flatMap(PipelineElement::toStream) + .map(JsonPointer::of) + .toList()) .filter(jsonPointers -> !jsonPointers.isEmpty()) .map(JsonFactory::newFieldSelector) .map(ThingFieldSelector::fromJsonFieldSelector); @@ -736,7 +765,7 @@ private CompletionStage> toMultiMappe logger); return List.of(); } else { - final ActorRef sender = outboundSignals.get(0).sender; + final ActorRef sender = outboundSignals.getFirst().sender; final List targetsToPublishAt = outboundSignals.stream() .map(OutboundSignal::getTargets) .flatMap(List::stream) @@ -787,15 +816,22 @@ private static Stream filterFailedEnrichments( }); } - private Collection applyFilter(final OutboundSignalWithSender outboundSignalWithExtra, - final FilteredTopic filteredTopic) { + private Optional applyFilter(final OutboundSignalWithSender outboundSignal, + @Nullable final Thing thing, final FilteredTopic topic) { + + final Signal signal = outboundSignal.getSource(); + final TopicPath topicPath = DITTO_PROTOCOL_ADAPTER.toTopicPath(signal); + + if (!topicMatchesTopicPath(topicPath, topic.getTopic())) { + return Optional.empty(); + } - final Optional filter = filteredTopic.getFilter(); - final Optional extraFields = filteredTopic.getExtraFields(); - if (filter.isPresent() && extraFields.isPresent()) { + final Optional filter = topic.getFilter(); + if (filter.isPresent()) { + if (thing == null) { + return Optional.empty(); + } // evaluate filter criteria again if signal enrichment is involved. - final Signal signal = outboundSignalWithExtra.getSource(); - final TopicPath topicPath = DITTO_PROTOCOL_ADAPTER.toTopicPath(signal); final PlaceholderResolver topicPathPlaceholderResolver = PlaceholderFactory .newPlaceholderResolver(TOPIC_PATH_PLACEHOLDER, topicPath); final PlaceholderResolver entityIdPlaceholderResolver = PlaceholderFactory @@ -815,27 +851,36 @@ private Collection applyFilter(final OutboundSignalWit topicPathPlaceholderResolver, entityIdPlaceholderResolver, thingPlaceholderResolver, featurePlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver ).filterCriteria(filter.get(), dittoHeaders); - return outboundSignalWithExtra.getExtra() - .flatMap(extra -> ThingEventToThingConverter - .mergeThingWithExtraFields(signal, extraFields.get(), extra) - .filter(thing -> { - final PlaceholderResolver thingJsonPlaceholderResolver = PlaceholderFactory - .newPlaceholderResolver(THING_JSON_PLACEHOLDER, thing); - return ThingPredicateVisitor.apply(criteria, topicPathPlaceholderResolver, - entityIdPlaceholderResolver, thingPlaceholderResolver, - featurePlaceholderResolver, resourcePlaceholderResolver, - timePlaceholderResolver, thingJsonPlaceholderResolver) - .test(thing); - }) - .map(thing -> outboundSignalWithExtra)) - .map(Collections::singletonList) - .orElse(List.of()); + final PlaceholderResolver thingJsonPlaceholderResolver = PlaceholderFactory + .newPlaceholderResolver(THING_JSON_PLACEHOLDER, thing); + final var result = Optional.of(outboundSignal) + .filter(_ -> ThingPredicateVisitor + .apply(criteria, topicPathPlaceholderResolver, + entityIdPlaceholderResolver, thingPlaceholderResolver, + featurePlaceholderResolver, resourcePlaceholderResolver, + timePlaceholderResolver, thingJsonPlaceholderResolver) + .test(thing)); + return result; } else { // no signal enrichment: filtering is already done in SignalFilter since there is no ignored field - return Collections.singletonList(outboundSignalWithExtra); + return Optional.of(outboundSignal); } } + private static boolean topicMatchesTopicPath(final TopicPath topicPath, final Topic topic) { + return switch (topic) { + case TWIN_EVENTS -> topicPath.isChannel(TopicPath.Channel.TWIN) + && topicPath.isCriterion(TopicPath.Criterion.EVENTS); + case LIVE_EVENTS -> topicPath.isChannel(TopicPath.Channel.LIVE) + && topicPath.isCriterion(TopicPath.Criterion.EVENTS); + case LIVE_COMMANDS -> topicPath.isChannel(TopicPath.Channel.LIVE) + && topicPath.isCriterion(TopicPath.Criterion.COMMANDS); + case LIVE_MESSAGES -> topicPath.isChannel(TopicPath.Channel.LIVE) + && topicPath.isCriterion(TopicPath.Criterion.MESSAGES); + default -> false; + }; + } + private static String stackTraceAsString(final DittoRuntimeException exception) { final StringWriter stringWriter = new StringWriter(); exception.printStackTrace(new PrintWriter(stringWriter)); @@ -848,38 +893,39 @@ private static boolean isSuccessResponse(final CommandResponse response) { } /** - * Split the targets of an outbound signal into 2 parts: those without extra fields and those with. + * Pairs each target of an outbound signal with its topics, if any with an extra field. * * @param outboundSignal The outbound signal. - * @return A pair of lists. The first list contains targets without matching extra fields. - * The second list contains targets together with their extra fields matching the outbound signal. + * @return A list of pairs, one per target. + * If the target has at least one topic with extra fields, the target is paired with a set of its topics. + * Otherwise (no extra fields), it is paired with an empty set. + * If the signal has no streaming type, all targets are paired with an empty set. */ - private static Pair, List>> splitTargetsByExtraFields( + private static List>> pairTargetsWithTopics( final OutboundSignal outboundSignal) { final Optional streamingTypeOptional = StreamingType.fromSignal(outboundSignal.getSource()); if (streamingTypeOptional.isPresent()) { // Find targets with a matching topic with extra fields final StreamingType streamingType = streamingTypeOptional.get(); - final List targetsWithoutExtraFields = new ArrayList<>(outboundSignal.getTargets().size()); - final List> targetsWithExtraFields = + final List>> targetsPairedWithTopics = new ArrayList<>(outboundSignal.getTargets().size()); + for (final Target target : outboundSignal.getTargets()) { - final Optional matchingExtraFields = target.getTopics() - .stream() - .filter(filteredTopic -> filteredTopic.getExtraFields().isPresent() && - streamingType == StreamingType.fromTopic(filteredTopic.getTopic().getPubSubTopic())) - .findAny(); - if (matchingExtraFields.isPresent()) { - targetsWithExtraFields.add(Pair.create(target, matchingExtraFields.get())); + if (target.getTopics().stream() + .anyMatch(filteredTopic -> filteredTopic.getExtraFields().isPresent() && + streamingType == StreamingType.fromTopic(filteredTopic.getTopic().getPubSubTopic()))) { + targetsPairedWithTopics.add(Pair.create(target, target.getTopics())); } else { - targetsWithoutExtraFields.add(target); + targetsPairedWithTopics.add(Pair.create(target, Collections.emptySet())); } } - return Pair.create(targetsWithoutExtraFields, targetsWithExtraFields); + return targetsPairedWithTopics; } else { // The outbound signal has no streaming type: Do not attach extra fields. - return Pair.create(outboundSignal.getTargets(), Collections.emptyList()); + return outboundSignal.getTargets().stream() + .map(target -> Pair.create(target, Collections.emptySet())) + .toList(); } } diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActorTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActorTest.java index c10f1f5d5b..016747c025 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActorTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActorTest.java @@ -41,21 +41,30 @@ import org.eclipse.ditto.connectivity.model.ConnectionType; import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory; import org.eclipse.ditto.connectivity.model.ConnectivityStatus; +import org.eclipse.ditto.connectivity.model.FilteredTopic; import org.eclipse.ditto.connectivity.model.Source; import org.eclipse.ditto.connectivity.model.Target; import org.eclipse.ditto.connectivity.model.Topic; import org.eclipse.ditto.internal.utils.pekko.ActorSystemResource; import org.eclipse.ditto.internal.utils.protocol.ProtocolAdapterProvider; import org.eclipse.ditto.internal.utils.tracing.DittoTracingInitResource; +import org.eclipse.ditto.json.JsonKey; import org.eclipse.ditto.json.JsonObject; +import org.eclipse.ditto.json.JsonPointer; +import org.eclipse.ditto.policies.model.PolicyId; import org.eclipse.ditto.protocol.TopicPath; import org.eclipse.ditto.things.model.Attributes; +import org.eclipse.ditto.things.model.Feature; +import org.eclipse.ditto.things.model.FeatureProperties; import org.eclipse.ditto.things.model.Thing; import org.eclipse.ditto.things.model.ThingFieldSelector; import org.eclipse.ditto.things.model.ThingId; +import org.eclipse.ditto.things.model.ThingsModelFactory; import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThing; import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThingResponse; +import org.eclipse.ditto.things.model.signals.events.FeatureModified; import org.eclipse.ditto.things.model.signals.events.ThingModified; +import org.jspecify.annotations.NonNull; import org.junit.Before; import org.junit.ClassRule; import org.junit.FixMethodOrder; @@ -75,6 +84,7 @@ public final class OutboundMappingProcessorActorTest { DittoTracingInitResource.disableDittoTracing(); private static final Connection CONNECTION = createTestConnection(); + public static final Thing THING = createTestThing(); @Rule public final ActorSystemResource actorSystemResource = ActorSystemResource.newInstance( @@ -100,10 +110,7 @@ public void setUp() { @Test public void sendWeakAckForAllSourcesAndTargetsWhenDroppedByAllTargets() { new TestKit(actorSystemResource.getActorSystem()) {{ - final Props props = - OutboundMappingProcessorActor.props(clientActorProbe.ref(), getProcessors(), CONNECTION, - TestConstants.CONNECTIVITY_CONFIG, 3); - final ActorRef underTest = actorSystemResource.newActor(props); + final ActorRef underTest = getTestActorRef(CONNECTION); // WHEN: mapping processor actor receives outbound signal whose every authorized target is filtered out final OutboundSignal outboundSignal = outboundTwinEvent( @@ -129,12 +136,7 @@ public void sendWeakAckForAllSourcesAndTargetsWhenDroppedByAllTargets() { @Test public void eventsWithFailedEnrichmentIssueFailedAcks() { new TestKit(actorSystemResource.getActorSystem()) {{ - final Props props = OutboundMappingProcessorActor.props(clientActorProbe.ref(), - getProcessors(), - CONNECTION, - TestConstants.CONNECTIVITY_CONFIG, - 3); - final ActorRef underTest = actorSystemResource.newActor(props); + final ActorRef underTest = getTestActorRef(CONNECTION); final OutboundSignal outboundSignal = outboundTwinEvent( Attributes.newBuilder().set("target2", "wayne").build(), @@ -156,12 +158,7 @@ public void eventsWithFailedEnrichmentIssueFailedAcks() { @Test public void sendWeakAckWhenDroppedBySomeTarget() { new TestKit(actorSystemResource.getActorSystem()) {{ - final Props props = OutboundMappingProcessorActor.props(clientActorProbe.ref(), - getProcessors(), - CONNECTION, - TestConstants.CONNECTIVITY_CONFIG, - 3); - final ActorRef underTest = actorSystemResource.newActor(props); + final ActorRef underTest = getTestActorRef(CONNECTION); // WHEN: mapping processor actor receives outbound signal with 2 authorized targets, // 1 of which drops it via RQL filter after enrichment @@ -188,12 +185,7 @@ public void sendWeakAckWhenDroppedBySomeTarget() { @Test public void sendWeakAckWhenDroppedByMapper() { new TestKit(actorSystemResource.getActorSystem()) {{ - final Props props = OutboundMappingProcessorActor.props(clientActorProbe.ref(), - getProcessors(), - CONNECTION, - TestConstants.CONNECTIVITY_CONFIG, - 3); - final ActorRef underTest = actorSystemResource.newActor(props); + final ActorRef underTest = getTestActorRef(CONNECTION); // WHEN: mapping processor actor receives outbound signal with 2 authorized targets, // 1 of which drops it via payload mapping @@ -218,12 +210,7 @@ public void sendWeakAckWhenDroppedByMapper() { @Test public void doNotSendWeakAckForLiveResponse() { new TestKit(actorSystemResource.getActorSystem()) {{ - final Props props = OutboundMappingProcessorActor.props(clientActorProbe.ref(), - getProcessors(), - CONNECTION, - TestConstants.CONNECTIVITY_CONFIG, - 3); - final ActorRef underTest = actorSystemResource.newActor(props); + final ActorRef underTest = getTestActorRef(CONNECTION); // WHEN: mapping processor actor receives outbound signal with 3 authorized targets, // 2 of which drops it via payload mapping, 1 of which issues live-response @@ -248,12 +235,7 @@ public void doNotSendWeakAckForLiveResponse() { @Test public void expectNoTargetIssuedAckRequestInPublishedSignals() { new TestKit(actorSystemResource.getActorSystem()) {{ - final Props props = OutboundMappingProcessorActor.props(clientActorProbe.ref(), - getProcessors(), - CONNECTION, - TestConstants.CONNECTIVITY_CONFIG, - 3); - final ActorRef underTest = actorSystemResource.newActor(props); + final ActorRef underTest = getTestActorRef(CONNECTION); // WHEN: mapping processor actor receives outbound signal // with requests for source-declared and target-issued acks @@ -274,19 +256,226 @@ public void expectNoTargetIssuedAckRequestInPublishedSignals() { }}; } - private List getProcessors() { + private List getProcessors(Connection connection) { return List.of( - OutboundMappingProcessor.of(CONNECTION, - TestConstants.CONNECTIVITY_CONFIG, actorSystemResource.getActorSystem(), - protocolAdapterProvider.getProtocolAdapter("test"), - AbstractMessageMappingProcessorActorTest.mockLoggingAdapter(), null) + OutboundMappingProcessor.of(connection, + TestConstants.CONNECTIVITY_CONFIG, actorSystemResource.getActorSystem(), + protocolAdapterProvider.getProtocolAdapter("test"), + AbstractMessageMappingProcessorActorTest.mockLoggingAdapter(), null) ); } + @Test + public void multipleTopicsWithExtraFieldsFirstTopic() { + new TestKit(actorSystemResource.getActorSystem()) {{ + // Create a target with multiple FilteredTopics for the same streaming type with different extraFields + List targets = List.of(createTestTargetMultiTopics(Set.of(topic4(), topic3(), topic5()))); + final Connection connection = CONNECTION.toBuilder().setTargets(targets).build(); + final ActorRef underTest = getTestActorRef(connection); + + // Now test a signal that should match the second subscription (no extraFields) + final OutboundSignal outboundSignal = outboundFeatureTwinEvent(THING, + Feature.newBuilder().withId("feature4") + .build().setProperties(FeatureProperties.newBuilder().set("size", "large").build()), + List.of("multipleExtraFields"), targets, getRef()); + + underTest.tell(outboundSignal, getRef()); + partialRetrieveAndResponse(); + + // Expect a publish message with enriched signal + final BaseClientActor.PublishMappedMessage publishWithPolicy = + clientActorProbe.expectMsgClass(BaseClientActor.PublishMappedMessage.class); + + // Verify this is for the target with extraFields + assertThat(publishWithPolicy.getOutboundSignal().first().getTargets()) + .contains(targets.get(0)); + + assertThat(publishWithPolicy.getOutboundSignal().first().getAdaptable().getPayload().getExtra()) + .isPresent() + .hasValueSatisfying(extra -> { + assertThat(extra.contains(JsonKey.of("definition"))) + .as("Outbound signal does not contain the requested extra fields.") + .isTrue(); + assertThat(extra.contains(JsonKey.of("attributes"))) + .as("Redundant extra fields exists, which must not exist.") + .isFalse(); + }); + }}; + } + + @Test + public void multipleTopicsWithExtraFieldsMidTopic() { + new TestKit(actorSystemResource.getActorSystem()) {{ + // Create a target with multiple FilteredTopics for the same streaming type with different extraFields + List targets = List.of(createTestTargetMultiTopics(Set.of(topic4(), topic3(), topic5()))); + final Connection connection = CONNECTION.toBuilder().setTargets(targets).build(); + final ActorRef underTest = getTestActorRef(connection); + + // Now test a signal that should match the second subscription (no extraFields) + final OutboundSignal outboundSignal = outboundFeatureTwinEvent(THING, + Feature.newBuilder().withId("feature3") + .build().setProperties(FeatureProperties.newBuilder().set("level", "full").build()), + List.of("multipleExtraFields"), targets, getRef()); + + underTest.tell(outboundSignal, getRef()); + partialRetrieveAndResponse(); + + // Expect a publish message with enriched signal + final BaseClientActor.PublishMappedMessage publishWithPolicy = + clientActorProbe.expectMsgClass(BaseClientActor.PublishMappedMessage.class); + + // Verify this is for the target with extraFields + assertThat(publishWithPolicy.getOutboundSignal().first().getTargets()) + .contains(targets.get(0)); + + assertThat(publishWithPolicy.getOutboundSignal().first().getAdaptable().getPayload().getExtra()) + .as("Outbound signal must not contain any extra field.") + .isEmpty(); + }}; + } + + @Test + public void multipleTopicsWithExtraFieldsLastTopic() { + new TestKit(actorSystemResource.getActorSystem()) {{ + // Create a target with multiple FilteredTopics for the same streaming type with different extraFields + List targets = List.of(createTestTargetMultiTopics(Set.of(topic4(), topic3(), topic5()))); + final Connection connection = CONNECTION.toBuilder().setTargets(targets).build(); + final ActorRef underTest = getTestActorRef(connection); + + final OutboundSignal outboundSignal = outboundFeatureTwinEvent(THING, + Feature.newBuilder().withId("feature5") + .build().setProperties(FeatureProperties.newBuilder().set("level", "full").build()), + List.of("multipleExtraFields"), targets, getRef()); + + underTest.tell(outboundSignal, getRef()); + partialRetrieveAndResponse(); + + final BaseClientActor.PublishMappedMessage publishWithPolicy = + clientActorProbe.expectMsgClass(BaseClientActor.PublishMappedMessage.class); + + assertThat(publishWithPolicy.getOutboundSignal().first().getTargets()) + .contains(targets.get(0)); + + assertThat(publishWithPolicy.getOutboundSignal().first().getAdaptable().getPayload().getExtra()) + .isPresent() + .hasValueSatisfying(extra -> { + assertThat(extra.contains(JsonKey.of("attributes"))) + .as("Outbound signal does not contain the requested extra fields.") + .isTrue(); + assertThat(extra.contains(JsonKey.of("definition"))) + .as("Redundant extra fields exists, which must not exist.") + .isFalse(); + }); + }}; + } + + @Test + public void multipleTopicsWithoutExtraFieldsFastProcessed() { + new TestKit(actorSystemResource.getActorSystem()) {{ + // Create a target with multiple topics all without extra fields. + // Must send outbound signal, skipping filtering as the pe-filtering has already done the job + List targets = List.of(createTestTargetMultiTopics(Set.of(topic3(), topic3a()))); + final Connection connection = CONNECTION.toBuilder().targets(targets).build(); + final ActorRef underTest = getTestActorRef(connection); + + final OutboundSignal outboundSignal = outboundFeatureTwinEvent(THING, + Feature.newBuilder().withId("water-tank").build(), + List.of("multipleWithoutExtraFields"), targets, getRef()); + underTest.tell(outboundSignal, getRef()); + + final BaseClientActor.PublishMappedMessage publishWithPolicy = + clientActorProbe.expectMsgClass(BaseClientActor.PublishMappedMessage.class); + + assertThat(publishWithPolicy.getOutboundSignal().first().getTargets()) + .contains(targets.get(0)); + + assertThat(publishWithPolicy.getOutboundSignal().first().getAdaptable().getPayload().getExtra()) + .isEmpty(); + }}; + } + + @Test + public void multipleTopicsExtraFieldsFilterless() { + new TestKit(actorSystemResource.getActorSystem()) {{ + // Create a target with multiple topics with some extra fields, but with a topic without a filter + // Must send outbound signal, skipping filtering but enriching with the requested extra fields + List targets = List.of(createTestTargetMultiTopics(Set.of(topic3(), topic5(), topic2()))); + final Connection connection = CONNECTION.toBuilder().targets(targets).build(); + final ActorRef underTest = getTestActorRef(connection); + + final OutboundSignal outboundSignal = outboundFeatureTwinEvent(THING, + Feature.newBuilder().withId("some-feature").build(), + List.of("multipleWithExtraFields"), targets, getRef()); + underTest.tell(outboundSignal, getRef()); + partialRetrieveAndResponse(); + + final BaseClientActor.PublishMappedMessage publishWithPolicy = + clientActorProbe.expectMsgClass(BaseClientActor.PublishMappedMessage.class); + + assertThat(publishWithPolicy.getOutboundSignal().first().getTargets()) + .contains(targets.get(0)); + + assertThat(publishWithPolicy.getOutboundSignal().first().getAdaptable().getPayload().getExtra()) + .isPresent() + .hasValueSatisfying(extra -> assertThat(extra.contains(JsonKey.of("definition"))) + .as("Outbound signal does not contain the requested extra fields.") + .isTrue()); + + }}; + } + + private void partialRetrieveAndResponse() { + // Expect enrichment request for all fields in all topics + final RetrieveThing retrieveEnrichedThing = proxyActorProbe.expectMsgClass(RetrieveThing.class); + assertThat(retrieveEnrichedThing.getSelectedFields()) + .isPresent() + .hasValueSatisfying(fields -> + assertThat(fields.getPointers()) + .contains(JsonPointer.of("/attributes"), JsonPointer.of("/definition"))); + + // Reply with enriched Thing + final Thing enrichedThing = Thing.newBuilder() + .setAttributes(Attributes.newBuilder().set("attr1", "attrValue1").build()) + .setDefinition(ThingsModelFactory.newDefinition("testNamespace:TestDefinition:1.2.3")) + .build(); + proxyActorProbe.reply(RetrieveThingResponse.of(thingId(), enrichedThing, null, null, DittoHeaders.empty())); + } + + private ActorRef getTestActorRef(Connection connection) { + final Props props = OutboundMappingProcessorActor.props(clientActorProbe.ref(), + getProcessors(connection), + connection, + TestConstants.CONNECTIVITY_CONFIG, + 3); + return actorSystemResource.newActor(props); + } + + private static OutboundSignal outboundFeatureTwinEvent(final Thing thing, final Feature feature, final Collection requestedAcks, + final List targets, final ActorRef testRef) { + final List readGrantedSubjects = targets.stream() + .map(Target::getAuthorizationContext) + .flatMap(authContext -> authContext.getAuthorizationSubjects().stream()) + .distinct() + .toList(); + + final FeatureModified featureModified = FeatureModified.of(thing.getEntityId().get(), feature, 2L, Instant.EPOCH, + DittoHeaders.newBuilder() + .acknowledgementRequests(requestedAcks.stream() + .map(AcknowledgementRequest::parseAcknowledgementRequest) + .toList()) + .readGrantedSubjects(readGrantedSubjects) + .putHeader(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(), + testRef.path().toSerializationFormat()) + .build(), + Metadata.newMetadata(JsonObject.empty())); + return OutboundSignalFactory.newOutboundSignal(featureModified, targets); + } + private static OutboundSignal outboundTwinEvent(final Attributes attributes, final Collection requestedAcks, final List targets, final ActorRef testRef) { final Thing thing = Thing.newBuilder().setId(thingId()) .setAttributes(attributes) + .setPolicyId(PolicyId.of("test1:policy1")) .build(); final List readGrantedSubjects = targets.stream() .map(Target::getAuthorizationContext) @@ -333,6 +522,56 @@ private static Connection createTestConnection() { .build(); } + private static @NonNull Thing createTestThing() { + return Thing.newBuilder().setId(thingId()) + .setAttributes(Attributes.newBuilder().set("attr1", "attrValue1").build()) + .setPolicyId(PolicyId.of("test1:policy1")) + .build(); + } + + private static FilteredTopic topic1() { + return ConnectivityModelFactory.newFilteredTopicBuilder(Topic.TWIN_EVENTS).build(); + } + + private static FilteredTopic topic2() { + return ConnectivityModelFactory.newFilteredTopicBuilder(Topic.TWIN_EVENTS) + .withExtraFields(ThingFieldSelector.fromString("definition")).build(); + + } + + private static FilteredTopic topic3() { + return ConnectivityModelFactory.newFilteredTopicBuilder(Topic.TWIN_EVENTS) + .withFilter("and(eq(topic:action,\"modified\"),eq(resource:path,\"/features/feature3\"))").build(); + } + + private static FilteredTopic topic3a() { + return ConnectivityModelFactory.newFilteredTopicBuilder(Topic.TWIN_EVENTS) + .withFilter("and(eq(topic:action,\"modified\"),eq(resource:path,\"/features/feature3a\"))").build(); + } + + private static FilteredTopic topic4() { + return ConnectivityModelFactory.newFilteredTopicBuilder(Topic.TWIN_EVENTS) + .withFilter("and(eq(topic:action,\"modified\"),eq(resource:path,\"/features/feature4\"))") + .withExtraFields(ThingFieldSelector.fromString("definition")).build(); + + } + + private static FilteredTopic topic5() { + return ConnectivityModelFactory.newFilteredTopicBuilder(Topic.TWIN_EVENTS) + .withFilter("and(eq(topic:action,\"modified\"),eq(resource:path,\"/features/feature5\"))") + .withExtraFields(ThingFieldSelector.fromString("attributes")).build(); + + } + + private static @NonNull Target createTestTargetMultiTopics(Set topics) { + return ConnectivityModelFactory.newTargetBuilder() + .address("multipleExtraFieldsTarget") + .authorizationContext(singletonContext(target1Subject())) + .topics(topics) + .issuedAcknowledgementLabel(AcknowledgementLabel.of("multipleExtraFields")) + .build(); + } + private static Source createTestSource() { return ConnectivityModelFactory.newSourceBuilder() .address("source1")