Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3-bug-fixes/optimize-cleanup-pending-deletes
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduce connection usage and number of SQL queries for checking for pending PostgreSQL migration cleanup.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ module Wire.ConversationStore.Migration.Cleanup where
import Cassandra
import Data.Id
import Data.Map qualified as Map
import Data.Vector (Vector)
import Hasql.Session qualified as Session
import Hasql.Statement (Statement)
import Hasql.Statement qualified as Hasql
import Hasql.TH
import Imports
Expand Down Expand Up @@ -113,28 +116,41 @@ deleteRemoteMemberStatusesFromCassandra uid = do
delete :: PrepQuery W (Identity UserId) ()
delete = "delete from user_remote_conv where user = ?"

cleanupIfNecessary :: (PGConstraints r, Member (Input ClientState) r, Member ConversationStore r) => [Either ConvId UserId] -> Sem r ()
cleanupIfNecessary = mapM_ (either cleanupConvIfNecessary cleanupUserIfNecessary)

cleanupUserIfNecessary :: (PGConstraints r, Member (Input ClientState) r) => UserId -> Sem r ()
cleanupUserIfNecessary uid =
whenM (isPendingDelete DeleteUser uid) $ do
deleteRemoteMemberStatusesFromCassandra uid
markDeletionComplete DeleteUser uid

cleanupConvIfNecessary :: (PGConstraints r, Member ConversationStore r) => ConvId -> Sem r ()
cleanupConvIfNecessary cid =
whenM (isPendingDelete DeleteConv cid) $ do
maybe (pure ()) deleteConv =<< getAllConvData cid
markDeletionComplete DeleteConv cid

isPendingDelete :: (PGConstraints r) => DeletionType -> Id a -> Sem r Bool
isPendingDelete typ id_ = runStatement (typ, id_) select
cleanupIfNecessary :: forall r. (PGConstraints r, Member (Input ClientState) r, Member ConversationStore r) => [Either ConvId UserId] -> Sem r ()
cleanupIfNecessary ids = do
(pendingConvIds, pendingUserIds) <- runSessionWithRetry $ do
let (convIds, userIds) = partitionEithers ids
pendingConvIds <- Session.statement (DeleteConv, convIds) filterPendingDeletes
pendingUserIds <- Session.statement (DeleteUser, userIds) filterPendingDeletes
pure (pendingConvIds, pendingUserIds)

unless (null pendingConvIds) $ do
cleanupConvs pendingConvIds
runStatement (DeleteConv, pendingConvIds) markDeletionsComplete

unless (null pendingUserIds) $ do
cleanupUsers pendingUserIds
runStatement (DeleteUser, pendingUserIds) markDeletionsComplete
where
select =
lmapPG
[singletonStatement|SELECT EXISTS (SELECT 1
FROM conversation_migration_pending_deletes
WHERE typ = $1 :: text AND id = $2 :: uuid
) :: boolean
|]
markDeletionsComplete :: Statement (DeletionType, [Id a]) ()
markDeletionsComplete =
lmapPG @(_, Vector _)
[resultlessStatement|DELETE FROM conversation_migration_pending_deletes
WHERE typ = $1 :: text AND id = ANY($2 :: uuid[])|]

filterPendingDeletes :: Statement (DeletionType, [Id a]) [Id a]
filterPendingDeletes =
dimapPG @(_, Vector _) @_ @(Vector _) @[_]
[vectorStatement|SELECT id :: uuid
FROM conversation_migration_pending_deletes
WHERE typ = $1 :: text AND id = ANY($2 :: uuid[])
|]
cleanupConvs :: [ConvId] -> Sem r ()
cleanupConvs =
mapM_ $ \cid -> do
mConvData <- getAllConvData cid
forM_ mConvData deleteConv

cleanupUsers :: [UserId] -> Sem r ()
cleanupUsers =
mapM_ deleteRemoteMemberStatusesFromCassandra