Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/socket.io-adapter/lib/cluster-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ export abstract class ClusterAdapter extends Adapter {
opts: BroadcastOptions,
clientCountCallback: (clientCount: number) => void,
ack: (...args: any[]) => void,
) {
): { cleanup: () => void } {
const onlyLocal = opts?.flags?.local;
if (!onlyLocal) {
const requestId = randomId();
Expand All @@ -508,7 +508,7 @@ export abstract class ClusterAdapter extends Adapter {
}, opts.flags!.timeout);
}

super.broadcastWithAck(packet, opts, clientCountCallback, ack);
return super.broadcastWithAck(packet, opts, clientCountCallback, ack);
}

override async addSockets(opts: BroadcastOptions, rooms: Room[]) {
Expand Down
19 changes: 18 additions & 1 deletion packages/socket.io-adapter/lib/in-memory-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ export class Adapter extends EventEmitter {
opts: BroadcastOptions,
clientCountCallback: (clientCount: number) => void,
ack: (...args: any[]) => void,
) {
): { cleanup: () => void } {
const flags = opts.flags || {};
const packetOpts = {
preEncoded: true,
Expand All @@ -214,12 +214,15 @@ export class Adapter extends EventEmitter {
const encodedPackets = this._encode(packet, packetOpts);

let clientCount = 0;
const sentToSockets: any[] = [];

this.apply(opts, (socket) => {
// track the total number of acknowledgements that are expected
clientCount++;
// call the ack callback for each client response
socket.acks.set(packet.id, ack);
// track sockets for cleanup on timeout
sentToSockets.push(socket);

if (typeof socket.notifyOutgoingListeners === "function") {
socket.notifyOutgoingListeners(packet);
Expand All @@ -229,6 +232,20 @@ export class Adapter extends EventEmitter {
});

clientCountCallback(clientCount);

const packetId = packet.id;
return {
cleanup: () => {
// remove pending acks from all sockets on timeout
for (const socket of sentToSockets) {
if (socket.acks) {
socket.acks.delete(packetId);
}
}
// clear array to release socket references
sentToSockets.length = 0;
},
};
}

private _encode(packet: unknown, packetOpts: Record<string, unknown>) {
Expand Down
12 changes: 11 additions & 1 deletion packages/socket.io/lib/broadcast-operator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,14 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
const ack = data.pop() as (...args: any[]) => void;
let timedOut = false;
let responses: any[] = [];
let cleanup: (() => void) | undefined;

const timer = setTimeout(() => {
timedOut = true;
// cleanup pending acks to prevent memory leak
if (cleanup) {
cleanup();
}
ack.apply(this, [
new Error("operation has timed out"),
this.flags.expectSingleResponse ? null : responses,
Expand All @@ -259,7 +264,7 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
}
};

this.adapter.broadcastWithAck(
const result = this.adapter.broadcastWithAck(
packet,
{
rooms: this.rooms,
Expand All @@ -279,6 +284,11 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
},
);

// store cleanup function for timeout handler
if (result && typeof result.cleanup === "function") {
cleanup = result.cleanup;
}

this.adapter.serverCount().then((serverCount) => {
expectedServerCount = serverCount;
checkCompleteness();
Expand Down
71 changes: 71 additions & 0 deletions packages/socket.io/test/socket-timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,75 @@ describe("timeout", () => {
success(done, io, client);
});
});

it("should cleanup pending acks on broadcast timeout (memory leak fix)", (done) => {
const io = new Server(0);
const client = createClient(io, "/");

// Client does not acknowledge the event (simulates timeout scenario)
client.on("test-event", () => {
// intentionally not calling the callback
});

io.on("connection", async (socket) => {
socket.join("test-room");

// Get initial acks count (cast to any to access private property in test)
const initialAcksSize = (socket as any).acks.size;

try {
await io.timeout(50).to("test-room").emitWithAck("test-event", "data");
expect().fail("should have timed out");
} catch (err) {
expect(err).to.be.an(Error);

// After timeout, acks should be cleaned up (no memory leak)
// Wait a bit for cleanup to complete
setTimeout(() => {
expect((socket as any).acks.size).to.be(initialAcksSize);
success(done, io, client);
}, 10);
}
});
});

it("should cleanup pending acks on broadcast timeout with multiple clients", (done) => {
const io = new Server(0);
const client1 = createClient(io, "/");
const client2 = createClient(io, "/");

let connectedSockets: any[] = [];

// Clients do not acknowledge
client1.on("test-event", () => {});
client2.on("test-event", () => {});

io.on("connection", (socket) => {
socket.join("test-room");
connectedSockets.push(socket);

if (connectedSockets.length === 2) {
runTest();
}
});

async function runTest() {
const initialAcksSizes = connectedSockets.map((s) => s.acks.size);

try {
await io.timeout(50).to("test-room").emitWithAck("test-event", "data");
expect().fail("should have timed out");
} catch (err) {
expect(err).to.be.an(Error);

setTimeout(() => {
// All sockets should have their acks cleaned up
connectedSockets.forEach((socket, i) => {
expect(socket.acks.size).to.be(initialAcksSizes[i]);
});
success(done, io, client1, client2);
}, 10);
}
}
});
});