Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import me.golemcore.bot.domain.model.ToolDefinition;
import me.golemcore.bot.domain.service.ToolArtifactService;
import me.golemcore.bot.domain.service.RuntimeConfigService;
import me.golemcore.bot.domain.system.LlmErrorPatterns;
import me.golemcore.bot.port.outbound.ModelConfigPort;
import me.golemcore.bot.port.outbound.LlmPort;
import com.fasterxml.jackson.core.type.TypeReference;
Expand Down Expand Up @@ -71,13 +72,14 @@
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Base64;
import java.time.Duration;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -138,6 +140,13 @@
private static final java.util.regex.Pattern RESET_SECONDS_PATTERN = java.util.regex.Pattern
.compile("\"reset_seconds\"\\s*:\\s*(\\d+)");
private static final String TOOL_ATTACHMENT_REOPEN_HINT = "Re-open the file with a tool if deeper inspection is needed.";
/**
 * Substrings that mark an exception message as a rate-limit failure.
 *
 * <p>
 * Every entry is lower-case: {@code isRateLimitError} lower-cases the message
 * with {@code Locale.ROOT} before testing it against this set, so an entry
 * containing upper-case characters would never match.
 * </p>
 */
private static final Set<String> RATE_LIMIT_MARKERS = Set.of(
"rate_limit",
"rate limit",
"token_quota_exceeded",
"too many requests",
"model_cooldown",
"cooling down");

/**
 * Pairs a list of converted {@code ChatMessage}s with a boolean flag.
 *
 * NOTE(review): presumably {@code hydratedToolImages} reports whether tool
 * image attachments were expanded during conversion - confirm against the
 * producer, which is outside this view.
 */
private record MessageConversionResult(List<ChatMessage> messages, boolean hydratedToolImages) {
}
Expand Down Expand Up @@ -500,21 +509,49 @@
}

private boolean isRateLimitError(Throwable e) {
// Walk the cause chain looking for rate limit indicators
Throwable current = e;
while (current != null) {
// langchain4j maps HTTP 429 to RateLimitException regardless of body content
// Use identity semantics for the visited set: the intent is "have I seen this
// exact cause object before" (cycle guard), not value equality. A future
// provider exception that overrides equals/hashCode - or a wrapped cause that
// happens to compare equal to an earlier frame - must not collapse into a
// single visited entry and prematurely terminate the cause walk.
Set<Throwable> visited = Collections.newSetFromMap(new IdentityHashMap<>());
for (Throwable current = e; current != null && visited.add(current); current = current.getCause()) {

Check warning on line 518 in src/main/java/me/golemcore/bot/adapter/outbound/llm/Langchain4jAdapter.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Reduce the total number of break and continue statements in this loop to use at most one.

See more on https://sonarcloud.io/project/issues?id=alexk-dev_golemcore-bot&issues=AZ2OACsGDQWxOyURfzSV&open=AZ2OACsGDQWxOyURfzSV&pullRequest=284
// langchain4j maps HTTP 429 to RateLimitException regardless of body content.
if (current instanceof dev.langchain4j.exception.RateLimitException) {
return true;
}
String msg = current.getMessage();
if (msg != null && (msg.contains("rate_limit") || msg.contains("token_quota_exceeded")
|| msg.contains("too_many_tokens") || msg.contains("Too Many Requests")
|| msg.contains("429") || msg.contains("model_cooldown")
|| msg.contains("cooling down"))) {
String raw = current.getMessage();
if (raw == null) {
continue;
}
String normalized = raw.toLowerCase(Locale.ROOT);
// Context-overflow errors sometimes carry rate-limit-adjacent wording
// ("too many tokens") - they must NOT be retried as rate limits.
if (LlmErrorPatterns.matchesContextOverflow(normalized)) {
continue;
}
if (containsAny(normalized, RATE_LIMIT_MARKERS) || looksLikeHttp429(normalized)) {
return true;
}
}
return false;
}

/**
 * Tells whether an already lower-cased message mentions HTTP status 429 in an
 * explicit status context.
 *
 * @param normalized
 *            lower-cased exception message
 * @return {@code true} if one of the known status phrases occurs in the message
 */
private static boolean looksLikeHttp429(String normalized) {
    // Only phrases that tie "429" to a status are accepted, so a bare "429"
    // inside a stack trace, byte count, or identifier does not match.
    final String[] statusPhrases = {
            "http 429",
            "status 429",
            "status: 429",
            "status_code=429",
            "code 429",
            "429 too many"
    };
    for (String phrase : statusPhrases) {
        if (normalized.contains(phrase)) {
            return true;
        }
    }
    return false;
}

/**
 * Tells whether {@code haystack} contains at least one of {@code needles} as a
 * substring.
 *
 * @param haystack
 *            text to search
 * @param needles
 *            candidate substrings
 * @return {@code true} on the first needle found, {@code false} if none occur
 *         (including when {@code needles} is empty)
 */
private static boolean containsAny(String haystack, Set<String> needles) {
    for (String needle : needles) {
        if (haystack.contains(needle)) {
            return true;
        }
    }
    return false;
}
Expand Down Expand Up @@ -705,7 +742,7 @@

@Override
public List<String> getSupportedModels() {
// Build from models.json provider/modelName for each configured provider
// Build from models.json - provider/modelName for each configured provider.
List<String> models = new ArrayList<>();
Map<String, ModelCatalogEntry> modelsConfig = modelConfig
.getAllModels();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
* Trigger source for conversation compaction.
*/
public enum CompactionReason {
    AUTO_THRESHOLD,
    MANUAL_COMMAND,
    CONTEXT_OVERFLOW_RECOVERY,
    // Appended last so the positions of the three earlier constants do not shift.
    REQUEST_PREFLIGHT
}
36 changes: 21 additions & 15 deletions src/main/java/me/golemcore/bot/domain/model/ContextAttributes.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ public final class ContextAttributes {
private ContextAttributes() {
}

/** Boolean signal to stop the agent loop after current iteration. */
/** LlmResponse response from LLM execution. */
/** Boolean - signal to stop the agent loop after current iteration. */
/** LlmResponse - response from LLM execution. */
public static final String LLM_RESPONSE = "llm.response";

/** String LLM error message. */
/** String - LLM error message. */
public static final String LLM_ERROR = "llm.error";

/** String machine-readable LLM error classification code. */
/** String - machine-readable LLM error classification code. */
public static final String LLM_ERROR_CODE = "llm.error.code";

/** String - active request trace identifier. */
Expand Down Expand Up @@ -78,16 +78,16 @@ private ContextAttributes() {
*/
public static final String CONTEXT_SCOPED_TOOLS = "context.scoped.tools";

/** Boolean tools were executed in this iteration. */
/** Boolean - tools were executed in this iteration. */

/**
* String LLM model name that last generated tool calls in the session
* String - LLM model name that last generated tool calls in the session
* (persisted in session metadata).
*/
public static final String LLM_MODEL = "llm.model";

/**
* String reasoning effort used for the current LLM call (e.g. "low",
* String - reasoning effort used for the current LLM call (e.g. "low",
* "medium", "high").
*/
public static final String LLM_REASONING = "llm.reasoning";
Expand All @@ -109,10 +109,10 @@ private ContextAttributes() {
/** Boolean - session-scoped lock for the selected model tier. */
public static final String SESSION_MODEL_TIER_FORCE = "session.modelTier.force";

/** Boolean plan mode is active for the current session. */
/** Boolean - plan mode is active for the current session. */
public static final String PLAN_MODE_ACTIVE = "plan.mode.active";

/** String plan ID that needs user approval before execution. */
/** String - plan ID that needs user approval before execution. */
public static final String PLAN_APPROVAL_NEEDED = "plan.approval.needed";

/**
Expand Down Expand Up @@ -141,7 +141,7 @@ private ContextAttributes() {
public static final String ITERATION_LIMIT_REACHED = "iteration.limit.reached";

/**
* Boolean tool loop stopped due to internal limit (LLM calls / tool
* Boolean - tool loop stopped due to internal limit (LLM calls / tool
* executions / deadline).
*/
public static final String TOOL_LOOP_LIMIT_REACHED = "toolloop.limit.reached";
Expand All @@ -152,16 +152,16 @@ private ContextAttributes() {
public static final String TOOL_LOOP_LIMIT_REASON = "toolloop.limit.reason";

/**
* List<RuntimeEvent> runtime execution events for the current turn.
* List<RuntimeEvent> - runtime execution events for the current turn.
*/
public static final String RUNTIME_EVENTS = "runtime.events";

/**
* Boolean stop tool execution between tool calls for current turn.
* Boolean - stop tool execution between tool calls for current turn.
*/
public static final String TURN_INTERRUPT_REQUESTED = "turn.interrupt.requested";

/** String queue kind for inbound message while a turn is running. */
/** String - queue kind for inbound message while a turn is running. */
public static final String TURN_QUEUE_KIND = "turn.queue.kind";

/**
Expand All @@ -180,7 +180,7 @@ private ContextAttributes() {
public static final String TURN_QUEUE_KIND_DELAYED_ACTION = "delayed_action";

/**
* Boolean current turn scheduled an internal retry instead of user feedback.
* Boolean - current turn scheduled an internal retry instead of user feedback.
*/
public static final String TURN_INTERNAL_RETRY_SCHEDULED = "turn.internal.retry.scheduled";

Expand Down Expand Up @@ -217,7 +217,7 @@ private ContextAttributes() {
/** Boolean - current inbound message is an internal runtime-only message. */
public static final String TURN_INPUT_INTERNAL = "turn.input.internal";

/** WebSocketSession reference to WebSocket session for streaming. */
/** WebSocketSession - reference to WebSocket session for streaming. */
public static final String WEB_STREAM_SINK = "web.stream.sink";

/** Boolean - webhook delivery should be routed to an external channel. */
Expand Down Expand Up @@ -373,6 +373,12 @@ private ContextAttributes() {
/** Map<String,Object> - latest structured compaction details. */
public static final String COMPACTION_LAST_DETAILS = "compaction.last.details";

/** Map<String,Object> - latest LLM request token preflight diagnostics. */
public static final String LLM_REQUEST_PREFLIGHT = "llm.request.preflight";

/** Map<String,Object> - latest LLM context-overflow recovery diagnostics. */
public static final String LLM_CONTEXT_OVERFLOW_RECOVERY = "llm.context.overflow.recovery";

/** List<Map<String,Object>> - per-turn edited file stats for UI hints. */
public static final String TURN_FILE_CHANGES = "turn.file.changes";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,19 @@
import java.util.Optional;

/**
* Central compaction pipeline: prepare boundaries, summarize, persist details.
* Central compaction pipeline shared by every trigger source (auto-compaction,
* manual command, preflight, context-overflow recovery).
*
* <p>
* The service coordinates four collaborators: a
* {@link CompactionPreparationService} that decides which messages to drop vs.
* keep, a {@link CompactionService} that produces the summary text, a
* {@link CompactionDetailsExtractor} that builds the structured audit trail,
* and the {@link SessionPort} that persists the compacted session. The flow
* produces a single {@link CompactionResult} whether the compaction removed
* messages, became a no-op, or failed to locate the session, so callers can
* record diagnostics from one result shape.
* </p>
*/
@Service
public class CompactionOrchestrationService {
Expand Down Expand Up @@ -46,6 +58,35 @@ public CompactionOrchestrationService(SessionPort sessionPort,
this.clock = clock;
}

/**
* Compact the named session, keeping at most {@code keepLast} trailing messages
* and summarizing the rest if the preparation service can identify messages
* that may be removed.
*
* <p>
* Returns a {@link CompactionResult} with:
* </p>
* <ul>
* <li>{@code removed = -1} when the session cannot be located - the caller
* still receives a well-formed result and records diagnostics rather than
* throwing.</li>
* <li>{@code removed = 0} with no-op details when there was nothing to remove
* (keepLast larger than the session, or the preparation service refused to
* split a turn).</li>
* <li>{@code removed > 0} when messages were dropped. A summary message is
* present only when the summarizer returns non-blank text; otherwise
* {@code usedSummary=false} and the kept messages are persisted without a
* synthetic summary.</li>
* </ul>
*
* @param sessionId
* the session to compact
* @param reason
* classification used downstream for telemetry and summary prompt
* selection
* @param keepLast
* trailing messages to retain intact
*/
public CompactionResult compact(String sessionId, CompactionReason reason, int keepLast) {
Optional<AgentSession> sessionOptional = sessionPort.get(sessionId);
if (sessionOptional.isEmpty()) {
Expand Down Expand Up @@ -79,13 +120,14 @@ public CompactionResult compact(String sessionId, CompactionReason reason, int k
false,
0,
runtimeConfigService.getCompactionDetailsMaxItemsPerCategory());
persistDetails(session, emptyDetails);
return CompactionResult.builder()
CompactionResult emptyResult = CompactionResult.builder()
.removed(0)
.usedSummary(false)
.summaryMessage(null)
.details(emptyDetails)
.build();
persistDetails(session, emptyResult);
return emptyResult;
}

long startedAt = clock.millis();
Expand Down Expand Up @@ -123,62 +165,29 @@ public CompactionResult compact(String sessionId, CompactionReason reason, int k
Map<String, Object> metadata = summaryMessage.getMetadata() != null
? new LinkedHashMap<>(summaryMessage.getMetadata())
: new LinkedHashMap<>();
metadata.put(METADATA_KEY_COMPACTION_DETAILS, toDetailsMap(details));
metadata.put(METADATA_KEY_COMPACTION_DETAILS, CompactionPayloadMapper.toDetailsMap(details));
summaryMessage.setMetadata(metadata);
}

persistDetails(session, details);
sessionPort.save(session);

return CompactionResult.builder()
CompactionResult result = CompactionResult.builder()
.removed(preparation.messagesToCompact().size())
.usedSummary(usedSummary)
.summaryMessage(summaryMessage)
.details(details)
.build();
persistDetails(session, result);
sessionPort.save(session);
return result;
}

private void persistDetails(AgentSession session, CompactionDetails details) {
if (session == null || details == null) {
private void persistDetails(AgentSession session, CompactionResult result) {
if (session == null || result == null || result.details() == null) {
return;
}
if (session.getMetadata() == null) {
session.setMetadata(new LinkedHashMap<>());
}
session.getMetadata().put(ContextAttributes.COMPACTION_LAST_DETAILS, toDetailsMap(details));
}

private Map<String, Object> toDetailsMap(CompactionDetails details) {
Map<String, Object> result = new LinkedHashMap<>();
result.put("schemaVersion", details.schemaVersion());
result.put("reason", details.reason() != null ? details.reason().name() : null);
result.put("summarizedCount", details.summarizedCount());
result.put("keptCount", details.keptCount());
result.put("usedLlmSummary", details.usedLlmSummary());
result.put("summaryLength", details.summaryLength());
result.put("toolCount", details.toolCount());
result.put("readFilesCount", details.readFilesCount());
result.put("modifiedFilesCount", details.modifiedFilesCount());
result.put("durationMs", details.durationMs());
result.put("toolNames", details.toolNames());
result.put("readFiles", details.readFiles());
result.put("modifiedFiles", details.modifiedFiles());
result.put("splitTurnDetected", details.splitTurnDetected());
result.put("fallbackUsed", details.fallbackUsed());

List<Map<String, Object>> fileChanges = new ArrayList<>();
if (details.fileChanges() != null) {
for (CompactionDetails.FileChangeStat fileChange : details.fileChanges()) {
Map<String, Object> fileChangeMap = new LinkedHashMap<>();
fileChangeMap.put("path", fileChange.path());
fileChangeMap.put("addedLines", fileChange.addedLines());
fileChangeMap.put("removedLines", fileChange.removedLines());
fileChangeMap.put("deleted", fileChange.deleted());
fileChanges.add(fileChangeMap);
}
}
result.put("fileChanges", fileChanges);

return result;
session.getMetadata().put(ContextAttributes.COMPACTION_LAST_DETAILS,
CompactionPayloadMapper.toPayload(result));
}
}
Loading
Loading