@@ -8,6 +8,7 @@ import type { OrchestrateStreamOptions } from '@/lib/copilot/orchestrator'
88import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
99import {
1010 createStreamEventWriter ,
11+ getStreamMeta ,
1112 resetStreamBuffer ,
1213 setStreamMeta ,
1314} from '@/lib/copilot/orchestrator/stream/buffer'
@@ -66,15 +67,46 @@ function getStreamAbortKey(streamId: string): string {
6667 */
6768export async function waitForPendingChatStream (
6869 chatId : string ,
69- timeoutMs = 5_000
70+ timeoutMs = 5_000 ,
71+ expectedStreamId ?: string
7072) : Promise < boolean > {
71- const entry = pendingChatStreams . get ( chatId )
72- if ( ! entry ) return true
73+ const redis = getRedisClient ( )
74+ const deadline = Date . now ( ) + timeoutMs
75+
76+ for ( ; ; ) {
77+ const entry = pendingChatStreams . get ( chatId )
78+ const localPending = ! ! entry && ( ! expectedStreamId || entry . streamId === expectedStreamId )
79+
80+ if ( redis ) {
81+ try {
82+ const ownerStreamId = await redis . get ( getChatStreamLockKey ( chatId ) )
83+ const lockReleased =
84+ ! ownerStreamId || ( expectedStreamId !== undefined && ownerStreamId !== expectedStreamId )
85+ if ( ! localPending && lockReleased ) {
86+ return true
87+ }
88+ } catch ( error ) {
89+ logger . warn ( 'Failed to check distributed chat stream lock while waiting' , {
90+ chatId,
91+ expectedStreamId,
92+ error : error instanceof Error ? error . message : String ( error ) ,
93+ } )
94+ }
95+ } else if ( ! localPending ) {
96+ return true
97+ }
7398
74- return await Promise . race ( [
75- entry . promise . then ( ( ) => true ) ,
76- new Promise < boolean > ( ( r ) => setTimeout ( ( ) => r ( false ) , timeoutMs ) ) ,
77- ] )
99+ if ( Date . now ( ) >= deadline ) return false
100+ await new Promise ( ( resolve ) => setTimeout ( resolve , 200 ) )
101+ }
102+ }
103+
104+ export async function releasePendingChatStream ( chatId : string , streamId : string ) : Promise < void > {
105+ const redis = getRedisClient ( )
106+ if ( redis ) {
107+ await releaseLock ( getChatStreamLockKey ( chatId ) , streamId ) . catch ( ( ) => false )
108+ }
109+ resolvePendingChatStream ( chatId , streamId )
78110}
79111
80112export async function acquirePendingChatStream (
@@ -86,14 +118,36 @@ export async function acquirePendingChatStream(
86118 if ( redis ) {
87119 const deadline = Date . now ( ) + timeoutMs
88120 for ( ; ; ) {
89- const acquired = await acquireLock (
90- getChatStreamLockKey ( chatId ) ,
91- streamId ,
92- CHAT_STREAM_LOCK_TTL_SECONDS
93- )
94- if ( acquired ) {
95- registerPendingChatStream ( chatId , streamId )
96- return true
121+ try {
122+ const acquired = await acquireLock (
123+ getChatStreamLockKey ( chatId ) ,
124+ streamId ,
125+ CHAT_STREAM_LOCK_TTL_SECONDS
126+ )
127+ if ( acquired ) {
128+ registerPendingChatStream ( chatId , streamId )
129+ return true
130+ }
131+ if ( ! pendingChatStreams . has ( chatId ) ) {
132+ const ownerStreamId = await redis . get ( getChatStreamLockKey ( chatId ) )
133+ if ( ownerStreamId ) {
134+ const ownerMeta = await getStreamMeta ( ownerStreamId )
135+ const ownerTerminal =
136+ ownerMeta ?. status === 'complete' ||
137+ ownerMeta ?. status === 'error' ||
138+ ownerMeta ?. status === 'cancelled'
139+ if ( ownerTerminal ) {
140+ await releaseLock ( getChatStreamLockKey ( chatId ) , ownerStreamId ) . catch ( ( ) => false )
141+ continue
142+ }
143+ }
144+ }
145+ } catch ( error ) {
146+ logger . warn ( 'Distributed chat stream lock failed; retrying distributed coordination' , {
147+ chatId,
148+ streamId,
149+ error : error instanceof Error ? error . message : String ( error ) ,
150+ } )
97151 }
98152 if ( Date . now ( ) >= deadline ) return false
99153 await new Promise ( ( resolve ) => setTimeout ( resolve , 200 ) )
0 commit comments