Skip to content

Commit d81be76

Browse files
kevin-wu24 authored and jsancio committed
KAFKA-20247; Controller registration needs to retry after request timeout (#21619)
Set pendingRpc to false when controller registration times out. If we do not set this, controllers cannot send ControllerRegistrationRequests thereafter. Instead, subsequent calls to maybeSendControllerRegistration() will always log: "maybeSendControllerRegistration: waiting for the previous RPC to complete." The asserts on L294 and L300 fail when pendingRpc is not reset to false in onTimeout. Previously, RegistrationResponseHandler callbacks were invoked from the NodeToControllerRequestThread. Instead, these callbacks should append an event to the ControllerRegistrationManager event queue. Added testRetransmitRegistrationAfterTimeout. This test times out a controller registration and checks that the state of the request queue in the registration manager's channel manager is as expected. Reviewers: José Armando García Sancio <jsancio@apache.org>
1 parent f42e4df commit d81be76

2 files changed

Lines changed: 72 additions & 8 deletions

File tree

core/src/main/scala/kafka/server/ControllerRegistrationManager.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,16 @@ class ControllerRegistrationManager(
238238

239239
private class RegistrationResponseHandler extends ControllerRequestCompletionHandler {
240240
override def onComplete(response: ClientResponse): Unit = {
241+
eventQueue.append(new RequestCompleteEvent(response))
242+
}
243+
244+
override def onTimeout(): Unit = {
245+
eventQueue.append(new RequestTimeoutEvent())
246+
}
247+
}
248+
249+
private class RequestCompleteEvent(response: ClientResponse) extends EventQueue.Event {
250+
override def run(): Unit = {
241251
pendingRpc = false
242252
if (response.authenticationException() != null) {
243253
error(s"RegistrationResponseHandler: authentication error", response.authenticationException())
@@ -265,8 +275,11 @@ class ControllerRegistrationManager(
265275
}
266276
}
267277
}
278+
}
268279

269-
override def onTimeout(): Unit = {
280+
private class RequestTimeoutEvent extends EventQueue.Event {
281+
override def run(): Unit = {
282+
pendingRpc = false
270283
error(s"RegistrationResponseHandler: channel manager timed out before sending the request.")
271284
scheduleNextCommunicationAfterFailure()
272285
}

core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import org.apache.kafka.common.message.ControllerRegistrationResponseData
2222
import org.apache.kafka.common.metadata.{FeatureLevelRecord, RegisterControllerRecord}
2323
import org.apache.kafka.common.protocol.Errors
2424
import org.apache.kafka.common.requests.ControllerRegistrationResponse
25-
import org.apache.kafka.common.utils.{ExponentialBackoff, Time}
25+
import org.apache.kafka.common.utils.ExponentialBackoff
2626
import org.apache.kafka.image.loader.{LogDeltaManifest, SnapshotManifest}
2727
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
2828
import org.apache.kafka.metadata.{ListenerInfo, RecordTestUtils, VersionRange}
@@ -72,12 +72,15 @@ class ControllerRegistrationManagerTest {
7272
): ControllerRegistrationManager = {
7373
new ControllerRegistrationManager(context.config.nodeId,
7474
context.clusterId,
75-
Time.SYSTEM,
75+
context.time,
7676
"controller-registration-manager-test-",
7777
createSupportedFeatures(MetadataVersion.IBP_3_7_IV0),
7878
RecordTestUtils.createTestControllerRegistration(1, false).incarnationId(),
7979
ListenerInfo.create(context.config.controllerListeners.map(_.toJava).asJava),
80-
new ExponentialBackoff(1, 2, 100, 0.02))
80+
// Use a backoff with no jitter so we can reliably observe the intermediate
81+
// state after receiving error responses and before the scheduled retries.
82+
new ExponentialBackoff(1, 2, 100, 0)
83+
)
8184
}
8285

8386
private def registeredInLog(manager: ControllerRegistrationManager): Boolean = {
@@ -249,20 +252,68 @@ class ControllerRegistrationManagerTest {
249252
try {
250253
context.controllerNodeProvider.node.set(controller1)
251254
manager.start(context.mockChannelManager)
255+
256+
// send a ControllerRegistrationRequest after learning the MV
257+
doMetadataUpdate(MetadataImage.EMPTY,
258+
manager,
259+
MetadataVersion.IBP_3_7_IV0,
260+
r => if (r.controllerId() == 1) None else Some(r))
261+
assertEquals((true, 0, 0), rpcStats(manager))
262+
263+
// the first response will trigger a retry
252264
context.mockClient.prepareResponseFrom(new ControllerRegistrationResponse(
253265
new ControllerRegistrationResponseData().
254266
setErrorCode(Errors.UNKNOWN_CONTROLLER_ID.code()).
255267
setErrorMessage("Unknown controller 1")), controller1)
268+
context.mockChannelManager.poll()
269+
assertEquals((false, 0, 1), rpcStats(manager))
270+
271+
// the retried request will be sent after retryBackoffMs
272+
context.time.sleep(1)
273+
assertEquals((true, 0, 1), rpcStats(manager))
274+
275+
// the second response will complete the RPC successfully
256276
context.mockClient.prepareResponseFrom(new ControllerRegistrationResponse(
257277
new ControllerRegistrationResponseData()), controller1)
278+
context.mockChannelManager.poll()
279+
assertEquals((false, 1, 0), rpcStats(manager))
280+
} finally {
281+
manager.close()
282+
}
283+
}
284+
285+
@Test
286+
def testRetransmitRegistrationAfterTimeout(): Unit = {
287+
val context = new RegistrationTestContext(configProperties)
288+
val manager = newControllerRegistrationManager(context)
289+
try {
290+
context.controllerNodeProvider.node.set(controller1)
291+
292+
// send a ControllerRegistrationRequest after learning the MV
293+
manager.start(context.mockChannelManager)
294+
assertFalse(registeredInLog(manager))
295+
assertEquals((false, 0, 0), rpcStats(manager))
258296
doMetadataUpdate(MetadataImage.EMPTY,
259297
manager,
260298
MetadataVersion.IBP_3_7_IV0,
261299
r => if (r.controllerId() == 1) None else Some(r))
262-
TestUtils.retryOnExceptionWithTimeout(30000, () => {
263-
context.mockChannelManager.poll()
264-
assertEquals((false, 1, 0), rpcStats(manager))
265-
})
300+
// pendingRpc = true, successfulRpcs = 0, failedRpcs = 0
301+
assertEquals((true, 0, 0), rpcStats(manager))
302+
assertEquals(1, context.mockChannelManager.unsentQueue.size())
303+
304+
// time out the request before polling
305+
// this will call the timeout callback
306+
context.time.sleep(context.mockChannelManager.getTimeoutMs)
307+
context.mockChannelManager.poll()
308+
// pendingRpc = false, successfulRpcs = 0, failedRpcs = 1
309+
assertEquals((false, 0, 1), rpcStats(manager))
310+
assertEquals(0, context.mockChannelManager.unsentQueue.size())
311+
312+
// the retried request will be sent after retryBackoffMs
313+
context.time.sleep(1)
314+
// pendingRpc = true, successfulRpcs = 0, failedRpcs = 1
315+
assertEquals((true, 0, 1), rpcStats(manager))
316+
assertEquals(1, context.mockChannelManager.unsentQueue.size())
266317
} finally {
267318
manager.close()
268319
}

0 commit comments

Comments
 (0)