diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 39f6fd15f..bd77b892a 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -3106,7 +3106,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), THandlePar unless (null connIds) $ do notify' "" $ UP srv connIds atomically $ incSMPServerStat' c userId srv connSubscribed $ length connIds - readTVarIO serviceRQs >>= processRcvServiceAssocs c + readTVarIO serviceRQs >>= processRcvServiceAssocs c srv where withRcvConn :: SMP.RecipientId -> (forall c. RcvQueue -> Connection c -> AM ()) -> AM' () withRcvConn rId a = do diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 1039c5d75..d33794006 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -1692,7 +1692,7 @@ subscribeSessQueues_ c withEvents qs = sendClientBatch_ "SUB" False subscribe_ c unless (null notices) $ takeTMVar $ clientNoticesLock c pure r unless (null serviceQs) $ void $ - processRcvServiceAssocs c serviceQs `runReaderT` agentEnv c + processRcvServiceAssocs c srv serviceQs `runReaderT` agentEnv c unless (null notices) $ void $ (processClientNotices c tSess notices `runReaderT` agentEnv c) `E.finally` atomically (putTMVar (clientNoticesLock c) ()) @@ -1714,10 +1714,10 @@ subscribeSessQueues_ c withEvents qs = sendClientBatch_ "SUB" False subscribe_ c tSess = transportSession' smp sessId = sessionId $ thParams smp -processRcvServiceAssocs :: SMPQueue q => AgentClient -> [q] -> AM' () -processRcvServiceAssocs _ [] = pure () -processRcvServiceAssocs c serviceQs = - withStore' c (`setRcvServiceAssocs` serviceQs) `catchAllErrors'` \e -> do +processRcvServiceAssocs :: SMPQueue q => AgentClient -> SMPServer -> [q] -> AM' () +processRcvServiceAssocs _ _ [] = pure () +processRcvServiceAssocs c srv serviceQs = + withStore' c (\db -> setRcvServiceAssocs db srv serviceQs) `catchAllErrors'` \e -> do logError $ "processRcvServiceAssocs error: " <> tshow e notifySub' c "" $ ERR e diff --git a/src/Simplex/Messaging/Agent/Store/AgentStore.hs b/src/Simplex/Messaging/Agent/Store/AgentStore.hs index 6a369ee2c..e3ea1671b 100644 --- a/src/Simplex/Messaging/Agent/Store/AgentStore.hs +++ b/src/Simplex/Messaging/Agent/Store/AgentStore.hs @@ -2399,12 +2399,18 @@ unassocUserServerRcvQueueSubs' db userId srv@(SMPServer h p kh) = do unsetQueuesToSubscribe :: DB.Connection -> IO () unsetQueuesToSubscribe db = DB.execute_ db "UPDATE rcv_queues SET to_subscribe = 0 WHERE to_subscribe = 1" -setRcvServiceAssocs :: SMPQueue q => DB.Connection -> [q] -> IO () -setRcvServiceAssocs db rqs = do +setRcvServiceAssocs :: SMPQueue q => DB.Connection -> SMPServer -> [q] -> IO () +setRcvServiceAssocs db ProtocolServer {host, port} rqs = #if defined(dbPostgres) - DB.execute db "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE rcv_id IN ?" $ Only $ In (map queueId rqs) + DB.execute + db + "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE host = ? AND port = ? AND rcv_id IN ?" + (host, port, In (map queueId rqs)) #else - DB.executeMany db "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE rcv_id = ?" $ map (Only . queueId) rqs + DB.executeMany + db + "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE host = ? AND port = ? AND rcv_id = ?" + (map (\q -> (host, port, queueId q)) rqs) #endif removeRcvServiceAssocs :: DB.Connection -> UserId -> SMPServer -> IO ()