From a77221841d840797a668095d16157efdd5ee49d0 Mon Sep 17 00:00:00 2001 From: sh Date: Wed, 20 May 2026 08:55:56 +0000 Subject: [PATCH 1/2] agent: use primary key index in setRcvServiceAssocs Previous WHERE rcv_id = ? did not match the (host, port, rcv_id) primary key prefix and fell back to a table scan via idx_rcv_queues_client_notice_id. With ~390k rows per queue, each update in a 1350-row batch scanned the whole table, yielding ~290s per batch and a multi-hour rcv-services migration. --- src/Simplex/Messaging/Agent/Store/AgentStore.hs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/Simplex/Messaging/Agent/Store/AgentStore.hs b/src/Simplex/Messaging/Agent/Store/AgentStore.hs index 6a369ee2c..72539eeb3 100644 --- a/src/Simplex/Messaging/Agent/Store/AgentStore.hs +++ b/src/Simplex/Messaging/Agent/Store/AgentStore.hs @@ -2400,11 +2400,19 @@ unsetQueuesToSubscribe :: DB.Connection -> IO () unsetQueuesToSubscribe db = DB.execute_ db "UPDATE rcv_queues SET to_subscribe = 0 WHERE to_subscribe = 1" setRcvServiceAssocs :: SMPQueue q => DB.Connection -> [q] -> IO () -setRcvServiceAssocs db rqs = do +setRcvServiceAssocs _ [] = pure () +setRcvServiceAssocs db rqs@(q : _) = + let ProtocolServer {host, port} = qServer q #if defined(dbPostgres) - DB.execute db "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE rcv_id IN ?" $ Only $ In (map queueId rqs) + in DB.execute + db + "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE host = ? AND port = ? AND rcv_id IN ?" + (host, port, In (map queueId rqs)) #else - DB.executeMany db "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE rcv_id = ?" $ map (Only . queueId) rqs + in DB.executeMany + db + "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE host = ? AND port = ? AND rcv_id = ?" + (map (\q' -> (host, port, queueId q')) rqs) #endif removeRcvServiceAssocs :: DB.Connection -> UserId -> SMPServer -> IO () From f57e53abcecde4a4ed2e2ba3cc6017e3fa86959a Mon Sep 17 00:00:00 2001 From: sh Date: Wed, 20 May 2026 10:39:13 +0000 Subject: [PATCH 2/2] agent: pass SMPServer explicitly to setRcvServiceAssocs Avoid extracting host/port from the first queue inside setRcvServiceAssocs. The caller already has SMPServer in scope (from tSess) and the call chain is short, so threading it through is simpler than inspecting the list. Removes the empty-list guard from setRcvServiceAssocs (it remains in processRcvServiceAssocs). --- src/Simplex/Messaging/Agent.hs | 2 +- src/Simplex/Messaging/Agent/Client.hs | 10 ++++----- .../Messaging/Agent/Store/AgentStore.hs | 22 +++++++++---------- 3 files changed, 16 insertions(+), 18 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 39f6fd15f..bd77b892a 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -3106,7 +3106,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), THandlePar unless (null connIds) $ do notify' "" $ UP srv connIds atomically $ incSMPServerStat' c userId srv connSubscribed $ length connIds - readTVarIO serviceRQs >>= processRcvServiceAssocs c + readTVarIO serviceRQs >>= processRcvServiceAssocs c srv where withRcvConn :: SMP.RecipientId -> (forall c. RcvQueue -> Connection c -> AM ()) -> AM' () withRcvConn rId a = do diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 1039c5d75..d33794006 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -1692,7 +1692,7 @@ subscribeSessQueues_ c withEvents qs = sendClientBatch_ "SUB" False subscribe_ c unless (null notices) $ takeTMVar $ clientNoticesLock c pure r unless (null serviceQs) $ void $ - processRcvServiceAssocs c serviceQs `runReaderT` agentEnv c + processRcvServiceAssocs c srv serviceQs `runReaderT` agentEnv c unless (null notices) $ void $ (processClientNotices c tSess notices `runReaderT` agentEnv c) `E.finally` atomically (putTMVar (clientNoticesLock c) ()) @@ -1714,10 +1714,10 @@ subscribeSessQueues_ c withEvents qs = sendClientBatch_ "SUB" False subscribe_ c tSess = transportSession' smp sessId = sessionId $ thParams smp -processRcvServiceAssocs :: SMPQueue q => AgentClient -> [q] -> AM' () -processRcvServiceAssocs _ [] = pure () -processRcvServiceAssocs c serviceQs = - withStore' c (`setRcvServiceAssocs` serviceQs) `catchAllErrors'` \e -> do +processRcvServiceAssocs :: SMPQueue q => AgentClient -> SMPServer -> [q] -> AM' () +processRcvServiceAssocs _ _ [] = pure () +processRcvServiceAssocs c srv serviceQs = + withStore' c (\db -> setRcvServiceAssocs db srv serviceQs) `catchAllErrors'` \e -> do logError $ "processRcvServiceAssocs error: " <> tshow e notifySub' c "" $ ERR e diff --git a/src/Simplex/Messaging/Agent/Store/AgentStore.hs b/src/Simplex/Messaging/Agent/Store/AgentStore.hs index 72539eeb3..e3ea1671b 100644 --- a/src/Simplex/Messaging/Agent/Store/AgentStore.hs +++ b/src/Simplex/Messaging/Agent/Store/AgentStore.hs @@ -2399,20 +2399,18 @@ unassocUserServerRcvQueueSubs' db userId srv@(SMPServer h p kh) = do unsetQueuesToSubscribe :: DB.Connection -> IO () unsetQueuesToSubscribe db = DB.execute_ db "UPDATE rcv_queues SET to_subscribe = 0 WHERE to_subscribe = 1" -setRcvServiceAssocs :: SMPQueue q => DB.Connection -> [q] -> IO () -setRcvServiceAssocs _ [] = pure () -setRcvServiceAssocs db rqs@(q : _) = - let ProtocolServer {host, port} = qServer q +setRcvServiceAssocs :: SMPQueue q => DB.Connection -> SMPServer -> [q] -> IO () +setRcvServiceAssocs db ProtocolServer {host, port} rqs = #if defined(dbPostgres) - in DB.execute - db - "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE host = ? AND port = ? AND rcv_id IN ?" - (host, port, In (map queueId rqs)) + DB.execute + db + "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE host = ? AND port = ? AND rcv_id IN ?" + (host, port, In (map queueId rqs)) #else - in DB.executeMany - db - "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE host = ? AND port = ? AND rcv_id = ?" - (map (\q' -> (host, port, queueId q')) rqs) + DB.executeMany + db + "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE host = ? AND port = ? AND rcv_id = ?" + (map (\q -> (host, port, queueId q)) rqs) #endif removeRcvServiceAssocs :: DB.Connection -> UserId -> SMPServer -> IO ()