Skip to content

Commit 4a3ffb7

Browse files
朱孟柱claude
andcommitted
fix(sse): fix zombie SSE connection memory leak
Root cause: adapter.bun.ts explicitly set idleTimeout: 0, disabling Bun's idle timeout entirely. When connections enter TCP CLOSE_WAIT (reverse proxy or Electron shell swallowing client FIN), the server has no signal that the connection is dead — three resource classes leak permanently: AsyncQueue grows without bound (14 MB/s, peak 24.5 GB), setInterval heartbeats keep ticking, Bus subscriptions are never cancelled. Five changes: - server/adapter.bun.ts: idleTimeout: 0 → 120. Heartbeat interval is 10s so active connections reset the timer on every beat and are never killed; dead connections are cleaned up within 120s - server/instance/event.ts: register c.req.raw.signal abort listener as a second cleanup path independent of responseReadable.cancel(). Hono skips this listener on Bun 1.2+ (isOldBunVersion gate); we add it unconditionally. writeSSE try/catch kept for forward compatibility — Hono 4.x write() has an empty catch so this block does not fire on the current version - server/instance/global.ts: same two changes applied to streamEvents() - util/queue.ts: add capacity limit (default 1000), drop oldest entry on overflow — defensive backstop so growth is bounded even if a cleanup signal is missed - bus/global.ts: setMaxListeners(100) — fixes #22422 MaxListenersExceededWarning; each SSE connection adds one listener, default limit of 10 is too low for normal concurrent usage Fixes #22198, fixes #22422 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 5b60e51 commit 4a3ffb7

5 files changed

Lines changed: 46 additions & 5 deletions

File tree

packages/opencode/src/bus/global.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ export const GlobalBus = new EventEmitter<{
1010
},
1111
]
1212
}>()
13+
GlobalBus.setMaxListeners(100)

packages/opencode/src/server/adapter.bun.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@ export const adapter: Adapter = {
1111
const args = {
1212
fetch: app.fetch,
1313
hostname: opts.hostname,
14-
idleTimeout: 0,
14+
// Default is 10s which is too aggressive for SSE connections.
15+
// 0 disables the timeout entirely — dead connections (CLOSE_WAIT) are
16+
// never cleaned up, causing unbounded memory growth. 120s gives the
17+
// cleanup chain enough time to fire while still bounding leak duration.
18+
idleTimeout: 120,
1519
websocket: ws.websocket,
1620
} as const
1721
const start = (port: number) => {

packages/opencode/src/server/instance/event.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,25 @@ export const EventRoutes = () =>
7575
})
7676

7777
stream.onAbort(stop)
78+
// Second abort path: req.raw.signal fires before stream.onAbort on direct
79+
// connections (~2ms earlier), and provides an independent cleanup path when
80+
// the responseReadable.cancel() chain is broken (e.g. reverse proxy).
81+
// Hono only registers this on Bun 1.0/1.1 (isOldBunVersion gate); we add
82+
// it unconditionally so Bun 1.2+ is also covered.
83+
c.req.raw.signal.addEventListener("abort", stop)
7884

7985
try {
8086
for await (const data of q) {
8187
if (data === null) return
82-
await stream.writeSSE({ data })
88+
try {
89+
await stream.writeSSE({ data })
90+
} catch {
91+
// Hono 4.x StreamingApi.write() has an empty catch — this block
92+
// never fires on the current version. Kept for forward compatibility
93+
// in case a future Hono version propagates write errors.
94+
stop()
95+
return
96+
}
8397
}
8498
} finally {
8599
stop()

packages/opencode/src/server/instance/global.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,25 @@ async function streamEvents(c: Context, subscribe: (q: AsyncQueue<string | null>
5757
const unsub = subscribe(q)
5858

5959
stream.onAbort(stop)
60+
// Second abort path: req.raw.signal fires before stream.onAbort on direct
61+
// connections (~2ms earlier), and provides an independent cleanup path when
62+
// the responseReadable.cancel() chain is broken (e.g. reverse proxy).
63+
// Hono only registers this on Bun 1.0/1.1 (isOldBunVersion gate); we add
64+
// it unconditionally so Bun 1.2+ is also covered.
65+
c.req.raw.signal.addEventListener("abort", stop)
6066

6167
try {
6268
for await (const data of q) {
6369
if (data === null) return
64-
await stream.writeSSE({ data })
70+
try {
71+
await stream.writeSSE({ data })
72+
} catch {
73+
// Hono 4.x StreamingApi.write() has an empty catch — this block
74+
// never fires on the current version. Kept for forward compatibility
75+
// in case a future Hono version propagates write errors.
76+
stop()
77+
return
78+
}
6579
}
6680
} finally {
6781
stop()

packages/opencode/src/util/queue.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,18 @@ export class AsyncQueue<T> implements AsyncIterable<T> {
22
private queue: T[] = []
33
private resolvers: ((value: T) => void)[] = []
44

5+
constructor(private limit = 1000) {}
6+
57
push(item: T) {
68
const resolve = this.resolvers.shift()
7-
if (resolve) resolve(item)
8-
else this.queue.push(item)
9+
if (resolve) {
10+
resolve(item)
11+
} else {
12+
if (this.queue.length >= this.limit) {
13+
this.queue.shift()
14+
}
15+
this.queue.push(item)
16+
}
917
}
1018

1119
async next(): Promise<T> {

0 commit comments

Comments
 (0)