diff --git a/packages/common_client/lib/src/data_sources/fdv2/protocol_handler.dart b/packages/common_client/lib/src/data_sources/fdv2/protocol_handler.dart index 3436fa6..74318e5 100644 --- a/packages/common_client/lib/src/data_sources/fdv2/protocol_handler.dart +++ b/packages/common_client/lib/src/data_sources/fdv2/protocol_handler.dart @@ -263,7 +263,9 @@ final class FDv2ProtocolHandler { } ProtocolAction _processError(ServerErrorEvent data) { - _logger.info('Server error encountered receiving updates: ${data.reason}'); + _logger.info('An issue was encountered receiving updates for payload ' + "'${data.payloadId ?? ''}' with reason: '${data.reason}'. " + 'Automatic retry will occur.'); _resetAfterError(); return ActionServerError(data.reason, id: data.payloadId); } diff --git a/packages/common_client/lib/src/data_sources/fdv2/streaming_base.dart b/packages/common_client/lib/src/data_sources/fdv2/streaming_base.dart new file mode 100644 index 0000000..4ba0acb --- /dev/null +++ b/packages/common_client/lib/src/data_sources/fdv2/streaming_base.dart @@ -0,0 +1,333 @@ +import 'dart:async'; +import 'dart:convert'; + +import 'package:http/http.dart' as http; +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; +import 'package:launchdarkly_event_source_client/launchdarkly_event_source_client.dart'; + +import 'flag_eval_mapper.dart'; +import 'protocol_handler.dart'; +import 'protocol_types.dart'; +import 'source.dart'; +import 'source_result.dart'; + +/// Long-lived streaming data source over SSE. +/// +/// Wraps an [SSEClient] with FDv2 protocol semantics. Each named SSE +/// event is parsed as JSON, wrapped in an [FDv2Event], and fed to a +/// fresh [FDv2ProtocolHandler]. The first emitted [ProtocolAction] +/// per event is translated into an [FDv2SourceResult]: +/// +/// - [ActionPayload] --> [ChangeSetResult] with `persist: true`. +/// - [ActionGoodbye] --> goodbye [StatusResult]; the SSE connection is +/// closed. +/// - [ActionServerError] / [ActionError] --> interrupted +/// [StatusResult]; the SSE client's built-in retry handles the +/// reconnect. +/// - [ActionNone] --> no emission (waiting for more events). +/// +/// Legacy `ping` events are routed to the injected [PingHandler] (which +/// performs a one-shot poll) and the result is forwarded to the stream. +/// +/// The `x-ld-fd-fallback` header on the initial connection's response +/// is detected and produces a terminal-error result with +/// `fdv1Fallback: true`. The connection is closed. +/// +/// Lifecycle: a single-subscription stream. [results] starts the SSE +/// connection on subscribe. [close] stops the source, emits a shutdown +/// [StatusResult], and closes the stream. Both paths funnel through a +/// `Completer _stoppedSignal` so async callbacks short-circuit +/// safely. +/// +/// `SSEClient.restart` is intentionally not surfaced here. The +/// orchestrator drives connection lifecycle by tearing down a +/// streaming source and constructing a fresh one, not by reconnecting +/// an existing one. +final class FDv2StreamingBase { + final SSEClient _sseClient; + final PingHandler _pingHandler; + final DateTime Function() _now; + final LDLogger _logger; + + late final StreamController _controller; + final Completer _stoppedSignal = Completer(); + StreamSubscription? _sseSubscription; + FDv2ProtocolHandler? _handler; + String? _environmentId; + bool _pingInFlight = false; + bool _pingPending = false; + + FDv2StreamingBase({ + required SSEClient sseClient, + required PingHandler pingHandler, + required LDLogger logger, + DateTime Function()? now, + }) : _sseClient = sseClient, + _pingHandler = pingHandler, + _logger = logger.subLogger('FDv2StreamingBase'), + _now = now ?? DateTime.now { + _controller = StreamController( + onListen: _onListen, + onCancel: _onCancel, + ); + } + + /// Single-subscription stream of results. The SSE connection is + /// established lazily on the first [Stream.listen] call. + Stream get results => _controller.stream; + + /// Stops the source, emits a shutdown [StatusResult], and closes the + /// stream. Idempotent. + void close() { + _terminate( + finalResult: + FDv2SourceResults.shutdown(message: 'Streaming source closed')); + } + + /// Terminal-path helper used by [close] and by the in-stream + /// terminal paths (goodbye event, fdv1-fallback header). Completes + /// [_stoppedSignal] *first* so any subsequent [close] call -- e.g. + /// from inside an `onData` listener reacting to the [finalResult] + /// we are about to emit -- short-circuits at its guard instead of + /// racing into a closed controller. Idempotent. + void _terminate({FDv2SourceResult? finalResult}) { + if (_stoppedSignal.isCompleted) return; + _stoppedSignal.complete(); + _tearDownConnection(); + if (!_controller.isClosed) { + if (finalResult != null) { + _controller.add(finalResult); + } + _controller.close(); + } + } + + void _onListen() { + _resetHandler(); + _sseSubscription = _sseClient.stream.listen( + _handleEvent, + onError: _handleSseError, + ); + } + + /// Builds a fresh [FDv2ProtocolHandler]. Called on initial connect + /// and on every subsequent [OpenEvent] (SSE auto-reconnect), so a + /// partial transfer from the previous connection cannot bleed into + /// the new one. Also called after a mid-event throw inside + /// `processEvent` so any half-accumulated state is discarded. + void _resetHandler() { + _handler = FDv2ProtocolHandler( + objProcessors: {flagEvalKind: processFlagEval}, + logger: _logger, + ); + } + + Future _onCancel() async { + if (_stoppedSignal.isCompleted) return; + _stoppedSignal.complete(); + _tearDownConnection(); + // No shutdown emission -- the subscriber asked us to stop. Close + // the controller so its internal state is released; we keep no + // subscribers and will never emit again. + if (!_controller.isClosed) { + _controller.close(); + } + } + + void _tearDownConnection() { + _sseSubscription?.cancel(); + _sseSubscription = null; + // Best-effort close. The SSE client may already be closed if it + // emitted an error; that's fine -- the operation is documented as + // safe in any state. + _sseClient.close(); + } + + void _handleEvent(Event event) { + if (_stoppedSignal.isCompleted) return; + switch (event) { + case OpenEvent open: + _handleOpen(open); + case MessageEvent message: + unawaited(_handleMessage(message)); + } + } + + void _handleOpen(OpenEvent event) { + // Every OpenEvent represents a (re)established connection. Rebuild + // the protocol handler so a partial transfer from the prior + // connection cannot bleed into this one -- the SDK must defend + // against this regardless of whether the server respects the + // protocol's "re-send server-intent on resume" semantic. + _resetHandler(); + + final headers = event.headers; + if (headers == null) return; + + final envId = headers['x-ld-envid']; + if (envId != null) { + _environmentId = envId; + } + + final fallback = headers['x-ld-fd-fallback']?.toLowerCase() == 'true'; + if (fallback) { + // Server told us to fall back; route through the terminal helper + // so a close() from the listener's onData -- a natural reaction + // to a fallback signal -- doesn't race with our own close. + _terminate( + finalResult: FDv2SourceResults.terminalError( + message: 'Server requested FDv1 fallback', + fdv1Fallback: true, + )); + } + } + + Future _handleMessage(MessageEvent event) async { + if (event.type == 'ping') { + // Legacy bridge: older servers may still send `ping` instead of + // FDv2 events. Defer to the injected handler for a one-shot poll. + await _handlePing(); + return; + } + + // Capture freshness as close to message arrival as possible, before + // any parse/dispatch work, so the timestamp reflects when the SDK + // saw the update -- not when it finished processing it. + final freshness = _now(); + + final ProtocolAction action; + try { + final decoded = jsonDecode(event.data); + if (decoded is! Map) { + _logger.warn('Ignoring SSE event with non-object data: ' + 'event=${event.type}'); + _emit(FDv2SourceResults.interrupted( + message: 'Streaming event payload was not a JSON object')); + return; + } + // Wrap the protocol-handler dispatch in the same try/catch as the + // jsonDecode: the structural casts inside the per-event fromJson + // factories (e.g. PayloadIntent, PutObjectEvent) throw TypeError + // on shape mismatch and would otherwise become unhandled async + // exceptions. + action = + _handler!.processEvent(FDv2Event(event: event.type, data: decoded)); + } catch (err) { + _logger.warn('Failed to parse or process SSE event (${err.runtimeType})'); + // Reset the handler -- a mid-event throw can leave it with stale + // _tempUpdates from the partially-processed payload. + _resetHandler(); + _emit(FDv2SourceResults.interrupted( + message: 'Streaming event payload was malformed')); + return; + } + + if (_stoppedSignal.isCompleted) return; + + switch (action) { + case ActionPayload(:final payload): + _emit(ChangeSetResult( + payload: payload, + environmentId: _environmentId, + freshness: freshness, + persist: true, + )); + case ActionGoodbye(:final reason): + // Server told us to disconnect; route through the terminal + // helper so a close() from the listener's onData -- a natural + // reaction to a goodbye -- doesn't race with our own close. + _terminate( + finalResult: FDv2SourceResults.goodbyeResult(message: reason)); + case ActionServerError(:final reason): + _emit(FDv2SourceResults.interrupted(message: reason)); + case ActionError(:final message): + _emit(FDv2SourceResults.interrupted(message: message)); + case ActionNone(): + // No emission; continue accumulating events until the handler + // reaches a terminal action. + break; + } + } + + Future _handlePing() async { + // The FDv2 ping semantic is "go re-poll". Two competing concerns: + // + // 1. Concurrent polls race on emit-order and amplify load on the + // polling endpoint, so only one poll may be in flight at a time. + // 2. Simply dropping pings that arrive during an in-flight poll + // can leave the SDK on a stale snapshot: if server state changed + // between when the in-flight poll captured it and when the + // dropped ping arrived, no further poll fires and the change is + // never seen. + // + // Coalesce: pings that arrive while a poll is running set a + // `_pingPending` flag. When the in-flight poll returns we drain the + // flag with one more poll, capturing whatever the latest state is. + // Multiple pings during the same in-flight window collapse to a + // single follow-up. + if (_pingInFlight) { + _pingPending = true; + return; + } + _pingInFlight = true; + try { + do { + _pingPending = false; + final FDv2SourceResult result; + try { + result = await _pingHandler(); + } catch (err) { + _logger.warn('Ping handler threw unexpectedly: ${err.runtimeType}'); + if (_stoppedSignal.isCompleted) return; + _emit(FDv2SourceResults.interrupted( + message: 'Ping handler raised error unexpectedly')); + return; + } + if (_stoppedSignal.isCompleted) return; + _emit(result); + } while (_pingPending && !_stoppedSignal.isCompleted); + } finally { + _pingInFlight = false; + } + } + + void _handleSseError(Object err, StackTrace stack) { + if (_stoppedSignal.isCompleted) return; + // The SSE client's built-in backoff handles reconnection. Surface + // the disruption as interrupted; the orchestrator decides whether + // to fall through to a different source after enough time. + // + // Don't log the raw exception. http.ClientException's toString + // formats as 'ClientException: , uri=', and in GET + // mode the URL embeds the base64-encoded context. Only the + // category and a synthetic stack header go to the log. + _logger.warn('SSE error (${err.runtimeType}); will retry'); + _logger.debug('SSE error stack:\n$stack'); + _emit(FDv2SourceResults.interrupted(message: _describeError(err))); + } + + /// Categorizes an exception surfaced on the SSE stream into a fixed + /// sanitized message. Mirrors the polling base's helper so neither + /// surface (the public StatusResult.message nor the warn log) ever + /// echoes a raw http.ClientException -- whose toString carries the + /// full request URL. + String _describeError(Object err) { + if (err is TimeoutException) { + return 'Streaming request timed out'; + } + if (err is http.ClientException) { + return 'Network error during streaming request'; + } + final type = err.runtimeType.toString(); + if (type.contains('Tls') || type.contains('Handshake')) { + return 'TLS error during streaming request'; + } + return 'Streaming connection error'; + } + + void _emit(FDv2SourceResult result) { + if (_stoppedSignal.isCompleted) return; + if (_controller.isClosed) return; + _controller.add(result); + } +} diff --git a/packages/common_client/lib/src/data_sources/fdv2/streaming_synchronizer.dart b/packages/common_client/lib/src/data_sources/fdv2/streaming_synchronizer.dart new file mode 100644 index 0000000..12dde88 --- /dev/null +++ b/packages/common_client/lib/src/data_sources/fdv2/streaming_synchronizer.dart @@ -0,0 +1,22 @@ +import 'source.dart'; +import 'source_result.dart'; +import 'streaming_base.dart'; + +/// Long-lived streaming synchronizer. +/// +/// A thin adapter that exposes [FDv2StreamingBase.results] as a +/// [Synchronizer]. The base class already implements all of the +/// connection lifecycle, protocol parsing, and error handling; this +/// wrapper exists only to satisfy the [Synchronizer] interface so the +/// orchestrator can treat polling and streaming uniformly. +final class FDv2StreamingSynchronizer implements Synchronizer { + final FDv2StreamingBase _base; + + FDv2StreamingSynchronizer({required FDv2StreamingBase base}) : _base = base; + + @override + Stream get results => _base.results; + + @override + void close() => _base.close(); +} diff --git a/packages/common_client/test/data_sources/fdv2/protocol_handler_test.dart b/packages/common_client/test/data_sources/fdv2/protocol_handler_test.dart index 2532151..af2c7b6 100644 --- a/packages/common_client/test/data_sources/fdv2/protocol_handler_test.dart +++ b/packages/common_client/test/data_sources/fdv2/protocol_handler_test.dart @@ -1,10 +1,21 @@ -import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; import 'package:launchdarkly_common_client/src/data_sources/fdv2/payload.dart'; import 'package:launchdarkly_common_client/src/data_sources/fdv2/protocol_handler.dart'; import 'package:launchdarkly_common_client/src/data_sources/fdv2/protocol_types.dart'; +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; +import 'package:mocktail/mocktail.dart'; import 'package:test/test.dart'; +class MockLogAdapter extends Mock implements LDLogAdapter {} + void main() { + setUpAll(() { + registerFallbackValue(LDLogRecord( + level: LDLogLevel.debug, + message: '', + time: DateTime.now(), + logTag: '')); + }); + final logger = LDLogger(); FDv2ProtocolHandler makeHandler() { @@ -433,6 +444,52 @@ void main() { // but temp updates should be cleared. New data can follow. expect(handler.state, equals(ProtocolState.full)); }); + + test('logs the spec-mandated message format with payload id and reason', + () { + final adapter = MockLogAdapter(); + when(() => adapter.log(any())).thenReturn(null); + final loggerWithAdapter = LDLogger(adapter: adapter); + final handler = FDv2ProtocolHandler( + objProcessors: {'flag-eval': (obj) => obj}, + logger: loggerWithAdapter, + ); + handler.processEvent(serverIntent('xfer-full')); + + handler.processEvent(FDv2Event( + event: FDv2EventTypes.error, + data: {'payload_id': 'p-7', 'reason': 'oops'})); + + final records = verify(() => adapter.log(captureAny())).captured; + final messages = records.map((r) => (r as LDLogRecord).message).toList(); + expect( + messages, + contains("An issue was encountered receiving updates for payload 'p-7' " + "with reason: 'oops'. Automatic retry will occur."), + ); + }); + + test('uses a placeholder for the payload id when the field is missing', () { + final adapter = MockLogAdapter(); + when(() => adapter.log(any())).thenReturn(null); + final loggerWithAdapter = LDLogger(adapter: adapter); + final handler = FDv2ProtocolHandler( + objProcessors: {'flag-eval': (obj) => obj}, + logger: loggerWithAdapter, + ); + handler.processEvent(serverIntent('xfer-full')); + + handler.processEvent( + FDv2Event(event: FDv2EventTypes.error, data: {'reason': 'oops'})); + + final records = verify(() => adapter.log(captureAny())).captured; + final messages = records.map((r) => (r as LDLogRecord).message).toList(); + expect( + messages.any( + (m) => m.contains("for payload '' with reason: 'oops'")), + isTrue, + ); + }); }); group('heartbeat', () { diff --git a/packages/common_client/test/data_sources/fdv2/streaming_base_test.dart b/packages/common_client/test/data_sources/fdv2/streaming_base_test.dart new file mode 100644 index 0000000..e2f7d42 --- /dev/null +++ b/packages/common_client/test/data_sources/fdv2/streaming_base_test.dart @@ -0,0 +1,647 @@ +import 'dart:async'; +import 'dart:collection'; +import 'dart:convert'; + +import 'package:http/http.dart' as http; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/payload.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/source_result.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/streaming_base.dart'; +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; +import 'package:launchdarkly_event_source_client/launchdarkly_event_source_client.dart'; +import 'package:mocktail/mocktail.dart'; +import 'package:test/test.dart'; + +class MockLogAdapter extends Mock implements LDLogAdapter {} + +TestSseClient makeSse() => SSEClient.testClient(Uri.parse('/test'), const {}); + +void emitOpen(TestSseClient sse, {Map? headers}) { + sse.emitEvent(OpenEvent( + headers: headers == null ? null : UnmodifiableMapView(headers), + )); +} + +void emitMessage(TestSseClient sse, String type, String data, {String? id}) { + sse.emitEvent(MessageEvent(type, data, id)); +} + +String serverIntent({String intentCode = 'xfer-full', int target = 1}) => + jsonEncode({ + 'payloads': [ + { + 'id': 'p1', + 'target': target, + 'intentCode': intentCode, + 'reason': 'test', + } + ] + }); + +String putObject({ + String key = 'flag-a', + int version = 1, +}) => + jsonEncode({ + 'kind': 'flag-eval', + 'key': key, + 'version': version, + 'object': {'value': true, 'version': version, 'variation': 0}, + }); + +String payloadTransferred({String state = 'sel-1', int version = 1}) => + jsonEncode({ + 'state': state, + 'version': version, + }); + +void emitFullPayload(TestSseClient sse, + {String state = 'sel-1', String flagKey = 'flag-a'}) { + emitMessage(sse, 'server-intent', serverIntent()); + emitMessage(sse, 'put-object', putObject(key: flagKey)); + emitMessage(sse, 'payload-transferred', payloadTransferred(state: state)); +} + +FDv2StreamingBase makeBase( + TestSseClient sse, { + Future Function()? pingHandler, + DateTime Function()? now, +}) { + return FDv2StreamingBase( + sseClient: sse, + pingHandler: pingHandler ?? + () async => FDv2SourceResults.interrupted(message: 'no ping handler'), + logger: LDLogger(level: LDLogLevel.error), + now: now, + ); +} + +void main() { + setUpAll(() { + registerFallbackValue(LDLogRecord( + level: LDLogLevel.debug, + message: '', + time: DateTime.now(), + logTag: '')); + }); + + group('connection lifecycle', () { + test('opens the SSE stream on first listen', () async { + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + // No emission yet -- nothing has come in over the SSE stream. + expect(emissions, isEmpty); + + // Drive a full xfer-full sequence and confirm the resulting + // ChangeSetResult is emitted. + emitFullPayload(sse); + await Future.delayed(Duration.zero); + + expect(emissions, hasLength(1)); + expect(emissions.single, isA()); + + await sub.cancel(); + }); + + test( + 'subscription cancel tears down the SSE client without emitting ' + 'shutdown', () async { + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + await sub.cancel(); + await Future.delayed(Duration.zero); + + expect(sse.isClosed, isTrue); + expect(emissions.whereType(), isEmpty); + }); + + test('close() emits shutdown then closes the stream', () async { + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + final done = Completer(); + base.results.listen(emissions.add, onDone: done.complete); + await Future.delayed(Duration.zero); + + base.close(); + await done.future; + + expect(emissions, hasLength(1)); + expect((emissions.single as StatusResult).state, + equals(SourceState.shutdown)); + expect(sse.isClosed, isTrue); + }); + + test('close() is idempotent', () async { + final sse = makeSse(); + final base = makeBase(sse); + base.close(); + expect(() => base.close(), returnsNormally); + }); + + test('close() is safe to call from a listener reacting to a goodbye', + () async { + // The orchestrator can legitimately react to a goodbye result + // by calling base.close(). That path must not race with the + // self-close the goodbye branch already does internally. + final sse = makeSse(); + final base = makeBase(sse); + Object? caught; + runZonedGuarded(() { + base.results.listen((event) { + if (event is StatusResult && event.state == SourceState.goodbye) { + base.close(); + } + }); + }, (err, _) => caught = err); + + await Future.delayed(Duration.zero); + emitMessage(sse, 'server-intent', serverIntent()); + emitMessage(sse, 'goodbye', jsonEncode({'reason': 'maintenance'})); + // Yield enough times for the listener and any async work to settle. + for (var i = 0; i < 5; i++) { + await Future.delayed(Duration.zero); + } + + expect(caught, isNull, + reason: + 'close() from onData on a goodbye must not throw, got $caught'); + expect(sse.isClosed, isTrue); + }); + + test( + 'close() is safe to call from a listener reacting to an FDv1 ' + 'fallback', () async { + // Same race risk as the goodbye case, but for the fdv1-fallback + // terminal branch. + final sse = makeSse(); + final base = makeBase(sse); + Object? caught; + runZonedGuarded(() { + base.results.listen((event) { + if (event is StatusResult && event.fdv1Fallback) { + base.close(); + } + }); + }, (err, _) => caught = err); + + await Future.delayed(Duration.zero); + emitOpen(sse, headers: {'x-ld-fd-fallback': 'true'}); + for (var i = 0; i < 5; i++) { + await Future.delayed(Duration.zero); + } + + expect(caught, isNull, + reason: + 'close() from onData on fallback must not throw, got $caught'); + expect(sse.isClosed, isTrue); + }); + }); + + group('event handling', () { + test('xfer-full sequence produces ChangeSetResult with full payload', + () async { + final sse = makeSse(); + final fixedNow = DateTime.utc(2026, 1, 1); + final base = makeBase(sse, now: () => fixedNow); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + emitFullPayload(sse, state: 'sel-99', flagKey: 'k1'); + await Future.delayed(Duration.zero); + + expect(emissions, hasLength(1)); + final cs = emissions.single as ChangeSetResult; + expect(cs.payload.type, equals(PayloadType.full)); + expect(cs.payload.selector.state, equals('sel-99')); + expect(cs.payload.updates.single.key, equals('k1')); + expect(cs.persist, isTrue); + expect(cs.freshness, equals(fixedNow)); + + await sub.cancel(); + }); + + test('environmentId from x-ld-envid header rides on the ChangeSetResult', + () async { + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + emitOpen(sse, headers: {'x-ld-envid': 'env-abc'}); + emitFullPayload(sse); + await Future.delayed(Duration.zero); + + expect((emissions.single as ChangeSetResult).environmentId, + equals('env-abc')); + + await sub.cancel(); + }); + + test('goodbye event closes the source and emits a goodbye result', + () async { + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + final done = Completer(); + base.results.listen(emissions.add, onDone: done.complete); + await Future.delayed(Duration.zero); + + emitMessage(sse, 'server-intent', serverIntent()); + emitMessage(sse, 'goodbye', jsonEncode({'reason': 'maintenance'})); + await done.future; + + expect(emissions, hasLength(1)); + expect((emissions.single as StatusResult).state, + equals(SourceState.goodbye)); + expect(sse.isClosed, isTrue); + }); + + test('unparseable event data is reported as interrupted, no throw', + () async { + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + emitMessage(sse, 'put-object', 'not json'); + await Future.delayed(Duration.zero); + + expect((emissions.single as StatusResult).state, + equals(SourceState.interrupted)); + + await sub.cancel(); + }); + + test( + 'malformed-shape event data that passes the Map check but fails ' + 'inside processEvent is reported as interrupted, no unhandled async', + () async { + // The data is a JSON object, but its inner structure violates + // the FDv2 protocol: payloads must be a list, not a string. The + // List cast inside protocol_types.dart throws TypeError + // synchronously from processEvent. The streaming source must + // catch it and surface as interrupted, not let it become an + // unhandled async exception. + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + Object? caughtAsync; + late final StreamSubscription sub; + runZonedGuarded(() { + sub = base.results.listen(emissions.add); + }, (err, _) => caughtAsync = err); + await Future.delayed(Duration.zero); + + emitMessage(sse, 'server-intent', jsonEncode({'payloads': 'not-a-list'})); + for (var i = 0; i < 5; i++) { + await Future.delayed(Duration.zero); + } + + expect(caughtAsync, isNull, + reason: 'malformed inner shape must not become unhandled async, ' + 'got $caughtAsync'); + expect((emissions.single as StatusResult).state, + equals(SourceState.interrupted)); + + await sub.cancel(); + }); + + test('non-object event data is reported as interrupted, no throw', + () async { + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + emitMessage(sse, 'server-intent', '[1,2,3]'); + await Future.delayed(Duration.zero); + + expect((emissions.single as StatusResult).state, + equals(SourceState.interrupted)); + + await sub.cancel(); + }); + + test( + 'an SSE reconnect resets the protocol handler so partial transfer ' + 'state from the previous connection does not bleed into the new one', + () async { + // Connection 1: server-intent + put-object, then the connection + // drops BEFORE payload-transferred. The handler is mid-payload + // with one accumulated update. + // + // Connection 2 (auto-reconnect, fresh OpenEvent on the same SSE + // subscriber): a Last-Event-ID resumption could have the server + // skip server-intent and continue with put-object directly. + // Without a per-OpenEvent handler reset, the new put-object + // accumulates on top of the stale buffer. The eventual + // payload-transferred would emit BOTH puts as one payload. + // + // With the reset, the new OpenEvent rebuilds the handler. Absent + // a server-intent the new put-object lands on an inactive + // handler and is rejected; the payload-transferred surfaces as a + // protocol error rather than as a corrupted ChangeSet. + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + emitOpen(sse); + emitMessage(sse, 'server-intent', serverIntent()); + emitMessage(sse, 'put-object', putObject(key: 'old-flag', version: 1)); + + // Reconnect. Server skips server-intent (Last-Event-ID resume). + emitOpen(sse); + emitMessage(sse, 'put-object', putObject(key: 'new-flag', version: 2)); + emitMessage(sse, 'payload-transferred', payloadTransferred()); + + await Future.delayed(Duration.zero); + + for (final result in emissions.whereType()) { + final keys = result.payload.updates.map((u) => u.key).toSet(); + expect(keys, isNot(contains('old-flag')), + reason: 'old-flag from the previous connection bled into ' + "the new connection's payload"); + } + + await sub.cancel(); + }); + + test('SSE transport error is reported as interrupted', () async { + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + sse.emitError(error: Exception('connection dropped')); + await Future.delayed(Duration.zero); + + expect((emissions.single as StatusResult).state, + equals(SourceState.interrupted)); + + await sub.cancel(); + }); + + test( + 'SSE transport error log records do not echo the request URL ' + 'or any other detail of the underlying exception', () async { + // http.ClientException's toString format is + // 'ClientException: , uri='. The URL embeds the + // base64-encoded context in GET mode, which is reversible. + // The streaming source must categorize the error and log only + // the sanitized form, like the polling sibling does. + final adapter = MockLogAdapter(); + when(() => adapter.log(any())).thenReturn(null); + final logger = LDLogger(adapter: adapter, level: LDLogLevel.debug); + + final sse = makeSse(); + final base = FDv2StreamingBase( + sseClient: sse, + pingHandler: () async => + FDv2SourceResults.interrupted(message: 'no ping'), + logger: logger, + ); + final sub = base.results.listen((_) {}); + await Future.delayed(Duration.zero); + + const secret = 'SECRET-ENCODED-CONTEXT'; + sse.emitError( + error: http.ClientException('Connection refused', + Uri.parse('https://example.test/sdk/stream/eval/$secret')), + ); + await Future.delayed(Duration.zero); + + final records = verify(() => adapter.log(captureAny())).captured; + for (final record in records) { + expect((record as LDLogRecord).message, isNot(contains(secret))); + } + + await sub.cancel(); + }); + }); + + group('FDv1 fallback header on connect', () { + test( + 'x-ld-fd-fallback: true on the OpenEvent emits terminalError and ' + 'closes', () async { + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + final done = Completer(); + base.results.listen(emissions.add, onDone: done.complete); + await Future.delayed(Duration.zero); + + emitOpen(sse, headers: {'x-ld-fd-fallback': 'true'}); + await done.future; + + expect(emissions, hasLength(1)); + final status = emissions.single as StatusResult; + expect(status.state, equals(SourceState.terminalError)); + expect(status.fdv1Fallback, isTrue); + expect(sse.isClosed, isTrue); + }); + + test('fallback header is matched case-insensitively', () async { + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + emitOpen(sse, headers: {'x-ld-fd-fallback': 'TRUE'}); + await Future.delayed(Duration.zero); + + expect((emissions.single as StatusResult).fdv1Fallback, isTrue); + + await sub.cancel(); + }); + + test('fallback header value other than true is ignored', () async { + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + emitOpen(sse, headers: {'x-ld-fd-fallback': 'false'}); + emitFullPayload(sse); + await Future.delayed(Duration.zero); + + expect(emissions.single, isA()); + expect(emissions.single.fdv1Fallback, isFalse); + + await sub.cancel(); + }); + }); + + group('legacy ping bridge', () { + test( + 'ping event invokes the PingHandler and forwards its result to ' + 'the stream', () async { + var pingCallCount = 0; + final pingResult = ChangeSetResult( + payload: const Payload(type: PayloadType.full, updates: []), + persist: true, + freshness: DateTime.utc(2026, 1, 1), + ); + final sse = makeSse(); + final base = makeBase( + sse, + pingHandler: () async { + pingCallCount++; + return pingResult; + }, + ); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + emitMessage(sse, 'ping', ''); + await Future.delayed(Duration.zero); + + expect(pingCallCount, equals(1)); + expect(emissions, hasLength(1)); + expect(identical(emissions.single, pingResult), isTrue); + + await sub.cancel(); + }); + + test( + 'consecutive ping events coalesce: no concurrent handler invocations, ' + 'and a follow-up poll fires after the in-flight one returns', () async { + // Two pings back-to-back must not result in two concurrent polls + // (avoids out-of-order races and DoS amplification on the polling + // endpoint). But the second ping can't simply be dropped: if the + // server's state changed between when the in-flight poll captured + // it and when the second ping arrived, dropping leaves the SDK + // stuck on the stale snapshot. The coalescing fix runs one + // follow-up poll after the in-flight returns, which captures any + // state change that arrived during the in-flight window. + var concurrent = 0; + var maxConcurrent = 0; + var totalCalls = 0; + final firstCallGate = Completer(); + final firstCallReleased = Completer(); + final sse = makeSse(); + final base = makeBase( + sse, + pingHandler: () async { + totalCalls++; + concurrent++; + if (concurrent > maxConcurrent) maxConcurrent = concurrent; + if (!firstCallGate.isCompleted) firstCallGate.complete(); + // Only the first call is gated; subsequent calls return + // immediately so the coalesced follow-up can run unimpeded. + if (totalCalls == 1) { + await firstCallReleased.future; + } + concurrent--; + return FDv2SourceResults.interrupted(message: 'ok'); + }, + ); + final sub = base.results.listen((_) {}); + await Future.delayed(Duration.zero); + + emitMessage(sse, 'ping', ''); + // Wait for the first call to enter the handler. + await firstCallGate.future; + // Fire the second ping while the first is still in flight. + emitMessage(sse, 'ping', ''); + await Future.delayed(Duration.zero); + // Release the held call so it can return; the pending flag from + // the second ping should trigger a follow-up poll. + firstCallReleased.complete(); + // Yield enough times for the follow-up poll to run. + for (var i = 0; i < 5; i++) { + await Future.delayed(Duration.zero); + } + + expect(maxConcurrent, equals(1), + reason: 'concurrent ping handler invocations are not allowed'); + expect(totalCalls, equals(2), + reason: 'a ping that arrives while one is in flight must trigger ' + 'exactly one follow-up poll after the in-flight returns'); + + await sub.cancel(); + }); + + test( + 'a burst of pings during one in-flight poll collapses to exactly one ' + 'follow-up poll, not N follow-ups', () async { + // Three pings arrive while one is in flight. Coalescing must + // collapse them: only one follow-up runs (the latest poll sees the + // latest state), not three. + var totalCalls = 0; + final firstCallGate = Completer(); + final firstCallReleased = Completer(); + final sse = makeSse(); + final base = makeBase( + sse, + pingHandler: () async { + totalCalls++; + if (!firstCallGate.isCompleted) firstCallGate.complete(); + if (totalCalls == 1) { + await firstCallReleased.future; + } + return FDv2SourceResults.interrupted(message: 'ok'); + }, + ); + final sub = base.results.listen((_) {}); + await Future.delayed(Duration.zero); + + emitMessage(sse, 'ping', ''); + await firstCallGate.future; + emitMessage(sse, 'ping', ''); + emitMessage(sse, 'ping', ''); + emitMessage(sse, 'ping', ''); + await Future.delayed(Duration.zero); + firstCallReleased.complete(); + for (var i = 0; i < 5; i++) { + await Future.delayed(Duration.zero); + } + + expect(totalCalls, equals(2), + reason: 'multiple pings during one in-flight poll collapse to a ' + 'single follow-up poll'); + + await sub.cancel(); + }); + + test('PingHandler throwing is treated as interrupted, no propagation', + () async { + final sse = makeSse(); + final base = makeBase( + sse, + pingHandler: () async { + throw StateError('boom'); + }, + ); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + emitMessage(sse, 'ping', ''); + await Future.delayed(Duration.zero); + + expect((emissions.single as StatusResult).state, + equals(SourceState.interrupted)); + + await sub.cancel(); + }); + }); +} diff --git a/packages/common_client/test/data_sources/fdv2/streaming_synchronizer_test.dart b/packages/common_client/test/data_sources/fdv2/streaming_synchronizer_test.dart new file mode 100644 index 0000000..d563a54 --- /dev/null +++ b/packages/common_client/test/data_sources/fdv2/streaming_synchronizer_test.dart @@ -0,0 +1,133 @@ +import 'dart:async'; +import 'dart:convert'; + +import 'package:launchdarkly_common_client/src/data_sources/fdv2/source_result.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/streaming_base.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/streaming_synchronizer.dart'; +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; +import 'package:launchdarkly_event_source_client/launchdarkly_event_source_client.dart'; +import 'package:test/test.dart'; + +TestSseClient makeSse() => SSEClient.testClient(Uri.parse('/test'), const {}); + +void emitMessage(TestSseClient sse, String type, String data) { + sse.emitEvent(MessageEvent(type, data, null)); +} + +void emitFullPayload(TestSseClient sse, {String state = 'sel-1'}) { + emitMessage( + sse, + 'server-intent', + jsonEncode({ + 'payloads': [ + { + 'id': 'p1', + 'target': 1, + 'intentCode': 'xfer-full', + 'reason': 'test', + } + ] + }), + ); + emitMessage( + sse, + 'put-object', + jsonEncode({ + 'kind': 'flag-eval', + 'key': 'k', + 'version': 1, + 'object': {'value': true, 'version': 1, 'variation': 0}, + }), + ); + emitMessage( + sse, + 'payload-transferred', + jsonEncode({'state': state, 'version': 1}), + ); +} + +FDv2StreamingBase makeBase(TestSseClient sse) => FDv2StreamingBase( + sseClient: sse, + pingHandler: () async => + FDv2SourceResults.interrupted(message: 'no ping'), + logger: LDLogger(level: LDLogLevel.error), + ); + +void main() { + test('forwards results from the underlying base', () async { + final sse = makeSse(); + final sync = FDv2StreamingSynchronizer(base: makeBase(sse)); + final emissions = []; + final sub = sync.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + emitFullPayload(sse, state: 'sel-1'); + emitFullPayload(sse, state: 'sel-2'); + await Future.delayed(Duration.zero); + + expect(emissions, hasLength(2)); + expect((emissions[0] as ChangeSetResult).payload.selector.state, + equals('sel-1')); + expect((emissions[1] as ChangeSetResult).payload.selector.state, + equals('sel-2')); + + await sub.cancel(); + }); + + test('close forwards to the base, emitting shutdown', () async { + final sse = makeSse(); + final sync = FDv2StreamingSynchronizer(base: makeBase(sse)); + final emissions = []; + final done = Completer(); + sync.results.listen(emissions.add, onDone: done.complete); + await Future.delayed(Duration.zero); + + sync.close(); + await done.future; + + expect( + (emissions.last as StatusResult).state, equals(SourceState.shutdown)); + expect(sse.isClosed, isTrue); + }); + + test('close() is safe to call from an onData listener reacting to goodbye', + () async { + // The Synchronizer interface contract documents close() as + // idempotent. A close() call from inside the listener's onData + // when reacting to a goodbye must not throw. + final sse = makeSse(); + final sync = FDv2StreamingSynchronizer(base: makeBase(sse)); + Object? caught; + runZonedGuarded(() { + sync.results.listen((event) { + if (event is StatusResult && event.state == SourceState.goodbye) { + sync.close(); + } + }); + }, (err, _) => caught = err); + + await Future.delayed(Duration.zero); + emitMessage( + sse, + 'server-intent', + jsonEncode({ + 'payloads': [ + { + 'id': 'p1', + 'target': 1, + 'intentCode': 'xfer-full', + 'reason': 'test', + } + ] + }), + ); + emitMessage(sse, 'goodbye', jsonEncode({'reason': 'maintenance'})); + for (var i = 0; i < 5; i++) { + await Future.delayed(Duration.zero); + } + + expect(caught, isNull, + reason: 'sync.close() from onData on goodbye must not throw'); + expect(sse.isClosed, isTrue); + }); +}