Skip to content

Commit 4143c47

Browse files
authored
Merge branch 'main' into cli
2 parents 0e7dbd7 + 85eff3b commit 4143c47

File tree

3 files changed

+83
-17
lines changed

3 files changed

+83
-17
lines changed

Sources/Valkey/ValkeyClient.swift

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public final class ValkeyClient: Sendable {
4545
case runNodeClient(ValkeyNodeClient)
4646
case runTimer(ValkeyTimer)
4747
case runRole
48+
case verifyReplicas
4849
}
4950
let actionStream: AsyncStream<RunAction>
5051
let actionStreamContinuation: AsyncStream<RunAction>.Continuation
@@ -185,18 +186,30 @@ extension ValkeyClient {
185186

186187
case .runTimer(let timer):
187188
await runTimerAction(timer)
189+
190+
case .verifyReplicas:
191+
break
188192
}
189193
}
190194

191195
/// Run ROLE command and update topology based on results
192196
private func runRoleAction() async {
197+
struct NotReplicaError: Error {}
193198
var topology: StateMachine.TopologyRefreshInput
194199
let nodeClient = self.getNode(readOnly: false)
195200
do {
196201
let role = try await nodeClient.execute(ROLE())
197202
switch role {
198203
case .primary(let primaryState):
199204
let replicas = primaryState.replicas.map { ValkeyServerAddress.hostname($0.ip, port: $0.port) }
205+
// register the replica nodes reported by the primary
206+
let action = self.stateMachine.withLock { $0.addNewNodesFromRefresh(replicas) }
207+
for replicaClient in action.clientsToRun {
208+
self.queueAction(.runNodeClient(replicaClient))
209+
}
210+
// verify that the clients returned by the refresh really report the replica role
211+
guard try await self.verifyReplicas(action.clientsToRun) else { throw NotReplicaError() }
212+
200213
topology = .primary(replicas: replicas)
201214
self.logger.debug("Found replicas \(replicas)")
202215

@@ -277,7 +290,7 @@ extension ValkeyClient {
277290
}
278291
}
279292

280-
func runTimerFiredAction(_ action: StateMachine.TimerFiredAction) {
293+
private func runTimerFiredAction(_ action: StateMachine.TimerFiredAction) {
281294
switch action {
282295
case .runRole:
283296
self.queueAction(.runRole)
@@ -286,6 +299,21 @@ extension ValkeyClient {
286299
break
287300
}
288301
}
302+
303+
/// Ask each of the given node clients for its ROLE concurrently and report whether
/// every one of them answered as a replica.
///
/// - Parameter replicas: Node clients to query.
/// - Returns: `true` if every client reported the `.replica` role (also `true` for an empty
///   array, since the reduction starts from `true`); `false` if any reported another role.
/// - Throws: Rethrows any error thrown while executing `ROLE` on a client.
private func verifyReplicas(_ replicas: [ValkeyNodeClient]) async throws -> Bool {
304+
try await withThrowingTaskGroup(of: Bool.self) { group in
305+
for replica in replicas {
306+
group.addTask {
307+
let role = try await replica.execute(ROLE())
308+
guard case .replica = role else {
309+
return false
310+
}
311+
return true
312+
}
313+
}
314+
// AND all child results together; the reduction does not stop at the first `false`.
return try await group.reduce(true) { $0 && $1 }
315+
}
316+
}
289317
}
290318

291319
// MARK: ValkeyClientProtocol methods

Sources/Valkey/ValkeyClientStateMachine.swift

Lines changed: 52 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,13 @@ struct ValkeyClientStateMachine<
2929
/// Current state
3030
@usableFromInline
3131
enum State {
32+
/// Data carried by the `healthy` state.
@usableFromInline
33+
struct HealthyContext {
34+
/// Node IDs the client currently knows about (primary and replicas).
@usableFromInline
35+
let nodes: ValkeyNodeIDs<ValkeyServerAddress>
36+
}
3237
case uninitialized
33-
case running(ValkeyNodeIDs<ValkeyServerAddress>)
38+
case healthy(HealthyContext)
3439
case shutdown
3540
}
3641
/// Topology refresh state
@@ -77,10 +82,10 @@ struct ValkeyClientStateMachine<
7782

7883
@inlinable
7984
func getNode(_ selection: ValkeyNodeSelection) -> ConnectionPool {
80-
guard case .running(let nodes) = self.state else {
85+
guard case .healthy(let healthyContext) = self.state else {
8186
preconditionFailure("Cannot get a node if the client statemachine isn't initialized")
8287
}
83-
let nodeID = selection.select(nodeIDs: nodes)
88+
let nodeID = selection.select(nodeIDs: healthyContext.nodes)
8489
if let pool = self.runningClients[nodeID]?.pool {
8590
return pool
8691
} else {
@@ -109,8 +114,8 @@ struct ValkeyClientStateMachine<
109114
let client: ConnectionPool?
110115
switch action {
111116
case .useExistingPool:
112-
if case .running(let currentNodes) = self.state {
113-
if currentNodes.primary == address {
117+
if case .healthy(let healthyContext) = self.state {
118+
if healthyContext.nodes.primary == address {
114119
return .init()
115120
}
116121
}
@@ -119,7 +124,7 @@ struct ValkeyClientStateMachine<
119124
client = node
120125
}
121126

122-
self.state = .running(nodes)
127+
self.state = .healthy(.init(nodes: nodes))
123128
if self.configuration.findReplicas {
124129
switch self.topologyRefreshState {
125130
case .notRefreshing:
@@ -136,6 +141,17 @@ struct ValkeyClientStateMachine<
136141
}
137142
}
138143

144+
/// Return the node IDs stored in the current state.
///
/// - Returns: The nodes of the `healthy` state, or `nil` once the state machine is `shutdown`.
/// - Precondition: The state machine must not be `uninitialized`; calling this in that
///   state traps.
func getNodes() -> ValkeyNodeIDs<ValkeyServerAddress>? {
145+
switch self.state {
146+
case .uninitialized:
147+
preconditionFailure("Invalid state: \(self.state)")
148+
case .healthy(let healthContext):
149+
return healthContext.nodes
150+
case .shutdown:
151+
return nil
152+
}
153+
}
154+
139155
enum TimerFiredAction {
140156
case runRole
141157
case doNothing
@@ -147,7 +163,7 @@ struct ValkeyClientStateMachine<
147163
switch self.state {
148164
case .uninitialized:
149165
preconditionFailure("Invalid state: \(self.state)")
150-
case .running:
166+
case .healthy:
151167
break
152168
case .shutdown:
153169
return .doNothing
@@ -175,7 +191,7 @@ struct ValkeyClientStateMachine<
175191
switch self.state {
176192
case .shutdown, .uninitialized:
177193
return .cancelTimer(token)
178-
case .running:
194+
case .healthy:
179195
break
180196
}
181197

@@ -191,6 +207,28 @@ struct ValkeyClientStateMachine<
191207
}
192208
}
193209

210+
/// Result of `addNewNodesFromRefresh(_:)`: the work the caller has to perform afterwards.
struct NewNodesAction {
211+
/// Connection pools the caller should start running; empty when there is nothing to do.
var clientsToRun: [ConnectionPool] = []
212+
}
213+
214+
/// Add the given node addresses to the set of running clients without shutting down
/// any existing pool.
///
/// - Parameter nodes: Addresses to add.
/// - Returns: An action listing the pools the caller must start running. Empty when the
///   state is not `healthy`.
/// - Precondition: When the state is `healthy`, the topology refresh state must be
///   `.refreshing`; any other refresh state traps.
mutating func addNewNodesFromRefresh(_ nodes: [ValkeyServerAddress]) -> NewNodesAction {
215+
// Ignore the request unless we are healthy (i.e. we are shut down or not yet initialized)
216+
guard case .healthy = self.state else {
217+
return .init()
218+
}
219+
220+
switch self.topologyRefreshState {
221+
case .notRefreshing, .waitingForNextRefresh:
222+
preconditionFailure("Invalid state: \(self.topologyRefreshState)")
223+
224+
case .refreshing:
225+
let nodeDescriptions = nodes.map { ValkeyClientNodeDescription(address: $0) }
226+
// `removeUnmentionedPools: false` keeps pools that are not in `nodes`,
// so the update is expected to produce nothing to shut down (checked below).
let action = self.runningClients.updateNodes(nodeDescriptions, removeUnmentionedPools: false)
227+
precondition(action.poolsToShutdown.isEmpty, "There should be no pools to shutdown.")
228+
// Only the first element of each `poolsToRun` entry is handed back to the caller.
return .init(clientsToRun: action.poolsToRun.map { $0.0 })
229+
}
230+
}
231+
194232
struct TopologyRefreshAction {
195233
enum NextAction: Equatable {
196234
case refreshTopology
@@ -215,7 +253,7 @@ struct ValkeyClientStateMachine<
215253
/// Depending on what node we are connected to we either get a primary node or a list of replicas
216254
mutating func topologyRefreshSucceeded(_ topology: TopologyRefreshInput) -> TopologyRefreshAction {
217255
// If we are shutdown ignore
218-
guard case .running(let nodes) = self.state else {
256+
guard case .healthy(let healthyContext) = self.state else {
219257
return .init()
220258
}
221259

@@ -227,13 +265,13 @@ struct ValkeyClientStateMachine<
227265
switch topology {
228266
case .primary(let replicas):
229267
var nodeDescriptions = [
230-
ValkeyClientNodeDescription(address: nodes.primary)
268+
ValkeyClientNodeDescription(address: healthyContext.nodes.primary)
231269
]
232270
nodeDescriptions.append(
233271
contentsOf: replicas.lazy.map { ValkeyClientNodeDescription(address: $0) }
234272
)
235-
let newNodes = ValkeyNodeIDs(primary: nodes.primary, replicas: replicas)
236-
self.state = .running(newNodes)
273+
let newNodes = ValkeyNodeIDs(primary: healthyContext.nodes.primary, replicas: replicas)
274+
self.state = .healthy(.init(nodes: newNodes))
237275
let action = self.runningClients.updateNodes(nodeDescriptions, removeUnmentionedPools: true)
238276
let refreshTimerID = self.nextTimerID()
239277
self.topologyRefreshState = .waitingForNextRefresh(.init(timer: .init(id: refreshTimerID)))
@@ -254,7 +292,7 @@ struct ValkeyClientStateMachine<
254292
ValkeyClientNodeDescription(address: primary)
255293
]
256294
let newNodes = ValkeyNodeIDs(primary: primary, replicas: [])
257-
self.state = .running(newNodes)
295+
self.state = .healthy(.init(nodes: newNodes))
258296
let action = self.runningClients.updateNodes(nodeDescriptions, removeUnmentionedPools: false)
259297
return .init(
260298
nextAction: .refreshTopology,
@@ -271,7 +309,7 @@ struct ValkeyClientStateMachine<
271309

272310
mutating func topologyRefreshFailed() -> TopologyRefreshFailedAction {
273311
// If we are shutdown ignore
274-
guard case .running = self.state else {
312+
guard case .healthy = self.state else {
275313
return .init(retryTimer: nil)
276314
}
277315
switch self.topologyRefreshState {

Tests/IntegrationTests/StandaloneReplicaIntegrationTests.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,8 +321,8 @@ struct StandaloneReplicaIntegrationTests {
321321
func withFailover<Value>(_ client: ValkeyClient, operation: () async throws -> Value) async throws -> Value {
322322
// get primary address
323323
let (host, port) = client.stateMachine.withLock {
324-
guard case .running(let nodes) = $0.state else { fatalError("Expected a running primary node") }
325-
guard case .hostname(let host, let port) = nodes.primary.value else { fatalError("Expected a hostname") }
324+
guard case .healthy(let healthContext) = $0.state else { fatalError("Expected a running primary node") }
325+
guard case .hostname(let host, let port) = healthContext.nodes.primary.value else { fatalError("Expected a hostname") }
326326
return (host, port)
327327
}
328328
// extract replica address

0 commit comments

Comments
 (0)