Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 38 additions & 20 deletions packages/trailbase-db-collection/src/trailbase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ export function trailBaseCollectionOptions<

const internalSyncMode = config.syncMode ?? `eager`
let fullSyncCompleted = false
// Tracks whether subscribe("*") succeeded. Set to true inside start() when SSE
// connects. When false (e.g. 403 due to row-level access rules that can't be
// evaluated at wildcard subscription time), onInsert/onUpdate/onDelete skip
// awaitIds() — which relies on SSE events to populate seenIds — to avoid a
// 120 s timeout that would roll back every optimistic mutation.
let sseAvailable = false

const awaitIds = (
ids: Array<string>,
Expand Down Expand Up @@ -294,12 +300,25 @@ export function trailBaseCollectionOptions<
}

async function start() {
const eventStream = await config.recordApi.subscribe(`*`)
const reader = (eventReader = eventStream.getReader())

// Start listening for subscriptions first. Otherwise, we'd risk a gap
// between the initial fetch and starting to listen.
listen(reader)
// Attempt to subscribe to live updates. Some TrailBase configurations
// deny wildcard subscriptions (403) when table access rules use
// row-level predicates (_ROW_.*) that can't be evaluated without a
// concrete record. In that case we fall back to polling only.
let liveUpdatesAvailable = false
try {
const eventStream = await config.recordApi.subscribe(`*`)
const reader = (eventReader = eventStream.getReader())

// Start listening for subscriptions first. Otherwise, we'd risk a gap
// between the initial fetch and starting to listen.
listen(reader)
liveUpdatesAvailable = true
sseAvailable = true
} catch {
console.debug(
`[trailbase] subscribe/* unavailable — falling back to polling only`,
)
}

try {
// Eager mode: perform initial fetch to populate everything
Expand All @@ -309,14 +328,16 @@ export function trailBaseCollectionOptions<
fullSyncCompleted = true
}
} catch (e) {
cancelEventReader()
if (liveUpdatesAvailable) cancelEventReader()
throw e
} finally {
// Mark ready both if everything went well or if there's an error to
// avoid blocking apps waiting for `.preload()` to finish.
markReady()
}

if (!liveUpdatesAvailable) return

// Lastly, start a periodic cleanup task that will be removed when the
// reader closes.
const periodicCleanupTask = setInterval(() => {
Expand All @@ -337,7 +358,7 @@ export function trailBaseCollectionOptions<
})
}, 120 * 1000)

reader.closed.finally(() => clearInterval(periodicCleanupTask))
eventReader!.closed.finally(() => clearInterval(periodicCleanupTask))
}

start()
Expand Down Expand Up @@ -380,10 +401,13 @@ export function trailBaseCollectionOptions<
}),
)

// The optimistic mutation overlay is removed on return, so at this point
// we have to ensure that the new record was properly added to the local
// DB by the subscription.
await awaitIds(ids.map((id) => String(id)))
// When SSE is available: wait for the subscription event confirming the
// server has persisted the record before removing the optimistic overlay.
// When SSE is unavailable (polling-only mode): skip awaitIds — seenIds is
// never populated without SSE events, so waiting would time out after 120 s
// and roll back the optimistic insert. The collection will reflect server
// state after the next polling cycle.
if (sseAvailable) await awaitIds(ids.map((id) => String(id)))

return ids
},
Expand All @@ -401,10 +425,7 @@ export function trailBaseCollectionOptions<
}),
)

// The optimistic mutation overlay is removed on return, so at this point
// we have to ensure that the new record was properly updated in the local
// DB by the subscription.
await awaitIds(ids)
if (sseAvailable) await awaitIds(ids)
},
onDelete: async (params: DeleteMutationFnParams<TItem, TKey>) => {
const ids: Array<string> = await Promise.all(
Expand All @@ -419,10 +440,7 @@ export function trailBaseCollectionOptions<
}),
)

// The optimistic mutation overlay is removed on return, so at this point
// we have to ensure that the new record was properly updated in the local
// DB by the subscription.
await awaitIds(ids)
if (sseAvailable) await awaitIds(ids)
},
utils: {
cancel: cancelEventReader,
Expand Down