[WIP][REVIEW-DONE] Apply StateStore's scan operation to optimize the pattern of range scan for TWS#25
HeartSaVioR wants to merge 11 commits into master
3bd04e1 to 7846304
Use bounded scan ranges in transformWithState TTL eviction and timer expiry to narrow the iteration scope:

- TTLState.ttlEvictionIterator: use store.scan with startKey from prevBatchTimestampMs+1 and endKey from batchTimestampMs+1 to skip entries already cleaned up in the previous batch.
- TimerStateImpl.getExpiredTimers: use store.scan with startKey from prevExpiryTimestampMs+1 and endKey from expiryTimestampMs+1. Processing-time timers use prevBatchTimestampMs; event-time timers use eventTimeWatermarkForLateEvents.

Thread prevBatchTimestampMs from IncrementalExecution (via prevOffsetSeqMetadata) through TransformWithStateExec -> StatefulProcessorHandleImpl -> TTLState / TimerStateImpl.

Copy UnsafeRow results from encodeTTLRow/UnsafeProjection to avoid the mutable-row-reuse bug where startKey and endKey alias the same internal buffer.
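The bounded scan described above can be illustrated with a minimal, self-contained sketch. It does not use the real StateStore API: an in-memory `TreeMap` keyed by expiration timestamp stands in for the TTL or timer index, and `BoundedScanSketch`, `Index`, and `expiredSince` are hypothetical names chosen only for illustration. The sketch assumes the previous timestamp is an exclusive lower bound and the current timestamp is an inclusive upper bound, which is what the `+1` on both keys implies for a scan with an inclusive start and exclusive end.

```scala
import scala.collection.immutable.TreeMap

object BoundedScanSketch {
  // Hypothetical stand-in for a state store index ordered by expiration timestamp.
  type Index = TreeMap[Long, String]

  // Returns entries with prevBatchTimestampMs < expirationMs <= batchTimestampMs.
  // When no previous timestamp is known, the scan starts from the beginning.
  def expiredSince(
      index: Index,
      batchTimestampMs: Long,
      prevBatchTimestampMs: Option[Long]): List[(Long, String)] = {
    // Inclusive start key: prev + 1, skipped when the increment would overflow.
    val startKey = prevBatchTimestampMs.filter(_ < Long.MaxValue).map(_ + 1)
    // Exclusive end key: current + 1, skipped when the increment would overflow.
    val endKey = Some(batchTimestampMs).filter(_ < Long.MaxValue).map(_ + 1)
    // rangeImpl treats `from` as inclusive and `until` as exclusive;
    // None means unbounded on that side.
    index.rangeImpl(startKey, endKey).toList
  }
}
```

With entries at 100, 200, and 300, a batch at 200 whose previous batch was at 100 visits only the entry at 200; without a previous timestamp it visits 100 and 200.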
7846304 to 63892c7
...ng/operators/stateful/transformwithstate/statefulprocessor/StatefulProcessorHandleImpl.scala
...rk/sql/execution/streaming/operators/stateful/transformwithstate/timers/TimerStateImpl.scala
val startKey = prevExpiryTimestampMs.flatMap { prevTs =>
  if (prevTs < Long.MaxValue) {
    val row = new GenericInternalRow(keySchemaForSecIndex.length)
    row.setLong(0, prevTs + 1)
Self-review: Do we consider prevExpiryTimestampMs to be exclusive? If so, this makes sense. It's not clear, though, because the method has no doc comment. Let's document it.
    expiryTimestampMs: Long,
    prevExpiryTimestampMs: Option[Long] = None): Iterator[(Any, Long)] = {
  val startKey = prevExpiryTimestampMs.flatMap { prevTs =>
    if (prevTs < Long.MaxValue) {
Self-review: Maybe extract this if/else statement into an inner method, since startKey and endKey use the same code?
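One possible shape for that extraction, as a sketch only: the real code builds a `GenericInternalRow` and encodes it, whereas this version works on plain `Long` values so that it stays self-contained. `ScanBoundSketch`, `nextTimestamp`, and `scanBounds` are hypothetical names, not identifiers from the PR.

```scala
object ScanBoundSketch {
  // Shared helper: the smallest timestamp strictly greater than `ts`,
  // or None when `ts + 1` would overflow Long.
  def nextTimestamp(ts: Long): Option[Long] =
    if (ts < Long.MaxValue) Some(ts + 1) else None

  // Both bounds go through the same helper, so the overflow guard lives in one place.
  // startKey is None when there is no previous timestamp (scan from the beginning).
  // endKey is None when the expiry timestamp is Long.MaxValue (scan to the end).
  def scanBounds(
      expiryTimestampMs: Long,
      prevExpiryTimestampMs: Option[Long]): (Option[Long], Option[Long]) = {
    val startKey = prevExpiryTimestampMs.flatMap(nextTimestamp)
    val endKey = nextTimestamp(expiryTimestampMs)
    (startKey, endKey)
  }
}
```

In the actual method, the helper would presumably return the encoded key row rather than a `Long`, but the structure of the two call sites would be the same.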
...sql/execution/streaming/operators/stateful/transformwithstate/ttl/ListStateImplWithTTL.scala
.../sql/execution/streaming/operators/stateful/transformwithstate/ttl/MapStateImplWithTTL.scala
// The schema of the UnsafeRow returned by this iterator is (expirationMs, elementKey).
private[sql] def ttlEvictionIterator(): Iterator[UnsafeRow] = {
  val ttlIterator = store.iterator(TTL_INDEX)
  val dummyElementKey = UnsafeProjection.create(elementKeySchema)
Self-review: UnsafeProjection.create() is expensive (it could trigger codegen). Can we move it into a private field and only call apply() here? If the same pattern appears elsewhere in this PR, we should fix those places as well.
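The suggestion amounts to building the projection once per instance and reusing it on every call. A minimal sketch of that pattern, without Spark on the classpath: `createProjection` is a hypothetical stand-in for the costly factory, and the `createCalls` counter exists only to make the cost visible. None of these names come from the PR.

```scala
object CachedProjectionSketch {
  // Counts how often the expensive factory runs; stands in for codegen cost.
  var createCalls = 0

  // Hypothetical stand-in for an expensive factory such as a projection builder.
  def createProjection(schema: Seq[String]): Seq[Any] => String = {
    createCalls += 1
    row => schema.zip(row).map { case (name, value) => s"$name=$value" }.mkString(",")
  }

  class TtlStateSketch(elementKeySchema: Seq[String]) {
    // Built once when the instance is constructed, not on every iterator call.
    private val elementKeyProjection = createProjection(elementKeySchema)

    // Only applies the cached projection; no factory call happens here.
    def ttlEvictionIterator(rows: Seq[Seq[Any]]): Iterator[String] =
      rows.iterator.map(elementKeyProjection)
  }
}
```

One caveat to keep in mind when applying this to the real code: the commit message notes that projection results reuse an internal buffer, so a cached projection's output still needs to be copied where two results must coexist.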
}
val ttlIterator = store.rangeScan(startKey, endKey, TTL_INDEX)

// Recall that the format is (expirationMs, elementKey) -> TTL_EMPTY_VALUE_ROW, so
Self-review: Maybe this code comment is still valid? If so, let's leave it as it is.
  StreamingSymmetricHashJoinHelper.getStateWatermarkPredicates(
    j.left.output, j.right.output, j.leftKeys, j.rightKeys, j.condition.full,
-   iwEviction, !allowMultipleStatefulOperators)
+   iwEviction, !allowMultipleStatefulOperators, iwLateEvents)
Self-review: is this change really coupled with this branch? This branch is for TWS.
What changes were proposed in this pull request?
Why are the changes needed?
Does this PR introduce any user-facing change?
How was this patch tested?
Was this patch authored or co-authored using generative AI tooling?