Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-stale-optimistic-server-key.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@tanstack/db': patch
---

Fix stale optimistic rows persisting when sync confirms a different server-generated key. Previously, direct transactions (from `collection.insert()` etc.) had their optimistic rows exempted from stale-row cleanup, which prevented temp-key rows from being removed when the server returned a different primary key.
2 changes: 1 addition & 1 deletion .github/workflows/e2e-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
- name: Setup Node.js
uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0
with:
node-version: '20'
node-version: '22.13'
cache: 'pnpm'

- name: Install dependencies
Expand Down
13 changes: 3 additions & 10 deletions packages/db/src/collection/mutations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import {
UndefinedKeyError,
UpdateKeyNotFoundError,
} from '../errors'
import { DIRECT_TRANSACTION_METADATA_KEY } from './transaction-metadata.js'
import type { Collection, CollectionImpl } from './index.js'
import type { StandardSchemaV1 } from '@standard-schema/spec'
import type {
Expand Down Expand Up @@ -231,9 +230,7 @@ export class CollectionMutationsManager<
} else {
// Create a new transaction with a mutation function that calls the onInsert handler
const directOpTransaction = createTransaction<TOutput>({
metadata: {
[DIRECT_TRANSACTION_METADATA_KEY]: true,
},
metadata: {},
mutationFn: async (params) => {
// Call the onInsert handler with the transaction and collection
return await this.config.onInsert!({
Expand Down Expand Up @@ -430,9 +427,7 @@ export class CollectionMutationsManager<

// Create a new transaction with a mutation function that calls the onUpdate handler
const directOpTransaction = createTransaction<TOutput>({
metadata: {
[DIRECT_TRANSACTION_METADATA_KEY]: true,
},
metadata: {},
mutationFn: async (params) => {
// Call the onUpdate handler with the transaction and collection
return this.config.onUpdate!({
Expand Down Expand Up @@ -536,9 +531,7 @@ export class CollectionMutationsManager<
// Create a new transaction with a mutation function that calls the onDelete handler
const directOpTransaction = createTransaction<TOutput>({
autoCommit: true,
metadata: {
[DIRECT_TRANSACTION_METADATA_KEY]: true,
},
metadata: {},
mutationFn: async (params) => {
// Call the onDelete handler with the transaction and collection
return this.config.onDelete!({
Expand Down
79 changes: 24 additions & 55 deletions packages/db/src/collection/state.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { deepEquals } from '../utils'
import { SortedMap } from '../SortedMap'
import { enrichRowWithVirtualProps } from '../virtual-props.js'
import { DIRECT_TRANSACTION_METADATA_KEY } from './transaction-metadata.js'
import type {
VirtualOrigin,
VirtualRowProps,
Expand Down Expand Up @@ -81,8 +80,6 @@ export class CollectionStateManager<
public optimisticDeletes = new Set<TKey>()
public pendingOptimisticUpserts = new Map<TKey, TOutput>()
public pendingOptimisticDeletes = new Set<TKey>()
public pendingOptimisticDirectUpserts = new Set<TKey>()
public pendingOptimisticDirectDeletes = new Set<TKey>()

/**
* Tracks the origin of confirmed changes for each row.
Expand Down Expand Up @@ -480,8 +477,6 @@ export class CollectionStateManager<

// Update pending optimistic state for completed/failed transactions
for (const transaction of this.transactions.values()) {
const isDirectTransaction =
transaction.metadata[DIRECT_TRANSACTION_METADATA_KEY] === true
if (transaction.state === `completed`) {
for (const mutation of transaction.mutations) {
if (!this.isThisCollection(mutation.collection)) {
Expand All @@ -499,24 +494,10 @@ export class CollectionStateManager<
mutation.modified as TOutput,
)
this.pendingOptimisticDeletes.delete(mutation.key)
if (isDirectTransaction) {
this.pendingOptimisticDirectUpserts.add(mutation.key)
this.pendingOptimisticDirectDeletes.delete(mutation.key)
} else {
this.pendingOptimisticDirectUpserts.delete(mutation.key)
this.pendingOptimisticDirectDeletes.delete(mutation.key)
}
break
case `delete`:
this.pendingOptimisticUpserts.delete(mutation.key)
this.pendingOptimisticDeletes.add(mutation.key)
if (isDirectTransaction) {
this.pendingOptimisticDirectUpserts.delete(mutation.key)
this.pendingOptimisticDirectDeletes.add(mutation.key)
} else {
this.pendingOptimisticDirectUpserts.delete(mutation.key)
this.pendingOptimisticDirectDeletes.delete(mutation.key)
}
break
}
}
Expand All @@ -529,8 +510,6 @@ export class CollectionStateManager<
if (mutation.optimistic) {
this.pendingOptimisticUpserts.delete(mutation.key)
this.pendingOptimisticDeletes.delete(mutation.key)
this.pendingOptimisticDirectUpserts.delete(mutation.key)
this.pendingOptimisticDirectDeletes.delete(mutation.key)
}
}
}
Expand All @@ -548,35 +527,11 @@ export class CollectionStateManager<
pendingSyncKeys.add(operation.key as TKey)
}
}
const staleOptimisticUpserts: Array<TKey> = []
for (const [key, value] of this.pendingOptimisticUpserts) {
if (
pendingSyncKeys.has(key) ||
this.pendingOptimisticDirectUpserts.has(key)
) {
this.optimisticUpserts.set(key, value)
} else {
staleOptimisticUpserts.push(key)
}
}
for (const key of staleOptimisticUpserts) {
this.pendingOptimisticUpserts.delete(key)
this.pendingLocalOrigins.delete(key)
this.optimisticUpserts.set(key, value)
}
const staleOptimisticDeletes: Array<TKey> = []
for (const key of this.pendingOptimisticDeletes) {
if (
pendingSyncKeys.has(key) ||
this.pendingOptimisticDirectDeletes.has(key)
) {
this.optimisticDeletes.add(key)
} else {
staleOptimisticDeletes.push(key)
}
}
for (const key of staleOptimisticDeletes) {
this.pendingOptimisticDeletes.delete(key)
this.pendingLocalOrigins.delete(key)
this.optimisticDeletes.add(key)
}

const activeTransactions: Array<Transaction<any>> = []
Expand Down Expand Up @@ -987,8 +942,6 @@ export class CollectionStateManager<
this.pendingLocalOrigins.delete(key)
this.pendingOptimisticUpserts.delete(key)
this.pendingOptimisticDeletes.delete(key)
this.pendingOptimisticDirectUpserts.delete(key)
this.pendingOptimisticDirectDeletes.delete(key)
break
case `update`: {
if (rowUpdateMode === `partial`) {
Expand All @@ -1007,8 +960,6 @@ export class CollectionStateManager<
this.pendingLocalOrigins.delete(key)
this.pendingOptimisticUpserts.delete(key)
this.pendingOptimisticDeletes.delete(key)
this.pendingOptimisticDirectUpserts.delete(key)
this.pendingOptimisticDirectDeletes.delete(key)
break
}
case `delete`:
Expand All @@ -1020,8 +971,6 @@ export class CollectionStateManager<
this.pendingLocalOrigins.delete(key)
this.pendingOptimisticUpserts.delete(key)
this.pendingOptimisticDeletes.delete(key)
this.pendingOptimisticDirectUpserts.delete(key)
this.pendingOptimisticDirectDeletes.delete(key)
break
}
}
Expand Down Expand Up @@ -1160,6 +1109,28 @@ export class CollectionStateManager<
}
}

// A completed optimistic insert may have used a temporary client key while
// the sync confirmation used a different server-generated key. Once a
// sync commit has been applied, stop retaining completed optimistic keys
// that were not confirmed by this commit so the temporary row is removed.
for (const [key, previousValue] of this.pendingOptimisticUpserts) {
if (!changedKeys.has(key)) {
changedKeys.add(key)
if (!currentVisibleState.has(key)) {
currentVisibleState.set(key, previousValue)
}
this.pendingOptimisticUpserts.delete(key)
this.pendingLocalOrigins.delete(key)
}
}
for (const key of this.pendingOptimisticDeletes) {
if (!changedKeys.has(key)) {
changedKeys.add(key)
}
this.pendingOptimisticDeletes.delete(key)
this.pendingLocalOrigins.delete(key)
}

// Now check what actually changed in the final visible state
for (const key of changedKeys) {
const previousVisibleValue = currentVisibleState.get(key)
Expand Down Expand Up @@ -1378,8 +1349,6 @@ export class CollectionStateManager<
this.optimisticDeletes.clear()
this.pendingOptimisticUpserts.clear()
this.pendingOptimisticDeletes.clear()
this.pendingOptimisticDirectUpserts.clear()
this.pendingOptimisticDirectDeletes.clear()
this.clearOriginTrackingState()
this.isLocalOnly = false
this.size = 0
Expand Down
1 change: 0 additions & 1 deletion packages/db/src/collection/transaction-metadata.ts

This file was deleted.

90 changes: 90 additions & 0 deletions packages/db/tests/collection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
MissingUpdateHandlerError,
} from '../src/errors'
import { createTransaction } from '../src/transactions'
import { createLiveQueryCollection, eq } from '../src/query/index.js'
import {
flushPromises,
mockSyncCollectionOptionsNoInitialState,
Expand Down Expand Up @@ -41,6 +42,95 @@ describe(`Collection`, () => {
expect(() => createCollection()).toThrow(CollectionRequiresConfigError)
})

it(`removes optimistic insert when sync confirms with a different server-generated key`, async () => {
const options = mockSyncCollectionOptionsNoInitialState<{
id: number
text: string
}>({
id: `server-generated-key-test`,
getKey: (item) => item.id,
startSync: true,
})
const collection = createCollection(options)

options.utils.markReady()
await collection.stateWhenReady()

const tx = collection.insert({ id: 4733, text: `two` })

expect(getStateEntries(collection)).toEqual([
[4733, { id: 4733, text: `two` }],
])

options.utils.begin()
options.utils.write({ type: `insert`, value: { id: 24, text: `two` } })
options.utils.commit()

// The sync commit is held while the local insert transaction is persisting.
expect(getStateEntries(collection)).toEqual([
[4733, { id: 4733, text: `two` }],
])

options.utils.resolveSync()
await tx.isPersisted.promise
await flushPromises()

expect(getStateEntries(collection)).toEqual([[24, { id: 24, text: `two` }]])
})

it(`updates live queries when an optimistic insert is replaced by a different server key`, async () => {
const options = mockSyncCollectionOptionsNoInitialState<{
id: number
text: string
project_id: number
}>({
id: `server-generated-key-live-query-test`,
getKey: (item) => item.id,
startSync: true,
})
const collection = createCollection(options)
const liveCollection = createLiveQueryCollection((q) =>
q
.from({ collection })
.where(({ collection }) => eq(collection.project_id, 1))
.select(({ collection }) => ({
id: collection.id,
text: collection.text,
project_id: collection.project_id,
$synced: collection.$synced,
$origin: collection.$origin,
$key: collection.$key,
})),
)

options.utils.markReady()
await liveCollection.preload()

const tx = collection.insert({ id: 4733, text: `two`, project_id: 1 })

expect(liveCollection.toArray.map((todo) => todo.id)).toEqual([4733])

options.utils.begin()
options.utils.write({
type: `insert`,
value: { id: 24, text: `two`, project_id: 1 },
})
options.utils.commit()

options.utils.resolveSync()
await tx.isPersisted.promise
await flushPromises()

expect(
liveCollection.toArray.map((todo) => ({
id: todo.id,
synced: todo.$synced,
origin: todo.$origin,
key: todo.$key,
})),
).toEqual([{ id: 24, synced: true, origin: `remote`, key: 24 }])
})

it(`should throw an error when trying to use mutation operations outside of a transaction`, async () => {
// Create a collection with sync but no mutationFn
const collection = createCollection<{ value: string }>({
Expand Down
1 change: 1 addition & 0 deletions packages/db/tests/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ type MockSyncCollectionConfigNoInitialState<T> = {
id: string
getKey: (item: T) => string | number
autoIndex?: `off` | `eager`
startSync?: boolean
defaultIndexType?: IndexConstructor
}

Expand Down
43 changes: 43 additions & 0 deletions packages/electric-db-collection/tests/electric.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,49 @@ describe(`Electric Integration`, () => {
expect(testCollection._state.syncedData.size).toEqual(1)
})

it(`should remove optimistic insert when txid sync confirms a different server-generated key`, async () => {
const txid = 1234
const onInsert = vi.fn().mockResolvedValue({ txid })

const testCollection = createCollection(
electricCollectionOptions({
id: `test-server-generated-key-txid`,
shapeOptions: {
url: `http://test-url`,
params: { table: `test_table` },
},
startSync: true,
getKey: (item: Row) => item.id as number,
onInsert,
}),
)

const tx = testCollection.insert({ id: 4733, text: `two` })

expect(stripVirtualProps(testCollection.get(4733))).toEqual({
id: 4733,
text: `two`,
})

subscriber([
{
key: `24`,
value: { id: 24, text: `two` },
headers: { operation: `insert`, txids: [txid] },
},
{ headers: { control: `up-to-date` } },
])

await tx.isPersisted.promise

expect(testCollection.has(4733)).toBe(false)
expect(stripVirtualProps(testCollection.get(24))).toEqual({
id: 24,
text: `two`,
})
expect(Array.from(testCollection.state.keys())).toEqual([24])
})

it(`should support void strategy when handler returns nothing`, async () => {
const onInsert = vi.fn().mockResolvedValue(undefined)

Expand Down
Loading