diff --git a/go/pools/smartconnpool/pool.go b/go/pools/smartconnpool/pool.go
index 04a46e5906d..4eb6c4fa57f 100644
--- a/go/pools/smartconnpool/pool.go
+++ b/go/pools/smartconnpool/pool.go
@@ -736,35 +736,77 @@ func (pool *ConnPool[C]) closeIdleResources(now time.Time) {
 	mono := monotonicFromTime(now)
 
+	// closeInStack pops idle connections off a single stack, hands the
+	// still-valid ones back to the pool, and closes and reopens a bounded
+	// batch of the expired ones.
 	closeInStack := func(s *connStack[C]) {
-		// Do a read-only best effort iteration of all the connection in this
-		// stack and atomically attempt to mark them as expired.
-		// Any connections that are marked as expired are _not_ removed from
-		// the stack; it's generally unsafe to remove nodes from the stack
-		// besides the head. When clients pop from the stack, they'll immediately
-		// notice the expired connection and ignore it.
-		// see: timestamp.expired
-		for conn := s.Peek(); conn != nil; conn = conn.next.Load() {
-			if conn.timeUsed.expired(mono, timeout) {
-				pool.Metrics.idleClosed.Add(1)
+		conn, ok := s.Pop()
+		if !ok {
+			// Early return to skip allocating slices when the stack is empty
+			return
+		}
 
-				conn.Close()
-				pool.closedConn()
+		activeConnections := pool.Active()
+
+		// Only expire up to ~half of the active connections at a time. This should
+		// prevent us from closing too many connections in one go which could lead to
+		// a lot of `.Get` calls being added to the waitlist if there's a sudden spike
+		// coming in _after_ connections were popped off the stack but _before_ being
+		// returned back to the pool. This is unlikely to happen, but better safe than sorry.
+		//
+		// We always expire at least one connection per stack per iteration to ensure
+		// that idle connections are eventually closed even in small pools.
+		//
+		// We will expire any additional connections in the next iteration of the idle closer.
+		expiredConnections := make([]*Pooled[C], 0, max(activeConnections/2, 1))
+		validConnections := make([]*Pooled[C], 0, activeConnections)
+
+		// Pop connections until the stack is empty or the expiry budget is reached
+		for ok {
+			if conn.timeUsed.expired(mono, timeout) {
+				expiredConnections = append(expiredConnections, conn)
 
-				// Using context.Background() is fine since MySQL connection already enforces
-				// a connect timeout via the `db_connect_timeout_ms` config param.
-				c, err := pool.getNew(context.Background())
-				if err != nil {
-					// If we couldn't open a new connection, just continue
-					continue
+				if len(expiredConnections) == cap(expiredConnections) {
+					// We have collected enough connections for this iteration to expire
+					break
 				}
+			} else {
+				validConnections = append(validConnections, conn)
+			}
 
-				// opening a new connection might have raced with other goroutines,
-				// so it's possible that we got back `nil` here
-				if c != nil {
-					// Return the new connection to the pool
-					pool.tryReturnConn(c)
-				}
+			conn, ok = s.Pop()
+		}
+
+		// Return all the valid connections back to waiters or the stack
+		//
+		// The order here is not important - because we can't guarantee to
+		// restore the order we got the connections out of the stack anyway.
+		//
+		// If we return the connections in the order popped off the stack:
+		// * waiters will get the newest connection first
+		// * stack will have the oldest connections at the top of the stack.
+		//
+		// If we return the connections in reverse order:
+		// * waiters will get the oldest connection first
+		// * stack will have the newest connections at the top of the stack.
+		//
+		// Neither of these is better or worse than the other.
+		for _, conn := range validConnections {
+			pool.tryReturnConn(conn)
+		}
+
+		// Close every expired connection and reopen it before handing it back
+		for _, conn := range expiredConnections {
+			pool.Metrics.idleClosed.Add(1)
+
+			conn.Close()
+
+			err := pool.connReopen(context.Background(), conn, mono)
+			if err != nil {
+				pool.closedConn()
+				continue
 			}
+
+			pool.tryReturnConn(conn)
 		}
 	}
 
diff --git a/go/pools/smartconnpool/pool_test.go b/go/pools/smartconnpool/pool_test.go
index 4a40987ae5b..29785d771ff 100644
--- a/go/pools/smartconnpool/pool_test.go
+++ b/go/pools/smartconnpool/pool_test.go
@@ -1325,22 +1325,32 @@ func TestIdleTimeoutConnectionLeak(t *testing.T) {
 
 	// Try to get connections while they're being reopened
 	// This should trigger the bug where connections get discarded
+	var wg sync.WaitGroup
+
 	for i := 0; i < 2; i++ {
-		getCtx, cancel := context.WithTimeout(t.Context(), 50*time.Millisecond)
-		defer cancel()
+		wg.Go(func() {
+			getCtx, cancel := context.WithTimeout(t.Context(), 300*time.Millisecond)
+			defer cancel()
 
-		conn, err := p.Get(getCtx, nil)
-		require.NoError(t, err)
+			conn, err := p.Get(getCtx, nil)
+			// require's FailNow must only run on the test goroutine, so
+			// report the failure with assert and bail out of this worker.
+			if !assert.NoError(t, err) {
+				return
+			}
 
-		p.put(conn)
+			p.put(conn)
+		})
 	}
 
+	wg.Wait()
+
 	// Wait a moment for all reopening to complete
 	require.EventuallyWithT(t, func(c *assert.CollectT) {
 		// Check the actual number of currently open connections
-		require.Equal(c, int64(2), state.open.Load())
+		assert.Equal(c, int64(2), state.open.Load())
 		// Check the total number of closed connections
-		require.Equal(c, int64(2), state.close.Load())
+		assert.Equal(c, int64(2), state.close.Load())
 	}, 400*time.Millisecond, 10*time.Millisecond)
 
 	// Check the pool state
@@ -1365,3 +1375,88 @@ func TestIdleTimeoutConnectionLeak(t *testing.T) {
 	assert.Equal(t, int64(0), state.open.Load())
 	assert.Equal(t, int64(4), state.close.Load())
 }
+
+// TestIdleTimeoutDoesntLeaveLingeringConnection fills a 10-connection pool,
+// lets every connection go idle, and checks that the idle closer keeps the
+// pool at capacity without leaving extra entries on the clean stack.
+func TestIdleTimeoutDoesntLeaveLingeringConnection(t *testing.T) {
+	var state TestState
+
+	ctx := context.Background()
+	p := NewPool(&Config[*TestConn]{
+		Capacity:    10,
+		IdleTimeout: 50 * time.Millisecond,
+		LogWait:     state.LogWait,
+	}).Open(newConnector(&state), nil)
+
+	defer p.Close()
+
+	var conns []*Pooled[*TestConn]
+	for range 10 {
+		conn, err := p.Get(ctx, nil)
+		require.NoError(t, err)
+		conns = append(conns, conn)
+	}
+
+	for _, conn := range conns {
+		p.put(conn)
+	}
+
+	require.EqualValues(t, 10, p.Active())
+	require.EqualValues(t, 10, p.Available())
+
+	// Wait a bit for the idle timeout worker to refresh connections
+	assert.Eventually(t, func() bool {
+		return p.Metrics.IdleClosed() > 10
+	}, 500*time.Millisecond, 10*time.Millisecond, "Expected more than 10 connections to be closed by idle timeout")
+
+	// Verify that new connections were created to replace the closed ones
+	require.EqualValues(t, 10, p.Active())
+	require.EqualValues(t, 10, p.Available())
+
+	// Count how many connections are currently on the clean stack
+	totalInStack := 0
+	for conn := p.clean.Peek(); conn != nil; conn = conn.next.Load() {
+		totalInStack++
+	}
+
+	require.LessOrEqual(t, totalInStack, 10)
+}
+
+// BenchmarkPoolCleanupIdleConnectionsPerformanceNoIdleConnections measures
+// closeIdleResources on a full pool of freshly recycled connections with a
+// 30s idle timeout, so no connection is expected to expire during the run.
+func BenchmarkPoolCleanupIdleConnectionsPerformanceNoIdleConnections(b *testing.B) {
+	var state TestState
+
+	capacity := 1000
+
+	p := NewPool(&Config[*TestConn]{
+		Capacity:    int64(capacity),
+		IdleTimeout: 30 * time.Second,
+		LogWait:     state.LogWait,
+	}).Open(newConnector(&state), nil)
+	defer p.Close()
+
+	// Fill the pool
+	connections := make([]*Pooled[*TestConn], 0, capacity)
+	for range capacity {
+		conn, err := p.Get(context.Background(), nil)
+		if err != nil {
+			b.Fatal(err)
+		}
+
+		connections = append(connections, conn)
+	}
+
+	// Return all connections to the pool
+	for _, conn := range connections {
+		conn.Recycle()
+	}
+
+	b.ResetTimer()
+
+	for b.Loop() {
+		p.closeIdleResources(time.Now())
+	}
+}