core: update simplexmq (avoid deleting shared message bodies) (#5630)

* core: update simplexmq (avoid deleting shared message bodies)

* simplexmq, plans

* simplexmq

* output in failing test

* stabilize test
This commit is contained in:
Evgeny 2025-02-15 16:18:34 +00:00 committed by GitHub
parent e9893989df
commit 1f8755f941
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 49 additions and 32 deletions

View file

@ -12,7 +12,7 @@ constraints: zip +disable-bzip2 +disable-zstd
source-repository-package
type: git
location: https://github.com/simplex-chat/simplexmq.git
tag: 7ac80bffcb51e2461ff8d0f54094943c56f1c4e6
tag: fa67d128d1d1c3edb3c49886f096c36e3f4da0d0
source-repository-package
type: git

View file

@ -1,5 +1,5 @@
{
"https://github.com/simplex-chat/simplexmq.git"."7ac80bffcb51e2461ff8d0f54094943c56f1c4e6" = "1qzv3vs1f0h5df5i0fi2hqiagkkwqghpzjgh9bnjrgmhkhvkl2iq";
"https://github.com/simplex-chat/simplexmq.git"."fa67d128d1d1c3edb3c49886f096c36e3f4da0d0" = "13fl760rbw75rqa1zs4f130w7a75dfqw43w52jjnfpxgqis3dd2z";
"https://github.com/simplex-chat/hs-socks.git"."a30cc7a79a08d8108316094f8f2f82a0c5e1ac51" = "0yasvnr7g91k76mjkamvzab2kvlb1g5pspjyjn2fr6v83swjhj38";
"https://github.com/simplex-chat/direct-sqlcipher.git"."f814ee68b16a9447fbb467ccc8f29bdd3546bfd9" = "1ql13f4kfwkbaq7nygkxgw84213i0zm7c1a8hwvramayxl38dq5d";
"https://github.com/simplex-chat/sqlcipher-simple.git"."a46bd361a19376c5211f1058908fc0ae6bf42446" = "1z0r78d8f0812kxbgsm735qf6xx8lvaz27k1a0b4a2m0sshpd5gl";

View file

@ -75,6 +75,17 @@ Query:
Plan:
SEARCH snd_file_chunks USING INDEX idx_snd_file_chunks_snd_file_id (snd_file_id=?)
Query:
DELETE FROM snd_message_bodies
WHERE NOT EXISTS (SELECT 1 FROM snd_messages WHERE snd_message_body_id = ?)
AND snd_message_body_id = ?
Plan:
SEARCH snd_message_bodies USING INTEGER PRIMARY KEY (rowid=?)
SCALAR SUBQUERY 1
SEARCH snd_messages USING COVERING INDEX idx_snd_messages_snd_message_body_id (snd_message_body_id=?)
SEARCH snd_messages USING COVERING INDEX idx_snd_messages_snd_message_body_id (snd_message_body_id=?)
Query:
SELECT
f.rcv_file_id, f.rcv_file_entity_id, f.user_id, c.rcv_file_chunk_id, c.chunk_no, c.chunk_size, c.digest, f.tmp_path, c.tmp_path,
@ -188,6 +199,16 @@ SEARCH c USING INTEGER PRIMARY KEY (rowid=?)
SEARCH f USING INTEGER PRIMARY KEY (rowid=?)
USE TEMP B-TREE FOR ORDER BY
Query:
SELECT rcpt_status, snd_message_body_id FROM snd_messages
WHERE NOT EXISTS (SELECT 1 FROM snd_message_deliveries WHERE conn_id = ? AND internal_id = ? AND failed = 0)
AND conn_id = ? AND internal_id = ?
Plan:
SEARCH snd_messages USING PRIMARY KEY (conn_id=?)
SCALAR SUBQUERY 1
SEARCH snd_message_deliveries USING COVERING INDEX idx_snd_message_deliveries_expired (conn_id=?)
Query:
SELECT rcv_file_entity_id, user_id, size, digest, key, nonce, chunk_size, prefix_path, tmp_path, save_path, save_file_key, save_file_nonce, status, deleted, redirect_id, redirect_entity_id, redirect_size, redirect_digest
FROM rcv_files
@ -809,11 +830,6 @@ Plan:
SEARCH snd_files USING INTEGER PRIMARY KEY (rowid=?)
SEARCH snd_file_chunks USING COVERING INDEX idx_snd_file_chunks_snd_file_id (snd_file_id=?)
Query: DELETE FROM snd_message_bodies WHERE snd_message_body_id = ?
Plan:
SEARCH snd_message_bodies USING INTEGER PRIMARY KEY (rowid=?)
SEARCH snd_messages USING COVERING INDEX idx_snd_messages_snd_message_body_id (snd_message_body_id=?)
Query: DELETE FROM snd_message_deliveries WHERE conn_id = ? AND snd_queue_id = ?
Plan:
SEARCH snd_message_deliveries USING COVERING INDEX idx_snd_message_deliveries (conn_id=? AND snd_queue_id=?)
@ -906,10 +922,6 @@ Query: SELECT conn_id FROM connections WHERE user_id = ?
Plan:
SEARCH connections USING COVERING INDEX idx_connections_user (user_id=?)
Query: SELECT count(*) FROM snd_message_deliveries WHERE conn_id = ? AND internal_id = ? AND failed = 0
Plan:
SEARCH snd_message_deliveries USING COVERING INDEX idx_snd_message_deliveries_expired (conn_id=?)
Query: SELECT count(1) FROM snd_message_bodies
Plan:
SCAN snd_message_bodies
@ -938,10 +950,6 @@ Query: SELECT ratchet_state, x3dh_pub_key_1, x3dh_pub_key_2, pq_pub_kem FROM rat
Plan:
SEARCH ratchets USING PRIMARY KEY (conn_id=?)
Query: SELECT rcpt_internal_id, rcpt_status, snd_message_body_id FROM snd_messages WHERE conn_id = ? AND internal_id = ?
Plan:
SEARCH snd_messages USING PRIMARY KEY (conn_id=?)
Query: SELECT rcv_file_id FROM rcv_files WHERE rcv_file_entity_id = ?
Plan:
SEARCH rcv_files USING COVERING INDEX sqlite_autoindex_rcv_files_1 (rcv_file_entity_id=?)
@ -1106,6 +1114,10 @@ Query: UPDATE snd_messages SET retry_int_slow = ?, retry_int_fast = ? WHERE conn
Plan:
SEARCH snd_messages USING COVERING INDEX idx_snd_messages_conn_id_internal_id (conn_id=? AND internal_id=?)
Query: UPDATE snd_messages SET snd_message_body_id = NULL WHERE conn_id = ? AND internal_id = ?
Plan:
SEARCH snd_messages USING COVERING INDEX idx_snd_messages_conn_id_internal_id (conn_id=? AND internal_id=?)
Query: UPDATE snd_queues SET snd_primary = ? WHERE conn_id = ?
Plan:
SEARCH snd_queues USING COVERING INDEX idx_snd_queue_id (conn_id=?)

View file

@ -259,13 +259,13 @@ createTestChat ps cfg opts@ChatOpts {coreOptions} dbPrefix profile = do
Right db@ChatDatabase {chatStore, agentStore} <- createDatabase ps coreOptions dbPrefix
insertUser agentStore
Right user <- withTransaction chatStore $ \db' -> runExceptT $ createUserRecord db' (AgentUserId 1) profile True
startTestChat_ db cfg opts user
startTestChat_ ps db cfg opts user
startTestChat :: TestParams -> ChatConfig -> ChatOpts -> String -> IO TestCC
startTestChat ps cfg opts@ChatOpts {coreOptions} dbPrefix = do
Right db@ChatDatabase {chatStore} <- createDatabase ps coreOptions dbPrefix
Just user <- find activeUser <$> withTransaction chatStore getUsers
startTestChat_ db cfg opts user
startTestChat_ ps db cfg opts user
createDatabase :: TestParams -> CoreChatOpts -> String -> IO (Either MigrationError ChatDatabase)
#if defined(dbPostgres)
@ -282,8 +282,8 @@ insertUser :: DBStore -> IO ()
insertUser st = withTransaction st (`DB.execute_` "INSERT INTO users (user_id) VALUES (1)")
#endif
startTestChat_ :: ChatDatabase -> ChatConfig -> ChatOpts -> User -> IO TestCC
startTestChat_ db cfg opts user = do
startTestChat_ :: TestParams -> ChatDatabase -> ChatConfig -> ChatOpts -> User -> IO TestCC
startTestChat_ TestParams {printOutput} db cfg opts user = do
t <- withVirtualTerminal termSettings pure
ct <- newChatTerminal t opts
cc <- newChatController db (Just user) cfg opts False
@ -292,7 +292,7 @@ startTestChat_ db cfg opts user = do
atomically . unless (maintenance opts) $ readTVar (agentAsync cc) >>= \a -> when (isNothing a) retry
termQ <- newTQueueIO
termAsync <- async $ readTerminalOutput t termQ
pure TestCC {chatController = cc, virtualTerminal = t, chatAsync, termAsync, termQ, printOutput = False}
pure TestCC {chatController = cc, virtualTerminal = t, chatAsync, termAsync, termQ, printOutput}
stopTestChat :: TestParams -> TestCC -> IO ()
stopTestChat ps TestCC {chatController = cc@ChatController {smpAgent, chatStore}, chatAsync, termAsync} = do
@ -355,10 +355,10 @@ withTestChatOpts ps = withTestChatCfgOpts ps testCfg
withTestChatCfgOpts :: HasCallStack => TestParams -> ChatConfig -> ChatOpts -> String -> (HasCallStack => TestCC -> IO a) -> IO a
withTestChatCfgOpts ps cfg opts dbPrefix = bracket (startTestChat ps cfg opts dbPrefix) (\cc -> cc <// 100000 >> stopTestChat ps cc)
-- enable output for specific chat controller, use like this:
-- withNewTestChat tmp "alice" aliceProfile $ \a -> withTestOutput a $ \alice -> do ...
withTestOutput :: HasCallStack => TestCC -> (HasCallStack => TestCC -> IO a) -> IO a
withTestOutput cc runTest = runTest cc {printOutput = True}
-- enable output for specific test.
-- usage: withTestOutput $ testChat2 aliceProfile bobProfile $ \alice bob -> do ...
withTestOutput :: HasCallStack => (HasCallStack => TestParams -> IO ()) -> TestParams -> IO ()
withTestOutput test ps = test ps {printOutput = True}
readTerminalOutput :: VirtualTerminal -> TQueue String -> IO ()
readTerminalOutput t termQ = do
@ -404,12 +404,12 @@ testChatN cfg opts ps test params = do
(<//) cc t = timeout t (getTermLine cc) `shouldReturn` Nothing
getTermLine :: HasCallStack => TestCC -> IO String
getTermLine cc =
getTermLine cc@TestCC {printOutput} =
5000000 `timeout` atomically (readTQueue $ termQ cc) >>= \case
Just s -> do
-- remove condition to always echo virtual terminal
-- when True $ do
when (printOutput cc) $ do
when printOutput $ do
name <- userName cc
putStrLn $ name <> ": " <> s
pure s

View file

@ -6,6 +6,7 @@ import Simplex.Messaging.TMap (TMap)
data TestParams = TestParams
{ tmpPath :: FilePath,
printOutput :: Bool,
chatQueryStats :: TMap Query SlowQueryStats,
agentQueryStats :: TMap Query SlowQueryStats
}

View file

@ -1604,7 +1604,7 @@ testGroupModerate =
testGroupModerateOwn :: HasCallStack => TestParams -> IO ()
testGroupModerateOwn =
testChat2 aliceProfile bobProfile $
withTestOutput $ testChat2 aliceProfile bobProfile $
\alice bob -> do
createGroup2 "team" alice bob
-- disableFullDeletion2 "team" alice bob

View file

@ -742,7 +742,7 @@ testBusinessAddress = testChat3 businessProfile aliceProfile {fullName = "Alice
(biz <# "#bob bob_1> hey there")
testBusinessUpdateProfiles :: HasCallStack => TestParams -> IO ()
testBusinessUpdateProfiles = testChat4 businessProfile aliceProfile bobProfile cathProfile $
testBusinessUpdateProfiles = withTestOutput $ testChat4 businessProfile aliceProfile bobProfile cathProfile $
\biz alice bob cath -> do
biz ##> "/ad"
cLink <- getContactLink biz True
@ -756,8 +756,11 @@ testBusinessUpdateProfiles = testChat4 businessProfile aliceProfile bobProfile c
alice <## "#biz: joining the group..."
biz <# "#alice Welcome" -- auto reply
biz <## "#alice: alice_1 joined the group"
alice <# "#biz biz_1> Welcome"
alice <## "#biz: you joined the group"
alice
<###
[ WithTime "#biz biz_1> Welcome",
"#biz: you joined the group"
]
biz #> "#alice hi"
alice <# "#biz biz_1> hi"
alice #> "#biz hello"

View file

@ -1,4 +1,5 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE TupleSections #-}
@ -75,10 +76,10 @@ main = do
#endif
where
#if defined(dbPostgres)
testBracket test = withSmpServer $ tmpBracket $ test . TestParams
testBracket test = withSmpServer $ tmpBracket $ \tmpPath -> test TestParams {tmpPath, printOutput = False}
#else
testBracket chatQueryStats agentQueryStats test =
withSmpServer $ tmpBracket $ \tmpPath -> test TestParams {tmpPath, chatQueryStats, agentQueryStats}
withSmpServer $ tmpBracket $ \tmpPath -> test TestParams {tmpPath, chatQueryStats, agentQueryStats, printOutput = False}
#endif
tmpBracket test = do
t <- getSystemTime