From ddf96a84b7cdd70d5a6ddc2ae948a9729291a42a Mon Sep 17 00:00:00 2001 From: junaiddshaukat Date: Sat, 3 Jan 2026 12:53:03 +0500 Subject: [PATCH 1/5] Add LRU cache eviction to CachingStateProvider Fixes #37213 Implements LRU (Least Recently Used) cache eviction to prevent unbounded memory growth in long-running workers. Adds configurable maxCacheSize parameter (default: 1000 entries) and maintains LRU order using JavaScript Map's insertion order. - Add maxCacheSize constructor parameter with default value of 1000 - Implement evictIfNeeded() to remove oldest entry when cache is full - Implement touchCacheEntry() to move accessed items to end (LRU) - Add comprehensive test coverage in state_provider_test.ts This addresses the TODO comment in the code and improves reliability for production workloads. --- .../src/apache_beam/worker/state.ts | 38 ++- sdks/typescript/test/state_provider_test.ts | 253 ++++++++++++++++++ 2 files changed, 289 insertions(+), 2 deletions(-) create mode 100644 sdks/typescript/test/state_provider_test.ts diff --git a/sdks/typescript/src/apache_beam/worker/state.ts b/sdks/typescript/src/apache_beam/worker/state.ts index 5a340cbb64f0..354674ec9cc2 100644 --- a/sdks/typescript/src/apache_beam/worker/state.ts +++ b/sdks/typescript/src/apache_beam/worker/state.ts @@ -49,9 +49,37 @@ export interface StateProvider { export class CachingStateProvider implements StateProvider { underlying: StateProvider; cache: Map> = new Map(); + maxCacheSize: number; - constructor(underlying: StateProvider) { + constructor(underlying: StateProvider, maxCacheSize: number = 1000) { this.underlying = underlying; + this.maxCacheSize = maxCacheSize; + } + + /** + * Evicts the least recently used entry if the cache is at capacity. + * JavaScript Maps preserve insertion order, so the first entry is the oldest. + */ + private evictIfNeeded() { + if (this.cache.size >= this.maxCacheSize) { + // Remove the first (oldest) entry + const firstKey = this.cache.keys().next().value; + if (firstKey !== undefined) { + this.cache.delete(firstKey); + } + } + } + + /** + * Moves a cache entry to the end (most recently used) by deleting and re-adding it. + * This maintains LRU order: most recently accessed items are at the end. + */ + private touchCacheEntry(cacheKey: string) { + const value = this.cache.get(cacheKey); + if (value !== undefined) { + this.cache.delete(cacheKey); + this.cache.set(cacheKey, value); + } } getState(stateKey: fnApi.StateKey, decode: (data: Uint8Array) => T) { @@ -62,20 +90,26 @@ export class CachingStateProvider implements StateProvider { "base64", ); if (this.cache.has(cacheKey)) { + // Cache hit: move to end (most recently used) + this.touchCacheEntry(cacheKey); return this.cache.get(cacheKey)!; } + // Cache miss: fetch from underlying provider let result = this.underlying.getState(stateKey, decode); const this_ = this; if (result.type === "promise") { result = { type: "promise", promise: result.promise.then((value) => { + // When promise resolves, update cache with resolved value + this_.evictIfNeeded(); this_.cache.set(cacheKey, { type: "value", value }); return value; }), }; } - // TODO: (Perf) Cache eviction. + // Evict if needed before adding new entry + this.evictIfNeeded(); this.cache.set(cacheKey, result); return result; } diff --git a/sdks/typescript/test/state_provider_test.ts b/sdks/typescript/test/state_provider_test.ts new file mode 100644 index 000000000000..29c76739bdc5 --- /dev/null +++ b/sdks/typescript/test/state_provider_test.ts @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from "assert"; +import { + CachingStateProvider, + StateProvider, + MaybePromise, +} from "../src/apache_beam/worker/state"; +import * as fnApi from "../src/apache_beam/proto/beam_fn_api"; + +/** + * Mock StateProvider for testing that tracks call counts. + */ +class MockStateProvider implements StateProvider { + callCount: number = 0; + values: Map = new Map(); + delayMs: number = 0; + + constructor(delayMs: number = 0) { + this.delayMs = delayMs; + } + + setValue(key: string, value: any) { + this.values.set(key, value); + } + + getState( + stateKey: fnApi.StateKey, + decode: (data: Uint8Array) => T, + ): MaybePromise { + this.callCount++; + const key = Buffer.from(fnApi.StateKey.toBinary(stateKey)).toString( + "base64", + ); + const value = this.values.get(key); + + if (this.delayMs > 0) { + return { + type: "promise", + promise: new Promise((resolve) => { + setTimeout(() => resolve(value), this.delayMs); + }), + }; + } else { + return { type: "value", value }; + } + } +} + +describe("CachingStateProvider", function () { + it("caches values and returns cached result on subsequent calls", function () { + const mockProvider = new MockStateProvider(); + const cache = new CachingStateProvider(mockProvider, 100); + + const stateKey: fnApi.StateKey = { + type: { + oneofKind: "bagUserState", + bagUserState: fnApi.StateKey_BagUserState.create({ + transformId: "test", + userStateId: "state1", + window: new Uint8Array(0), + key: new Uint8Array(0), + }), + }, + }; + + const decode = (data: Uint8Array) => data.toString(); + + // Set value in mock + const testValue = "cached_value"; + const key = Buffer.from(fnApi.StateKey.toBinary(stateKey)).toString( + "base64", + ); + mockProvider.setValue(key, testValue); + + // First call should hit underlying provider + const result1 = cache.getState(stateKey, decode); + assert.equal(mockProvider.callCount, 1); + assert.equal(result1.type, "value"); + if (result1.type === "value") { + assert.equal(result1.value, testValue); + } + + // Second call should use cache + const result2 = cache.getState(stateKey, decode); + assert.equal(mockProvider.callCount, 1); // Still 1, not 2 + assert.equal(result2.type, "value"); + if (result2.type === "value") { + assert.equal(result2.value, testValue); + } + }); + + it("evicts least recently used entry when cache is full", function () { + const mockProvider = new MockStateProvider(); + const cache = new CachingStateProvider(mockProvider, 3); // Small cache for testing + + const decode = (data: Uint8Array) => data.toString(); + + // Create 4 different state keys + const keys: fnApi.StateKey[] = []; + for (let i = 0; i < 4; i++) { + keys.push({ + type: { + oneofKind: "bagUserState", + bagUserState: fnApi.StateKey_BagUserState.create({ + transformId: "test", + userStateId: `state${i}`, + window: new Uint8Array(0), + key: new Uint8Array(0), + }), + }, + }); + } + + // Set values in mock + for (let i = 0; i < 4; i++) { + const key = Buffer.from(fnApi.StateKey.toBinary(keys[i])).toString( + "base64", + ); + mockProvider.setValue(key, `value${i}`); + } + + // Fill cache with 3 entries + cache.getState(keys[0], decode); + cache.getState(keys[1], decode); + cache.getState(keys[2], decode); + assert.equal(mockProvider.callCount, 3); + assert.equal(cache.cache.size, 3); + + // Access keys[0] to make it most recently used + cache.getState(keys[0], decode); + assert.equal(mockProvider.callCount, 3); // Still cached + + // Add 4th entry - should evict keys[1] (least recently used, not keys[0]) + cache.getState(keys[3], decode); + assert.equal(mockProvider.callCount, 4); + assert.equal(cache.cache.size, 3); // Still at max size + + // keys[1] should be evicted (not in cache) + const result1 = cache.getState(keys[1], decode); + assert.equal(mockProvider.callCount, 5); // Had to fetch again + assert.equal(result1.type, "value"); + if (result1.type === "value") { + assert.equal(result1.value, "value1"); + } + + // keys[0] should still be cached (was most recently used) + const result0 = cache.getState(keys[0], decode); + assert.equal(mockProvider.callCount, 5); // Still cached, no new call + assert.equal(result0.type, "value"); + if (result0.type === "value") { + assert.equal(result0.value, "value0"); + } + }); + + it("handles promise-based state fetches correctly", async function () { + const mockProvider = new MockStateProvider(10); // 10ms delay + const cache = new CachingStateProvider(mockProvider, 100); + + const stateKey: fnApi.StateKey = { + type: { + oneofKind: "bagUserState", + bagUserState: fnApi.StateKey_BagUserState.create({ + transformId: "test", + userStateId: "async_state", + window: new Uint8Array(0), + key: new Uint8Array(0), + }), + }, + }; + + const decode = (data: Uint8Array) => data.toString(); + const key = Buffer.from(fnApi.StateKey.toBinary(stateKey)).toString( + "base64", + ); + mockProvider.setValue(key, "async_value"); + + // First call returns promise + const result1 = cache.getState(stateKey, decode); + assert.equal(result1.type, "promise"); + assert.equal(mockProvider.callCount, 1); + + // Wait for promise to resolve + if (result1.type === "promise") { + const value1 = await result1.promise; + assert.equal(value1, "async_value"); + + // Second call should return cached value (not promise) + const result2 = cache.getState(stateKey, decode); + assert.equal(result2.type, "value"); + assert.equal(mockProvider.callCount, 1); // Still only 1 call + if (result2.type === "value") { + assert.equal(result2.value, "async_value"); + } + } + }); + + it("respects custom maxCacheSize", function () { + const mockProvider = new MockStateProvider(); + const cache = new CachingStateProvider(mockProvider, 2); // Very small cache + + const decode = (data: Uint8Array) => data.toString(); + + const keys: fnApi.StateKey[] = []; + for (let i = 0; i < 3; i++) { + keys.push({ + type: { + oneofKind: "bagUserState", + bagUserState: fnApi.StateKey_BagUserState.create({ + transformId: "test", + userStateId: `state${i}`, + window: new Uint8Array(0), + key: new Uint8Array(0), + }), + }, + }); + const key = Buffer.from(fnApi.StateKey.toBinary(keys[i])).toString( + "base64", + ); + mockProvider.setValue(key, `value${i}`); + } + + // Fill cache to max (2 entries) + cache.getState(keys[0], decode); + cache.getState(keys[1], decode); + assert.equal(cache.cache.size, 2); + + // Add 3rd entry - should evict first + cache.getState(keys[2], decode); + assert.equal(cache.cache.size, 2); // Still at max + + // First entry should be evicted + cache.getState(keys[0], decode); + assert.equal(mockProvider.callCount, 4); // Had to fetch keys[0] again + }); +}); + From 60e8de09ae305f9dae58d6d3a87c5ab0f284ea62 Mon Sep 17 00:00:00 2001 From: junaiddshaukat Date: Mon, 5 Jan 2026 23:58:48 +0500 Subject: [PATCH 2/5] Address review comments: size-based LRU eviction for CachingStateProvider - Fixed bug: removed incorrect evictIfNeeded() call in promise callback - Removed unnecessary this_ variable (arrow functions capture this) - Changed from count-based to size-based eviction (similar to Python statecache.py) - Added estimateSize() to calculate memory weight of cached values - Default cache weight: 100MB - Updated tests to work with weight-based eviction --- .../src/apache_beam/worker/state.ts | 98 ++++++++++++++++--- sdks/typescript/test/state_provider_test.ts | 54 ++++++++-- 2 files changed, 131 insertions(+), 21 deletions(-) diff --git a/sdks/typescript/src/apache_beam/worker/state.ts b/sdks/typescript/src/apache_beam/worker/state.ts index 354674ec9cc2..868c8bf5514d 100644 --- a/sdks/typescript/src/apache_beam/worker/state.ts +++ b/sdks/typescript/src/apache_beam/worker/state.ts @@ -46,25 +46,87 @@ export interface StateProvider { } // TODO: (Advanced) Cross-bundle caching. +/** + * Wrapper for cached values that tracks their weight (memory size). + */ +interface WeightedCacheEntry { + entry: MaybePromise; + weight: number; +} + +/** + * Estimates the memory size of a value in bytes. + * This is a simplified estimation - actual memory usage may vary. + */ +function estimateSize(value: any): number { + if (value === null || value === undefined) { + return 8; + } + + const type = typeof value; + + if (type === "boolean") { + return 4; + } + if (type === "number") { + return 8; + } + if (type === "string") { + // Each character is 2 bytes in JavaScript (UTF-16) + overhead + return 40 + value.length * 2; + } + if (value instanceof Uint8Array || value instanceof Buffer) { + return 40 + value.length; + } + if (Array.isArray(value)) { + let size = 40; // Array overhead + for (const item of value) { + size += estimateSize(item); + } + return size; + } + if (type === "object") { + let size = 40; // Object overhead + for (const key of Object.keys(value)) { + size += estimateSize(key) + estimateSize(value[key]); + } + return size; + } + + // Default for unknown types + return 64; +} + +// Default cache size: 100MB +const DEFAULT_MAX_CACHE_WEIGHT = 100 * 1024 * 1024; + export class CachingStateProvider implements StateProvider { underlying: StateProvider; - cache: Map> = new Map(); - maxCacheSize: number; + cache: Map> = new Map(); + maxCacheWeight: number; + currentWeight: number = 0; - constructor(underlying: StateProvider, maxCacheSize: number = 1000) { + constructor( + underlying: StateProvider, + maxCacheWeight: number = DEFAULT_MAX_CACHE_WEIGHT, + ) { this.underlying = underlying; - this.maxCacheSize = maxCacheSize; + this.maxCacheWeight = maxCacheWeight; } /** - * Evicts the least recently used entry if the cache is at capacity. + * Evicts least recently used entries until the cache is under the weight limit. * JavaScript Maps preserve insertion order, so the first entry is the oldest. */ private evictIfNeeded() { - if (this.cache.size >= this.maxCacheSize) { + while (this.currentWeight > this.maxCacheWeight && this.cache.size > 0) { // Remove the first (oldest) entry const firstKey = this.cache.keys().next().value; if (firstKey !== undefined) { + const evicted = this.cache.get(firstKey); + if (evicted !== undefined) { + this.currentWeight -= evicted.weight; + } this.cache.delete(firstKey); } } @@ -92,25 +154,39 @@ export class CachingStateProvider implements StateProvider { if (this.cache.has(cacheKey)) { // Cache hit: move to end (most recently used) this.touchCacheEntry(cacheKey); - return this.cache.get(cacheKey)!; + return this.cache.get(cacheKey)!.entry; } // Cache miss: fetch from underlying provider let result = this.underlying.getState(stateKey, decode); - const this_ = this; if (result.type === "promise") { result = { type: "promise", promise: result.promise.then((value) => { // When promise resolves, update cache with resolved value - this_.evictIfNeeded(); - this_.cache.set(cacheKey, { type: "value", value }); + // Get the current entry to update its weight + const currentEntry = this.cache.get(cacheKey); + if (currentEntry !== undefined) { + // Remove old weight from total + this.currentWeight -= currentEntry.weight; + } + const resolvedWeight = estimateSize(value); + this.cache.set(cacheKey, { + entry: { type: "value", value }, + weight: resolvedWeight, + }); + this.currentWeight += resolvedWeight; + this.evictIfNeeded(); return value; }), }; } + // Estimate weight for the new entry + const weight = + result.type === "value" ? estimateSize(result.value) : 64; // Promise placeholder weight // Evict if needed before adding new entry + this.currentWeight += weight; this.evictIfNeeded(); - this.cache.set(cacheKey, result); + this.cache.set(cacheKey, { entry: result, weight }); return result; } } diff --git a/sdks/typescript/test/state_provider_test.ts b/sdks/typescript/test/state_provider_test.ts index 29c76739bdc5..e830754129ab 100644 --- a/sdks/typescript/test/state_provider_test.ts +++ b/sdks/typescript/test/state_provider_test.ts @@ -66,7 +66,8 @@ class MockStateProvider implements StateProvider { describe("CachingStateProvider", function () { it("caches values and returns cached result on subsequent calls", function () { const mockProvider = new MockStateProvider(); - const cache = new CachingStateProvider(mockProvider, 100); + // Use large weight limit to ensure no eviction for this test + const cache = new CachingStateProvider(mockProvider, 10 * 1024); const stateKey: fnApi.StateKey = { type: { @@ -106,9 +107,11 @@ describe("CachingStateProvider", function () { } }); - it("evicts least recently used entry when cache is full", function () { + it("evicts least recently used entry when cache weight exceeds limit", function () { const mockProvider = new MockStateProvider(); - const cache = new CachingStateProvider(mockProvider, 3); // Small cache for testing + // Each small string "valueX" is approximately 52 bytes (40 + 6*2) + // Set weight limit to hold approximately 3 entries + const cache = new CachingStateProvider(mockProvider, 180); const decode = (data: Uint8Array) => data.toString(); @@ -150,7 +153,6 @@ describe("CachingStateProvider", function () { // Add 4th entry - should evict keys[1] (least recently used, not keys[0]) cache.getState(keys[3], decode); assert.equal(mockProvider.callCount, 4); - assert.equal(cache.cache.size, 3); // Still at max size // keys[1] should be evicted (not in cache) const result1 = cache.getState(keys[1], decode); @@ -171,7 +173,8 @@ describe("CachingStateProvider", function () { it("handles promise-based state fetches correctly", async function () { const mockProvider = new MockStateProvider(10); // 10ms delay - const cache = new CachingStateProvider(mockProvider, 100); + // Use large weight limit to ensure no eviction for this test + const cache = new CachingStateProvider(mockProvider, 10 * 1024); const stateKey: fnApi.StateKey = { type: { @@ -211,9 +214,10 @@ describe("CachingStateProvider", function () { } }); - it("respects custom maxCacheSize", function () { + it("respects custom maxCacheWeight and evicts based on memory size", function () { const mockProvider = new MockStateProvider(); - const cache = new CachingStateProvider(mockProvider, 2); // Very small cache + // Set weight limit to hold approximately 2 small string entries + const cache = new CachingStateProvider(mockProvider, 120); const decode = (data: Uint8Array) => data.toString(); @@ -236,18 +240,48 @@ describe("CachingStateProvider", function () { mockProvider.setValue(key, `value${i}`); } - // Fill cache to max (2 entries) + // Fill cache with 2 entries cache.getState(keys[0], decode); cache.getState(keys[1], decode); assert.equal(cache.cache.size, 2); - // Add 3rd entry - should evict first + // Add 3rd entry - should evict oldest to stay under weight limit cache.getState(keys[2], decode); - assert.equal(cache.cache.size, 2); // Still at max // First entry should be evicted cache.getState(keys[0], decode); assert.equal(mockProvider.callCount, 4); // Had to fetch keys[0] again }); + + it("tracks cache weight correctly", function () { + const mockProvider = new MockStateProvider(); + const cache = new CachingStateProvider(mockProvider, 10 * 1024); + + const decode = (data: Uint8Array) => data.toString(); + + const stateKey: fnApi.StateKey = { + type: { + oneofKind: "bagUserState", + bagUserState: fnApi.StateKey_BagUserState.create({ + transformId: "test", + userStateId: "state1", + window: new Uint8Array(0), + key: new Uint8Array(0), + }), + }, + }; + + const key = Buffer.from(fnApi.StateKey.toBinary(stateKey)).toString( + "base64", + ); + mockProvider.setValue(key, "test_value"); + + // Cache should start with 0 weight + assert.equal(cache.currentWeight, 0); + + // After adding an entry, weight should increase + cache.getState(stateKey, decode); + assert.ok(cache.currentWeight > 0); + }); }); From 3a49569d062700c15719b5067c1130faa02c7163 Mon Sep 17 00:00:00 2001 From: junaiddshaukat Date: Tue, 6 Jan 2026 00:05:56 +0500 Subject: [PATCH 3/5] Fix prettier formatting --- sdks/typescript/src/apache_beam/worker/state.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/typescript/src/apache_beam/worker/state.ts b/sdks/typescript/src/apache_beam/worker/state.ts index 868c8bf5514d..dee7a866c2c7 100644 --- a/sdks/typescript/src/apache_beam/worker/state.ts +++ b/sdks/typescript/src/apache_beam/worker/state.ts @@ -181,8 +181,7 @@ export class CachingStateProvider implements StateProvider { }; } // Estimate weight for the new entry - const weight = - result.type === "value" ? estimateSize(result.value) : 64; // Promise placeholder weight + const weight = result.type === "value" ? estimateSize(result.value) : 64; // Promise placeholder weight // Evict if needed before adding new entry this.currentWeight += weight; this.evictIfNeeded(); From de22dce45d2b52f83316ece9b497fa4b87e9e822 Mon Sep 17 00:00:00 2001 From: junaiddshaukat Date: Wed, 14 Jan 2026 22:55:41 +0500 Subject: [PATCH 4/5] Address review comments: circular references, eviction ordering, tests - Fixed sizeof function to handle circular references using visited Set - Fixed eviction ordering: add to cache first, then evict (fixes edge case) - Added test for oversized item that exceeds maxCacheWeight - Implemented custom sizeof instead of object-sizeof package (has Node.js compatibility issues) --- sdks/typescript/package-lock.json | 90 ++++++++----------- .../src/apache_beam/worker/state.ts | 47 ++++++---- sdks/typescript/test/state_provider_test.ts | 43 ++++++++- 3 files changed, 110 insertions(+), 70 deletions(-) diff --git a/sdks/typescript/package-lock.json b/sdks/typescript/package-lock.json index 76f314ad490c..29918b01ab80 100644 --- a/sdks/typescript/package-lock.json +++ b/sdks/typescript/package-lock.json @@ -49,7 +49,6 @@ "version": "0.8.0", "resolved": "https://registry.npmjs.org/@cspotcode/source-map-consumer/-/source-map-consumer-0.8.0.tgz", "integrity": "sha512-41qniHzTU8yAGbCp04ohlmSrZf8bkf/iJsl3V0dRGsQN/5GFfx+LbCSsCpp2gqrqjTVg/K6O8ycoV35JIwAzAg==", - "peer": true, "engines": { "node": ">= 12" } @@ -58,7 +57,6 @@ "version": "0.7.0", "resolved": "https://registry.npmjs.org/@cspotcode/source-map-support/-/source-map-support-0.7.0.tgz", "integrity": "sha512-X4xqRHqN8ACt2aHVe51OxeA2HjbcL4MqFqXkrmQszJ1NOUuUu5u6Vqx/0lZSVNku7velL5FC/s5uEAj1lsBMhA==", - "peer": true, "dependencies": { "@cspotcode/source-map-consumer": "0.8.0" }, @@ -194,6 +192,7 @@ "version": "1.4.6", "resolved": "https://registry.npmjs.org/@grpc/grpc-js/-/grpc-js-1.4.6.tgz", "integrity": "sha512-Byau4xiXfIixb1PnW30V/P9mkrZ05lknyNqiK+cVY9J5hj3gecxd/anwaUbAM8j834zg1x78NvAbwGnMfWEu7A==", + "peer": true, "dependencies": { "@grpc/proto-loader": "^0.6.4", "@types/node": ">=12.12.47" @@ -553,26 +552,22 @@ "node_modules/@tsconfig/node10": { "version": "1.0.8", "resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.8.tgz", - "integrity": "sha512-6XFfSQmMgq0CFLY1MslA/CPUfhIL919M1rMsa5lP2P097N2Wd1sSX0tx1u4olM16fLNhtHZpRhedZJphNJqmZg==", - "peer": true + "integrity": "sha512-6XFfSQmMgq0CFLY1MslA/CPUfhIL919M1rMsa5lP2P097N2Wd1sSX0tx1u4olM16fLNhtHZpRhedZJphNJqmZg==" }, "node_modules/@tsconfig/node12": { "version": "1.0.9", "resolved": "https://registry.npmjs.org/@tsconfig/node12/-/node12-1.0.9.tgz", - "integrity": "sha512-/yBMcem+fbvhSREH+s14YJi18sp7J9jpuhYByADT2rypfajMZZN4WQ6zBGgBKp53NKmqI36wFYDb3yaMPurITw==", - "peer": true + "integrity": "sha512-/yBMcem+fbvhSREH+s14YJi18sp7J9jpuhYByADT2rypfajMZZN4WQ6zBGgBKp53NKmqI36wFYDb3yaMPurITw==" }, "node_modules/@tsconfig/node14": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/@tsconfig/node14/-/node14-1.0.1.tgz", - "integrity": "sha512-509r2+yARFfHHE7T6Puu2jjkoycftovhXRqW328PDXTVGKihlb1P8Z9mMZH04ebyajfRY7dedfGynlrFHJUQCg==", - "peer": true + "integrity": "sha512-509r2+yARFfHHE7T6Puu2jjkoycftovhXRqW328PDXTVGKihlb1P8Z9mMZH04ebyajfRY7dedfGynlrFHJUQCg==" }, "node_modules/@tsconfig/node16": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/@tsconfig/node16/-/node16-1.0.2.tgz", - "integrity": "sha512-eZxlbI8GZscaGS7kkc/trHTT5xgrjH3/1n2JDwusC9iahPKWMRvRjJSAN5mCXviuTGQ/lHnhvv8Q1YTpnfz9gA==", - "peer": true + "integrity": "sha512-eZxlbI8GZscaGS7kkc/trHTT5xgrjH3/1n2JDwusC9iahPKWMRvRjJSAN5mCXviuTGQ/lHnhvv8Q1YTpnfz9gA==" }, "node_modules/@types/duplexify": { "version": "3.6.1", @@ -602,7 +597,8 @@ "node_modules/@types/node": { "version": "17.0.8", "resolved": "https://registry.npmjs.org/@types/node/-/node-17.0.8.tgz", - "integrity": "sha512-YofkM6fGv4gDJq78g4j0mMuGMkZVxZDgtU0JRdx6FgiJDG+0fY0GKVolOV8WqVmEhLCXkQRjwDdKyPxJp/uucg==" + "integrity": "sha512-YofkM6fGv4gDJq78g4j0mMuGMkZVxZDgtU0JRdx6FgiJDG+0fY0GKVolOV8WqVmEhLCXkQRjwDdKyPxJp/uucg==", + "peer": true }, "node_modules/@typescript-eslint/eslint-plugin": { "version": "5.24.0", @@ -642,6 +638,7 @@ "resolved": "https://registry.npmjs.org/@typescript-eslint/parser/-/parser-5.24.0.tgz", "integrity": "sha512-4q29C6xFYZ5B2CXqSBBdcS0lPyfM9M09DoQLtHS5kf+WbpV8pBBhHDLNhXfgyVwFnhrhYzOu7xmg02DzxeF2Uw==", "dev": true, + "peer": true, "dependencies": { "@typescript-eslint/scope-manager": "5.24.0", "@typescript-eslint/types": "5.24.0", @@ -809,6 +806,7 @@ "version": "8.7.1", "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.7.1.tgz", "integrity": "sha512-Xx54uLJQZ19lKygFXOWsscKUbsBZW0CPykPhVQdhIeIwrbPmJzqeASDInc8nKBnp/JT6igTs82qPXz069H8I/A==", + "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -829,7 +827,6 @@ "version": "8.2.0", "resolved": "https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.2.0.tgz", "integrity": "sha512-k+iyHEuPgSw6SbuDpGQM+06HQUa04DZ3o+F6CSzXMvvI5KMvnaEqXe+YVe555R9nn6GPt404fos4wcgpw12SDA==", - "peer": true, "engines": { "node": ">=0.4.0" } @@ -919,8 +916,7 @@ "node_modules/arg": { "version": "4.1.3", "resolved": "https://registry.npmjs.org/arg/-/arg-4.1.3.tgz", - "integrity": "sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA==", - "peer": true + "integrity": "sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA==" }, "node_modules/argle": { "version": "1.1.1", @@ -1294,8 +1290,7 @@ "node_modules/create-require": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz", - "integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==", - "peer": true + "integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==" }, "node_modules/cross-spawn": { "version": "7.0.3", @@ -1526,6 +1521,7 @@ "resolved": "https://registry.npmjs.org/eslint/-/eslint-8.15.0.tgz", "integrity": "sha512-GG5USZ1jhCu8HJkzGgeK8/+RGnHaNYZGrGDzUtigK3BsGESW/rs2az23XqE0WVwDxy1VRvvjSSGu5nB0Bu+6SA==", "dev": true, + "peer": true, "dependencies": { "@eslint/eslintrc": "^1.2.3", "@humanwhocodes/config-array": "^0.9.2", @@ -2874,8 +2870,7 @@ "node_modules/make-error": { "version": "1.3.6", "resolved": "https://registry.npmjs.org/make-error/-/make-error-1.3.6.tgz", - "integrity": "sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==", - "peer": true + "integrity": "sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==" }, "node_modules/marked": { "version": "4.2.5", @@ -2957,6 +2952,7 @@ "integrity": "sha512-8uJR5RTC2NgpY3GrYcgpZrsEd9zKbPDpob1RezyR2upGHRQtHWofmzTMzTMSV6dru3tj5Ukt0+Vnq1qhFEEwAg==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "ansi-colors": "^4.1.3", "browser-stdout": "^1.3.1", @@ -3890,7 +3886,6 @@ "version": "10.7.0", "resolved": "https://registry.npmjs.org/ts-node/-/ts-node-10.7.0.tgz", "integrity": "sha512-TbIGS4xgJoX2i3do417KSaep1uRAW/Lu+WAL2doDHC0D6ummjirVOXU5/7aiZotbQ5p1Zp9tP7U6cYhA0O7M8A==", - "peer": true, "dependencies": { "@cspotcode/source-map-support": "0.7.0", "@tsconfig/node10": "^1.0.7", @@ -3933,7 +3928,6 @@ "version": "4.0.2", "resolved": "https://registry.npmjs.org/diff/-/diff-4.0.2.tgz", "integrity": "sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==", - "peer": true, "engines": { "node": ">=0.3.1" } @@ -4069,6 +4063,7 @@ "version": "4.7.4", "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.7.4.tgz", "integrity": "sha512-C0WQT0gezHuw6AdY1M2jxUO83Rjf0HP7Sk1DtXj6j1EwkQNZrHAg2XPWlq62oqEhYvONq5pkC2Y9oPljWToLmQ==", + "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -4139,8 +4134,7 @@ "node_modules/v8-compile-cache-lib": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/v8-compile-cache-lib/-/v8-compile-cache-lib-3.0.0.tgz", - "integrity": "sha512-mpSYqfsFvASnSn5qMiwrr4VKfumbPyONLCOPmsR3A6pTY/r0+tSaVbgPWSAIuzbk3lCTa+FForeTiO+wBQGkjA==", - "peer": true + "integrity": "sha512-mpSYqfsFvASnSn5qMiwrr4VKfumbPyONLCOPmsR3A6pTY/r0+tSaVbgPWSAIuzbk3lCTa+FForeTiO+wBQGkjA==" }, "node_modules/vscode-oniguruma": { "version": "1.7.0", @@ -4302,7 +4296,6 @@ "version": "3.1.1", "resolved": "https://registry.npmjs.org/yn/-/yn-3.1.1.tgz", "integrity": "sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==", - "peer": true, "engines": { "node": ">=6" } @@ -4324,14 +4317,12 @@ "@cspotcode/source-map-consumer": { "version": "0.8.0", "resolved": "https://registry.npmjs.org/@cspotcode/source-map-consumer/-/source-map-consumer-0.8.0.tgz", - "integrity": "sha512-41qniHzTU8yAGbCp04ohlmSrZf8bkf/iJsl3V0dRGsQN/5GFfx+LbCSsCpp2gqrqjTVg/K6O8ycoV35JIwAzAg==", - "peer": true + "integrity": "sha512-41qniHzTU8yAGbCp04ohlmSrZf8bkf/iJsl3V0dRGsQN/5GFfx+LbCSsCpp2gqrqjTVg/K6O8ycoV35JIwAzAg==" }, "@cspotcode/source-map-support": { "version": "0.7.0", "resolved": "https://registry.npmjs.org/@cspotcode/source-map-support/-/source-map-support-0.7.0.tgz", "integrity": "sha512-X4xqRHqN8ACt2aHVe51OxeA2HjbcL4MqFqXkrmQszJ1NOUuUu5u6Vqx/0lZSVNku7velL5FC/s5uEAj1lsBMhA==", - "peer": true, "requires": { "@cspotcode/source-map-consumer": "0.8.0" } @@ -4440,6 +4431,7 @@ "version": "1.4.6", "resolved": "https://registry.npmjs.org/@grpc/grpc-js/-/grpc-js-1.4.6.tgz", "integrity": "sha512-Byau4xiXfIixb1PnW30V/P9mkrZ05lknyNqiK+cVY9J5hj3gecxd/anwaUbAM8j834zg1x78NvAbwGnMfWEu7A==", + "peer": true, "requires": { "@grpc/proto-loader": "^0.6.4", "@types/node": ">=12.12.47" @@ -4707,26 +4699,22 @@ "@tsconfig/node10": { "version": "1.0.8", "resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.8.tgz", - "integrity": "sha512-6XFfSQmMgq0CFLY1MslA/CPUfhIL919M1rMsa5lP2P097N2Wd1sSX0tx1u4olM16fLNhtHZpRhedZJphNJqmZg==", - "peer": true + "integrity": "sha512-6XFfSQmMgq0CFLY1MslA/CPUfhIL919M1rMsa5lP2P097N2Wd1sSX0tx1u4olM16fLNhtHZpRhedZJphNJqmZg==" }, "@tsconfig/node12": { "version": "1.0.9", "resolved": "https://registry.npmjs.org/@tsconfig/node12/-/node12-1.0.9.tgz", - "integrity": "sha512-/yBMcem+fbvhSREH+s14YJi18sp7J9jpuhYByADT2rypfajMZZN4WQ6zBGgBKp53NKmqI36wFYDb3yaMPurITw==", - "peer": true + "integrity": "sha512-/yBMcem+fbvhSREH+s14YJi18sp7J9jpuhYByADT2rypfajMZZN4WQ6zBGgBKp53NKmqI36wFYDb3yaMPurITw==" }, "@tsconfig/node14": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/@tsconfig/node14/-/node14-1.0.1.tgz", - "integrity": "sha512-509r2+yARFfHHE7T6Puu2jjkoycftovhXRqW328PDXTVGKihlb1P8Z9mMZH04ebyajfRY7dedfGynlrFHJUQCg==", - "peer": true + "integrity": "sha512-509r2+yARFfHHE7T6Puu2jjkoycftovhXRqW328PDXTVGKihlb1P8Z9mMZH04ebyajfRY7dedfGynlrFHJUQCg==" }, "@tsconfig/node16": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/@tsconfig/node16/-/node16-1.0.2.tgz", - "integrity": "sha512-eZxlbI8GZscaGS7kkc/trHTT5xgrjH3/1n2JDwusC9iahPKWMRvRjJSAN5mCXviuTGQ/lHnhvv8Q1YTpnfz9gA==", - "peer": true + "integrity": "sha512-eZxlbI8GZscaGS7kkc/trHTT5xgrjH3/1n2JDwusC9iahPKWMRvRjJSAN5mCXviuTGQ/lHnhvv8Q1YTpnfz9gA==" }, "@types/duplexify": { "version": "3.6.1", @@ -4756,7 +4744,8 @@ "@types/node": { "version": "17.0.8", "resolved": "https://registry.npmjs.org/@types/node/-/node-17.0.8.tgz", - "integrity": "sha512-YofkM6fGv4gDJq78g4j0mMuGMkZVxZDgtU0JRdx6FgiJDG+0fY0GKVolOV8WqVmEhLCXkQRjwDdKyPxJp/uucg==" + "integrity": "sha512-YofkM6fGv4gDJq78g4j0mMuGMkZVxZDgtU0JRdx6FgiJDG+0fY0GKVolOV8WqVmEhLCXkQRjwDdKyPxJp/uucg==", + "peer": true }, "@typescript-eslint/eslint-plugin": { "version": "5.24.0", @@ -4780,6 +4769,7 @@ "resolved": "https://registry.npmjs.org/@typescript-eslint/parser/-/parser-5.24.0.tgz", "integrity": "sha512-4q29C6xFYZ5B2CXqSBBdcS0lPyfM9M09DoQLtHS5kf+WbpV8pBBhHDLNhXfgyVwFnhrhYzOu7xmg02DzxeF2Uw==", "dev": true, + "peer": true, "requires": { "@typescript-eslint/scope-manager": "5.24.0", "@typescript-eslint/types": "5.24.0", @@ -4870,7 +4860,8 @@ "acorn": { "version": "8.7.1", "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.7.1.tgz", - "integrity": "sha512-Xx54uLJQZ19lKygFXOWsscKUbsBZW0CPykPhVQdhIeIwrbPmJzqeASDInc8nKBnp/JT6igTs82qPXz069H8I/A==" + "integrity": "sha512-Xx54uLJQZ19lKygFXOWsscKUbsBZW0CPykPhVQdhIeIwrbPmJzqeASDInc8nKBnp/JT6igTs82qPXz069H8I/A==", + "peer": true }, "acorn-jsx": { "version": "5.3.2", @@ -4882,8 +4873,7 @@ "acorn-walk": { "version": "8.2.0", "resolved": "https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.2.0.tgz", - "integrity": "sha512-k+iyHEuPgSw6SbuDpGQM+06HQUa04DZ3o+F6CSzXMvvI5KMvnaEqXe+YVe555R9nn6GPt404fos4wcgpw12SDA==", - "peer": true + "integrity": "sha512-k+iyHEuPgSw6SbuDpGQM+06HQUa04DZ3o+F6CSzXMvvI5KMvnaEqXe+YVe555R9nn6GPt404fos4wcgpw12SDA==" }, "agent-base": { "version": "6.0.2", @@ -4944,8 +4934,7 @@ "arg": { "version": "4.1.3", "resolved": "https://registry.npmjs.org/arg/-/arg-4.1.3.tgz", - "integrity": "sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA==", - "peer": true + "integrity": "sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA==" }, "argle": { "version": "1.1.1", @@ -5213,8 +5202,7 @@ "create-require": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz", - "integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==", - "peer": true + "integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==" }, "cross-spawn": { "version": "7.0.3", @@ -5376,6 +5364,7 @@ "resolved": "https://registry.npmjs.org/eslint/-/eslint-8.15.0.tgz", "integrity": "sha512-GG5USZ1jhCu8HJkzGgeK8/+RGnHaNYZGrGDzUtigK3BsGESW/rs2az23XqE0WVwDxy1VRvvjSSGu5nB0Bu+6SA==", "dev": true, + "peer": true, "requires": { "@eslint/eslintrc": "^1.2.3", "@humanwhocodes/config-array": "^0.9.2", @@ -6402,8 +6391,7 @@ "make-error": { "version": "1.3.6", "resolved": "https://registry.npmjs.org/make-error/-/make-error-1.3.6.tgz", - "integrity": "sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==", - "peer": true + "integrity": "sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==" }, "marked": { "version": "4.2.5", @@ -6462,6 +6450,7 @@ "resolved": "https://registry.npmjs.org/mocha/-/mocha-11.1.0.tgz", "integrity": "sha512-8uJR5RTC2NgpY3GrYcgpZrsEd9zKbPDpob1RezyR2upGHRQtHWofmzTMzTMSV6dru3tj5Ukt0+Vnq1qhFEEwAg==", "dev": true, + "peer": true, "requires": { "ansi-colors": "^4.1.3", "browser-stdout": "^1.3.1", @@ -7113,7 +7102,6 @@ "version": "10.7.0", "resolved": "https://registry.npmjs.org/ts-node/-/ts-node-10.7.0.tgz", "integrity": "sha512-TbIGS4xgJoX2i3do417KSaep1uRAW/Lu+WAL2doDHC0D6ummjirVOXU5/7aiZotbQ5p1Zp9tP7U6cYhA0O7M8A==", - "peer": true, "requires": { "@cspotcode/source-map-support": "0.7.0", "@tsconfig/node10": "^1.0.7", @@ -7133,8 +7121,7 @@ "diff": { "version": "4.0.2", "resolved": "https://registry.npmjs.org/diff/-/diff-4.0.2.tgz", - "integrity": "sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==", - "peer": true + "integrity": "sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==" } } }, @@ -7228,7 +7215,8 @@ "typescript": { "version": "4.7.4", "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.7.4.tgz", - "integrity": "sha512-C0WQT0gezHuw6AdY1M2jxUO83Rjf0HP7Sk1DtXj6j1EwkQNZrHAg2XPWlq62oqEhYvONq5pkC2Y9oPljWToLmQ==" + "integrity": "sha512-C0WQT0gezHuw6AdY1M2jxUO83Rjf0HP7Sk1DtXj6j1EwkQNZrHAg2XPWlq62oqEhYvONq5pkC2Y9oPljWToLmQ==", + "peer": true }, "uglify-js": { "version": "3.15.1", @@ -7282,8 +7270,7 @@ "v8-compile-cache-lib": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/v8-compile-cache-lib/-/v8-compile-cache-lib-3.0.0.tgz", - "integrity": "sha512-mpSYqfsFvASnSn5qMiwrr4VKfumbPyONLCOPmsR3A6pTY/r0+tSaVbgPWSAIuzbk3lCTa+FForeTiO+wBQGkjA==", - "peer": true + "integrity": "sha512-mpSYqfsFvASnSn5qMiwrr4VKfumbPyONLCOPmsR3A6pTY/r0+tSaVbgPWSAIuzbk3lCTa+FForeTiO+wBQGkjA==" }, "vscode-oniguruma": { "version": "1.7.0", @@ -7408,8 +7395,7 @@ "yn": { "version": "3.1.1", "resolved": "https://registry.npmjs.org/yn/-/yn-3.1.1.tgz", - "integrity": "sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==", - "peer": true + "integrity": "sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==" }, "yocto-queue": { "version": "0.1.0", diff --git a/sdks/typescript/src/apache_beam/worker/state.ts b/sdks/typescript/src/apache_beam/worker/state.ts index dee7a866c2c7..89ecbe212540 100644 --- a/sdks/typescript/src/apache_beam/worker/state.ts +++ b/sdks/typescript/src/apache_beam/worker/state.ts @@ -54,15 +54,26 @@ interface WeightedCacheEntry { weight: number; } +// Default weight for values that cannot be sized (e.g., promises) +const DEFAULT_WEIGHT = 64; + /** * Estimates the memory size of a value in bytes. - * This is a simplified estimation - actual memory usage may vary. + * Handles circular references by tracking visited objects. */ -function estimateSize(value: any): number { +function sizeof(value: any, visited: Set = new Set()): number { if (value === null || value === undefined) { return 8; } + // Handle circular references for objects + if (typeof value === "object") { + if (visited.has(value)) { + return 8; // Account for reference size, not the full object again + } + visited.add(value); + } + const type = typeof value; if (type === "boolean") { @@ -81,20 +92,20 @@ function estimateSize(value: any): number { if (Array.isArray(value)) { let size = 40; // Array overhead for (const item of value) { - size += estimateSize(item); + size += sizeof(item, visited); } return size; } if (type === "object") { let size = 40; // Object overhead for (const key of Object.keys(value)) { - size += estimateSize(key) + estimateSize(value[key]); + size += sizeof(key, visited) + sizeof(value[key], visited); } return size; } // Default for unknown types - return 64; + return DEFAULT_WEIGHT; } // Default cache size: 100MB @@ -120,15 +131,16 @@ export class CachingStateProvider implements StateProvider { */ private evictIfNeeded() { while (this.currentWeight > this.maxCacheWeight && this.cache.size > 0) { - // Remove the first (oldest) entry + // Get the first (oldest) entry const firstKey = this.cache.keys().next().value; - if (firstKey !== undefined) { - const evicted = this.cache.get(firstKey); - if (evicted !== undefined) { - this.currentWeight -= evicted.weight; - } - this.cache.delete(firstKey); + if (firstKey === undefined) { + break; + } + const evictedEntry = this.cache.get(firstKey); + if (evictedEntry !== undefined) { + this.currentWeight -= evictedEntry.weight; } + this.cache.delete(firstKey); } } @@ -169,7 +181,7 @@ export class CachingStateProvider implements StateProvider { // Remove old weight from total this.currentWeight -= currentEntry.weight; } - const resolvedWeight = estimateSize(value); + const resolvedWeight = sizeof(value); this.cache.set(cacheKey, { entry: { type: "value", value }, weight: resolvedWeight, @@ -180,12 +192,13 @@ export class CachingStateProvider implements StateProvider { }), }; } - // Estimate weight for the new entry - const weight = result.type === "value" ? estimateSize(result.value) : 64; // Promise placeholder weight - // Evict if needed before adding new entry + // Calculate weight for the new entry + const weight = + result.type === "value" ? sizeof(result.value) : DEFAULT_WEIGHT; + // Add new entry to cache and then evict if needed this.currentWeight += weight; - this.evictIfNeeded(); this.cache.set(cacheKey, { entry: result, weight }); + this.evictIfNeeded(); return result; } } diff --git a/sdks/typescript/test/state_provider_test.ts b/sdks/typescript/test/state_provider_test.ts index e830754129ab..30b71e782955 100644 --- a/sdks/typescript/test/state_provider_test.ts +++ b/sdks/typescript/test/state_provider_test.ts @@ -283,5 +283,46 @@ describe("CachingStateProvider", function () { cache.getState(stateKey, decode); assert.ok(cache.currentWeight > 0); }); -}); + it("evicts oversized item that exceeds maxCacheWeight", function () { + const mockProvider = new MockStateProvider(); + // Set a very small weight limit (10 bytes) + const cache = new CachingStateProvider(mockProvider, 10); + + const decode = (data: Uint8Array) => data.toString(); + + const stateKey: fnApi.StateKey = { + type: { + oneofKind: "bagUserState", + bagUserState: fnApi.StateKey_BagUserState.create({ + transformId: "test", + userStateId: "oversized_state", + window: new Uint8Array(0), + key: new Uint8Array(0), + }), + }, + }; + + const key = Buffer.from(fnApi.StateKey.toBinary(stateKey)).toString( + "base64", + ); + // Create a large value that exceeds the cache weight limit + const largeValue = "this_is_a_very_large_value_that_exceeds_the_limit"; + mockProvider.setValue(key, largeValue); + + // Cache should start empty + assert.equal(cache.cache.size, 0); + assert.equal(cache.currentWeight, 0); + + // Add the oversized item - it should be added and then immediately evicted + cache.getState(stateKey, decode); + + // The cache should be empty after eviction (item was added then evicted) + assert.equal(cache.cache.size, 0); + assert.equal(cache.currentWeight, 0); + + // Fetching again should hit the underlying provider since item was evicted + cache.getState(stateKey, decode); + assert.equal(mockProvider.callCount, 2); + }); +}); From c1f759838b206e746784487cc137bf4302e16768 Mon Sep 17 00:00:00 2001 From: junaiddshaukat Date: Thu, 15 Jan 2026 11:18:12 +0500 Subject: [PATCH 5/5] Address Gemini comments: fix race condition, optimize evictIfNeeded - Fixed critical race condition in promise callback: only update cache if the entry is still the same promise we're resolving - Optimized evictIfNeeded: use entries() iterator and removed redundant checks --- .../src/apache_beam/worker/state.ts | 37 +++++++++---------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/sdks/typescript/src/apache_beam/worker/state.ts b/sdks/typescript/src/apache_beam/worker/state.ts index 89ecbe212540..5e7466a2a864 100644 --- a/sdks/typescript/src/apache_beam/worker/state.ts +++ b/sdks/typescript/src/apache_beam/worker/state.ts @@ -131,15 +131,11 @@ export class CachingStateProvider implements StateProvider { */ private evictIfNeeded() { while (this.currentWeight > this.maxCacheWeight && this.cache.size > 0) { - // Get the first (oldest) entry - const firstKey = this.cache.keys().next().value; - if (firstKey === undefined) { - break; - } - const evictedEntry = this.cache.get(firstKey); - if (evictedEntry !== undefined) { - this.currentWeight -= evictedEntry.weight; - } + // Get the first (oldest) entry from the map iterator + const firstEntry = this.cache.entries().next().value; + const firstKey = firstEntry[0]; + const evictedEntry = firstEntry[1]; + this.currentWeight -= evictedEntry.weight; this.cache.delete(firstKey); } } @@ -175,19 +171,22 @@ export class CachingStateProvider implements StateProvider { type: "promise", promise: result.promise.then((value) => { // When promise resolves, update cache with resolved value - // Get the current entry to update its weight const currentEntry = this.cache.get(cacheKey); - if (currentEntry !== undefined) { - // Remove old weight from total + // Only update if the entry in the cache is still the promise we are resolving. + // This prevents a race condition where the entry is evicted and replaced + // before this promise resolves. + if (currentEntry?.entry === result) { + // Remove old weight (of the promise) from total this.currentWeight -= currentEntry.weight; + + const resolvedWeight = sizeof(value); + this.cache.set(cacheKey, { + entry: { type: "value", value }, + weight: resolvedWeight, + }); + this.currentWeight += resolvedWeight; + this.evictIfNeeded(); } - const resolvedWeight = sizeof(value); - this.cache.set(cacheKey, { - entry: { type: "value", value }, - weight: resolvedWeight, - }); - this.currentWeight += resolvedWeight; - this.evictIfNeeded(); return value; }), };