Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3106,7 +3106,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), THandlePar
unless (null connIds) $ do
notify' "" $ UP srv connIds
atomically $ incSMPServerStat' c userId srv connSubscribed $ length connIds
readTVarIO serviceRQs >>= processRcvServiceAssocs c
readTVarIO serviceRQs >>= processRcvServiceAssocs c srv
where
withRcvConn :: SMP.RecipientId -> (forall c. RcvQueue -> Connection c -> AM ()) -> AM' ()
withRcvConn rId a = do
Expand Down
10 changes: 5 additions & 5 deletions src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1692,7 +1692,7 @@ subscribeSessQueues_ c withEvents qs = sendClientBatch_ "SUB" False subscribe_ c
unless (null notices) $ takeTMVar $ clientNoticesLock c
pure r
unless (null serviceQs) $ void $
processRcvServiceAssocs c serviceQs `runReaderT` agentEnv c
processRcvServiceAssocs c srv serviceQs `runReaderT` agentEnv c
unless (null notices) $ void $
(processClientNotices c tSess notices `runReaderT` agentEnv c)
`E.finally` atomically (putTMVar (clientNoticesLock c) ())
Expand All @@ -1714,10 +1714,10 @@ subscribeSessQueues_ c withEvents qs = sendClientBatch_ "SUB" False subscribe_ c
tSess = transportSession' smp
sessId = sessionId $ thParams smp

processRcvServiceAssocs :: SMPQueue q => AgentClient -> [q] -> AM' ()
processRcvServiceAssocs _ [] = pure ()
processRcvServiceAssocs c serviceQs =
withStore' c (`setRcvServiceAssocs` serviceQs) `catchAllErrors'` \e -> do
processRcvServiceAssocs :: SMPQueue q => AgentClient -> SMPServer -> [q] -> AM' ()
processRcvServiceAssocs _ _ [] = pure ()
processRcvServiceAssocs c srv serviceQs =
withStore' c (\db -> setRcvServiceAssocs db srv serviceQs) `catchAllErrors'` \e -> do
logError $ "processRcvServiceAssocs error: " <> tshow e
notifySub' c "" $ ERR e

Expand Down
14 changes: 10 additions & 4 deletions src/Simplex/Messaging/Agent/Store/AgentStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2399,12 +2399,18 @@ unassocUserServerRcvQueueSubs' db userId srv@(SMPServer h p kh) = do
unsetQueuesToSubscribe :: DB.Connection -> IO ()
unsetQueuesToSubscribe db = DB.execute_ db "UPDATE rcv_queues SET to_subscribe = 0 WHERE to_subscribe = 1"

setRcvServiceAssocs :: SMPQueue q => DB.Connection -> [q] -> IO ()
setRcvServiceAssocs db rqs = do
setRcvServiceAssocs :: SMPQueue q => DB.Connection -> SMPServer -> [q] -> IO ()
setRcvServiceAssocs db ProtocolServer {host, port} rqs =
#if defined(dbPostgres)
DB.execute db "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE rcv_id IN ?" $ Only $ In (map queueId rqs)
DB.execute
db
"UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE host = ? AND port = ? AND rcv_id IN ?"
(host, port, In (map queueId rqs))
#else
DB.executeMany db "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE rcv_id = ?" $ map (Only . queueId) rqs
DB.executeMany
db
"UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE host = ? AND port = ? AND rcv_id = ?"
(map (\q -> (host, port, queueId q)) rqs)
#endif

removeRcvServiceAssocs :: DB.Connection -> UserId -> SMPServer -> IO ()
Expand Down
Loading