backfill: implement

This commit is contained in:
Tulir Asokan 2024-09-17 17:37:13 +03:00
parent edf6b17b8c
commit a47ed7fd18
13 changed files with 497 additions and 61 deletions

View file

@ -57,6 +57,7 @@ func migrateLegacyConfig(helper up.Helper) {
bridgeconfig.CopyToOtherLocation(helper, up.Bool, []string{"bridge", "whatsapp_thumbnail"}, []string{"network", "whatsapp_thumbnail"})
bridgeconfig.CopyToOtherLocation(helper, up.Bool, []string{"bridge", "url_previews"}, []string{"network", "url_previews"})
bridgeconfig.CopyToOtherLocation(helper, up.Bool, []string{"bridge", "force_active_delivery_receipts"}, []string{"network", "force_active_delivery_receipts"})
bridgeconfig.CopyToOtherLocation(helper, up.Int, []string{"bridge", "history_sync", "max_initial_conversations"}, []string{"network", "history_sync", "max_initial_conversations"})
bridgeconfig.CopyToOtherLocation(helper, up.Bool, []string{"bridge", "history_sync", "request_full_sync"}, []string{"network", "history_sync", "request_full_sync"})
bridgeconfig.CopyToOtherLocation(helper, up.Int, []string{"bridge", "history_sync", "full_sync_config", "days_limit"}, []string{"network", "history_sync", "full_sync_config", "days_limit"})
bridgeconfig.CopyToOtherLocation(helper, up.Int, []string{"bridge", "history_sync", "full_sync_config", "size_limit_mb"}, []string{"network", "history_sync", "full_sync_config", "size_limit_mb"})

2
go.mod
View file

@ -22,7 +22,7 @@ require (
golang.org/x/sync v0.8.0
google.golang.org/protobuf v1.34.2
gopkg.in/yaml.v3 v3.0.1
maunium.net/go/mautrix v0.21.0
maunium.net/go/mautrix v0.21.1-0.20240917135825-a95101ea7f01
)
require (

4
go.sum
View file

@ -114,5 +114,5 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M=
maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA=
maunium.net/go/mautrix v0.21.0 h1:Z6nVu+clkJgj6ANwFYQQ1BtYeVXZPZ9lRgwuFN57gOY=
maunium.net/go/mautrix v0.21.0/go.mod h1:qm9oDhcHxF/Xby5RUuONIGpXw1SXXqLZj/GgvMxJxu0=
maunium.net/go/mautrix v0.21.1-0.20240917135825-a95101ea7f01 h1:KQE071yde/kCQi2cffa6MjiNtrNWSTW2YzWQmeRjx6A=
maunium.net/go/mautrix v0.21.1-0.20240917135825-a95101ea7f01/go.mod h1:qm9oDhcHxF/Xby5RUuONIGpXw1SXXqLZj/GgvMxJxu0=

344
pkg/connector/backfill.go Normal file
View file

@ -0,0 +1,344 @@
package connector
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/rs/zerolog"
"go.mau.fi/util/ptr"
"go.mau.fi/whatsmeow"
"go.mau.fi/whatsmeow/proto/waHistorySync"
"go.mau.fi/whatsmeow/proto/waWeb"
"go.mau.fi/whatsmeow/types"
"google.golang.org/protobuf/proto"
"maunium.net/go/mautrix/bridgev2"
"maunium.net/go/mautrix/bridgev2/networkid"
"maunium.net/go/mautrix/bridgev2/simplevent"
"maunium.net/go/mautrix-whatsapp/pkg/connector/wadb"
"maunium.net/go/mautrix-whatsapp/pkg/waid"
)
var _ bridgev2.BackfillingNetworkAPI = (*WhatsAppClient)(nil)
const historySyncDispatchWait = 30 * time.Second
func (wa *WhatsAppClient) historySyncLoop() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
oldStop := wa.stopHistorySyncLoop.Swap(&cancel)
if oldStop != nil {
(*oldStop)()
}
dispatchTimer := time.NewTimer(historySyncDispatchWait)
dispatchTimer.Stop()
wa.UserLogin.Log.Debug().Msg("Starting history sync loop")
for {
select {
case evt := <-wa.historySyncs:
dispatchTimer.Stop()
wa.handleWAHistorySync(ctx, evt)
dispatchTimer.Reset(historySyncDispatchWait)
case <-dispatchTimer.C:
wa.createPortalsFromHistorySync(ctx)
case <-ctx.Done():
wa.UserLogin.Log.Debug().Msg("Stopping history sync loop")
return
}
}
}
func (wa *WhatsAppClient) handleWAHistorySync(ctx context.Context, evt *waHistorySync.HistorySync) {
if evt == nil || evt.SyncType == nil {
return
}
log := wa.UserLogin.Log.With().
Str("action", "store history sync").
Stringer("sync_type", evt.GetSyncType()).
Uint32("chunk_order", evt.GetChunkOrder()).
Uint32("progress", evt.GetProgress()).
Logger()
ctx = log.WithContext(ctx)
if evt.GetGlobalSettings() != nil {
log.Debug().Interface("global_settings", evt.GetGlobalSettings()).Msg("Got global settings in history sync")
}
if evt.GetSyncType() == waHistorySync.HistorySync_INITIAL_STATUS_V3 || evt.GetSyncType() == waHistorySync.HistorySync_PUSH_NAME || evt.GetSyncType() == waHistorySync.HistorySync_NON_BLOCKING_DATA {
log.Debug().
Int("conversation_count", len(evt.GetConversations())).
Int("pushname_count", len(evt.GetPushnames())).
Int("status_count", len(evt.GetStatusV3Messages())).
Int("recent_sticker_count", len(evt.GetRecentStickers())).
Int("past_participant_count", len(evt.GetPastParticipants())).
Msg("Ignoring history sync")
return
}
log.Info().
Int("conversation_count", len(evt.GetConversations())).
Int("past_participant_count", len(evt.GetPastParticipants())).
Msg("Storing history sync")
successfullySavedTotal := 0
failedToSaveTotal := 0
totalMessageCount := 0
for _, conv := range evt.GetConversations() {
jid, err := types.ParseJID(conv.GetID())
if err != nil {
totalMessageCount += len(conv.GetMessages())
log.Warn().Err(err).
Str("chat_jid", conv.GetID()).
Int("msg_count", len(conv.GetMessages())).
Msg("Failed to parse chat JID in history sync")
continue
} else if jid.Server == types.BroadcastServer {
log.Debug().Str("chat_jid", jid.String()).Msg("Skipping broadcast list in history sync")
continue
} else if jid.Server == types.HiddenUserServer {
log.Debug().Str("chat_jid", jid.String()).Msg("Skipping hidden user JID chat in history sync")
continue
}
totalMessageCount += len(conv.GetMessages())
log := log.With().
Stringer("chat_jid", jid).
Int("msg_count", len(conv.GetMessages())).
Logger()
var minTime, maxTime time.Time
var minTimeIndex, maxTimeIndex int
ignoredTypes := 0
messages := make([]*wadb.HistorySyncMessageTuple, 0, len(conv.GetMessages()))
for i, rawMsg := range conv.GetMessages() {
// Don't store messages that will just be skipped.
msgEvt, err := wa.Client.ParseWebMessage(jid, rawMsg.GetMessage())
if err != nil {
log.Warn().Err(err).
Int("msg_index", i).
Str("msg_id", rawMsg.GetMessage().GetKey().GetID()).
Uint64("msg_time_seconds", rawMsg.GetMessage().GetMessageTimestamp()).
Msg("Dropping historical message due to parse error")
continue
}
if minTime.IsZero() || msgEvt.Info.Timestamp.Before(minTime) {
minTime = msgEvt.Info.Timestamp
minTimeIndex = i
}
if maxTime.IsZero() || msgEvt.Info.Timestamp.After(maxTime) {
maxTime = msgEvt.Info.Timestamp
maxTimeIndex = i
}
msgType := getMessageType(msgEvt.Message)
if msgType == "ignore" || strings.HasPrefix(msgType, "unknown_protocol_") {
ignoredTypes++
continue
}
marshaled, err := proto.Marshal(rawMsg)
if err != nil {
log.Warn().Err(err).
Int("msg_index", i).
Str("msg_id", msgEvt.Info.ID).
Msg("Failed to marshal message")
continue
}
messages = append(messages, &wadb.HistorySyncMessageTuple{Info: &msgEvt.Info, Message: marshaled})
}
log.Debug().
Int("wrapped_count", len(messages)).
Int("ignored_msg_type_count", ignoredTypes).
Time("lowest_time", minTime).
Int("lowest_time_index", minTimeIndex).
Time("highest_time", maxTime).
Int("highest_time_index", maxTimeIndex).
Dict("metadata", zerolog.Dict().
Uint32("ephemeral_expiration", conv.GetEphemeralExpiration()).
Int64("ephemeral_setting_timestamp", conv.GetEphemeralSettingTimestamp()).
Bool("marked_unread", conv.GetMarkedAsUnread()).
Bool("archived", conv.GetArchived()).
Uint32("pinned", conv.GetPinned()).
Uint64("mute_end", conv.GetMuteEndTime()).
Uint32("unread_count", conv.GetUnreadCount()),
).
Msg("Collected messages to save from history sync conversation")
if len(messages) > 0 {
err = wa.Main.DB.Conversation.Put(ctx, wadb.NewConversation(wa.UserLogin.ID, jid, conv))
if err != nil {
log.Err(err).Msg("Failed to save conversation metadata")
continue
}
err = wa.Main.DB.Message.Put(ctx, wa.UserLogin.ID, jid, messages)
if err != nil {
log.Err(err).Msg("Failed to save messages")
failedToSaveTotal += len(messages)
} else {
successfullySavedTotal += len(messages)
}
}
}
log.Info().
Int("total_saved_count", successfullySavedTotal).
Int("total_failed_count", failedToSaveTotal).
Int("total_message_count", totalMessageCount).
Msg("Finished storing history sync")
}
func (wa *WhatsAppClient) createPortalsFromHistorySync(ctx context.Context) {
log := wa.UserLogin.Log.With().
Str("action", "create portals from history sync").
Logger()
ctx = log.WithContext(ctx)
limit := wa.Main.Config.HistorySync.MaxInitialConversations
log.Info().Int("limit", limit).Msg("Creating portals from history sync")
conversations, err := wa.Main.DB.Conversation.GetRecent(ctx, wa.UserLogin.ID, limit)
if err != nil {
log.Err(err).Msg("Failed to get recent conversations from database")
return
}
for _, conv := range conversations {
wrappedInfo, err := wa.getChatInfo(ctx, conv.ChatJID, conv)
if errors.Is(err, whatsmeow.ErrNotInGroup) {
log.Debug().Stringer("chat_jid", conv.ChatJID).
Msg("Skipping creating room because the user is not a participant")
err = wa.Main.DB.Message.DeleteAllInChat(ctx, wa.UserLogin.ID, conv.ChatJID)
if err != nil {
log.Err(err).Msg("Failed to delete historical messages for portal")
}
continue
} else if err != nil {
log.Err(err).Stringer("chat_jid", conv.ChatJID).Msg("Failed to get chat info")
continue
}
wa.Main.Bridge.QueueRemoteEvent(wa.UserLogin, &simplevent.ChatResync{
EventMeta: simplevent.EventMeta{
Type: bridgev2.RemoteEventChatResync,
LogContext: nil,
PortalKey: wa.makeWAPortalKey(conv.ChatJID),
CreatePortal: true,
},
ChatInfo: wrappedInfo,
LatestMessageTS: conv.LastMessageTimestamp,
})
}
}
func (wa *WhatsAppClient) FetchMessages(ctx context.Context, params bridgev2.FetchMessagesParams) (*bridgev2.FetchMessagesResponse, error) {
portalJID, err := waid.ParsePortalID(params.Portal.ID)
if err != nil {
return nil, err
}
var markRead bool
var startTime, endTime *time.Time
if params.Forward {
if params.AnchorMessage != nil {
startTime = ptr.Ptr(params.AnchorMessage.Timestamp)
}
conv, err := wa.Main.DB.Conversation.Get(ctx, wa.UserLogin.ID, portalJID)
if err != nil {
return nil, fmt.Errorf("failed to get conversation from database: %w", err)
} else if conv != nil {
markRead = !ptr.Val(conv.MarkedAsUnread) && ptr.Val(conv.UnreadCount) == 0
}
} else if params.Cursor != "" {
endTimeUnix, err := strconv.ParseInt(string(params.Cursor), 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to parse cursor: %w", err)
}
endTime = ptr.Ptr(time.Unix(endTimeUnix, 0))
} else if params.AnchorMessage != nil {
endTime = ptr.Ptr(params.AnchorMessage.Timestamp)
}
messages, err := wa.Main.DB.Message.GetBetween(ctx, wa.UserLogin.ID, portalJID, startTime, endTime, params.Count+1)
if err != nil {
return nil, fmt.Errorf("failed to load messages from database: %w", err)
} else if len(messages) == 0 {
return &bridgev2.FetchMessagesResponse{
HasMore: false,
Forward: params.Forward,
}, nil
}
hasMore := false
oldestTS := messages[len(messages)-1].GetMessageTimestamp()
newestTS := messages[0].GetMessageTimestamp()
if len(messages) > params.Count {
hasMore = true
// For safety, cut off messages with the oldest timestamp in the response.
// Otherwise, if there are multiple messages with the same timestamp, the next fetch may miss some.
for i := len(messages) - 2; i >= 0; i-- {
if messages[i].GetMessageTimestamp() > oldestTS {
messages = messages[:i+1]
break
}
}
}
convertedMessages := make([]*bridgev2.BackfillMessage, len(messages))
for i, msg := range messages {
evt, err := wa.Client.ParseWebMessage(portalJID, msg)
if err != nil {
// This should never happen because the info is already parsed once before being stored in the database
return nil, fmt.Errorf("failed to parse info of message %s: %w", msg.GetKey().GetID(), err)
}
convertedMessages[i] = wa.convertHistorySyncMessage(ctx, params.Portal, &evt.Info, msg)
}
return &bridgev2.FetchMessagesResponse{
Messages: convertedMessages,
Cursor: networkid.PaginationCursor(strconv.FormatUint(messages[0].GetMessageTimestamp(), 10)),
HasMore: hasMore,
Forward: endTime == nil,
MarkRead: markRead,
// TODO set remaining or total count
CompleteCallback: func() {
// TODO this only deletes after backfilling. If there's no need for backfill after a relogin,
// the messages will be stuck in the database
var err error
if !wa.Main.Bridge.Config.Backfill.Queue.Enabled {
// If the backfill queue isn't enabled, delete all messages after backfilling a batch.
err = wa.Main.DB.Message.DeleteAllInChat(ctx, wa.UserLogin.ID, portalJID)
} else {
// Otherwise just delete the messages that got backfilled
err = wa.Main.DB.Message.DeleteBetween(ctx, wa.UserLogin.ID, portalJID, newestTS, oldestTS)
}
if err != nil {
zerolog.Ctx(ctx).Warn().Err(err).Msg("Failed to delete messages from database after backfill")
}
},
}, nil
}
func (wa *WhatsAppClient) convertHistorySyncMessage(
ctx context.Context, portal *bridgev2.Portal, info *types.MessageInfo, msg *waWeb.WebMessageInfo,
) *bridgev2.BackfillMessage {
// TODO use proper intent
intent := wa.Main.Bridge.Bot
wrapped := &bridgev2.BackfillMessage{
ConvertedMessage: wa.Main.MsgConv.ToMatrix(ctx, portal, wa.Client, intent, msg.Message, info),
Sender: wa.makeEventSender(info.Sender),
ID: waid.MakeMessageID(info.Chat, info.Sender, info.ID),
TxnID: networkid.TransactionID(waid.MakeMessageID(info.Chat, info.Sender, info.ID)),
Timestamp: info.Timestamp,
Reactions: make([]*bridgev2.BackfillReaction, len(msg.Reactions)),
}
for i, reaction := range msg.Reactions {
var sender types.JID
if reaction.GetKey().GetFromMe() {
sender = wa.JID
} else if reaction.GetKey().GetParticipant() != "" {
sender, _ = types.ParseJID(*reaction.Key.Participant)
} else if info.Chat.Server == types.DefaultUserServer {
sender = info.Chat
}
if sender.IsEmpty() {
continue
}
wrapped.Reactions[i] = &bridgev2.BackfillReaction{
TargetPart: ptr.Ptr(networkid.PartID("")),
Timestamp: time.UnixMilli(reaction.GetSenderTimestampMS()),
Sender: wa.makeEventSender(sender),
Emoji: reaction.GetText(),
}
}
return wrapped
}

View file

@ -7,21 +7,25 @@ import (
"github.com/rs/zerolog"
"go.mau.fi/util/ptr"
"go.mau.fi/whatsmeow/proto/waHistorySync"
"go.mau.fi/whatsmeow/types"
"maunium.net/go/mautrix/bridgev2"
"maunium.net/go/mautrix/bridgev2/database"
"maunium.net/go/mautrix/bridgev2/networkid"
"maunium.net/go/mautrix/event"
"maunium.net/go/mautrix-whatsapp/pkg/connector/wadb"
"maunium.net/go/mautrix-whatsapp/pkg/waid"
)
func (wa *WhatsAppClient) GetChatInfo(ctx context.Context, portal *bridgev2.Portal) (wrapped *bridgev2.ChatInfo, err error) {
func (wa *WhatsAppClient) GetChatInfo(ctx context.Context, portal *bridgev2.Portal) (*bridgev2.ChatInfo, error) {
portalJID, err := waid.ParsePortalID(portal.ID)
if err != nil {
return nil, err
}
return wa.getChatInfo(ctx, portalJID, nil)
}
func (wa *WhatsAppClient) getChatInfo(ctx context.Context, portalJID types.JID, conv *wadb.Conversation) (wrapped *bridgev2.ChatInfo, err error) {
switch portalJID.Server {
case types.DefaultUserServer:
wrapped = wa.wrapDMInfo(portalJID)
@ -46,8 +50,15 @@ func (wa *WhatsAppClient) GetChatInfo(ctx context.Context, portal *bridgev2.Port
default:
return nil, fmt.Errorf("unsupported server %s", portalJID.Server)
}
var conv *waHistorySync.Conversation
applyHistoryInfo(wrapped, conv)
if conv == nil {
conv, err = wa.Main.DB.Conversation.Get(ctx, wa.UserLogin.ID, portalJID)
if err != nil {
zerolog.Ctx(ctx).Warn().Err(err).Msg("Failed to get history sync conversation info")
}
}
if conv != nil {
applyHistoryInfo(wrapped, conv)
}
wa.applyChatSettings(ctx, portalJID, wrapped)
return wrapped, nil
}
@ -79,25 +90,27 @@ func (wa *WhatsAppClient) applyChatSettings(ctx context.Context, chatID types.JI
}
}
func applyHistoryInfo(info *bridgev2.ChatInfo, conv *waHistorySync.Conversation) {
func applyHistoryInfo(info *bridgev2.ChatInfo, conv *wadb.Conversation) {
if conv == nil {
return
}
info.CanBackfill = true
info.UserLocal = &bridgev2.UserLocalPortalInfo{
MutedUntil: ptr.Ptr(time.Unix(int64(conv.GetMuteEndTime()), 0)),
MutedUntil: ptr.Ptr(conv.MuteEndTime),
}
if conv.GetPinned() > 0 {
if ptr.Val(conv.Pinned) {
info.UserLocal.Tag = ptr.Ptr(event.RoomTagFavourite)
} else if conv.GetArchived() {
} else if ptr.Val(conv.Archived) {
info.UserLocal.Tag = ptr.Ptr(event.RoomTagLowPriority)
}
if conv.GetEphemeralExpiration() > 0 {
if ptr.Val(conv.EphemeralExpiration) > 0 {
info.Disappear = &database.DisappearingSetting{
Type: database.DisappearingTypeAfterRead,
Timer: time.Duration(conv.GetEphemeralExpiration()) * time.Second,
Timer: time.Duration(*conv.EphemeralExpiration) * time.Second,
}
if conv.EphemeralSettingTimestamp != nil {
info.ExtraUpdates = bridgev2.MergeExtraUpdaters(info.ExtraUpdates, updateDisappearingTimerSetAt(*conv.EphemeralSettingTimestamp))
}
info.ExtraUpdates = bridgev2.MergeExtraUpdaters(info.ExtraUpdates, updateDisappearingTimerSetAt(conv.GetEphemeralSettingTimestamp()))
}
}

View file

@ -3,9 +3,11 @@ package connector
import (
"context"
"fmt"
"sync/atomic"
"github.com/rs/zerolog"
"go.mau.fi/whatsmeow"
"go.mau.fi/whatsmeow/proto/waHistorySync"
"go.mau.fi/whatsmeow/store"
"go.mau.fi/whatsmeow/types"
waLog "go.mau.fi/whatsmeow/util/log"
@ -20,6 +22,8 @@ func (wa *WhatsAppConnector) LoadUserLogin(_ context.Context, login *bridgev2.Us
w := &WhatsAppClient{
Main: wa,
UserLogin: login,
historySyncs: make(chan *waHistorySync.HistorySync, 64),
}
login.Client = w
@ -54,6 +58,9 @@ type WhatsAppClient struct {
Client *whatsmeow.Client
Device *store.Device
JID types.JID
historySyncs chan *waHistorySync.HistorySync
stopHistorySyncLoop atomic.Pointer[context.CancelFunc]
}
var _ bridgev2.NetworkAPI = (*WhatsAppClient)(nil)
@ -96,10 +103,14 @@ func (wa *WhatsAppClient) Connect(ctx context.Context) error {
if err := wa.Main.updateProxy(ctx, wa.Client, false); err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to update proxy")
}
go wa.historySyncLoop()
return wa.Client.Connect()
}
func (wa *WhatsAppClient) Disconnect() {
if stopHistorySyncLoop := wa.stopHistorySyncLoop.Swap(nil); stopHistorySyncLoop != nil {
(*stopHistorySyncLoop)()
}
if cli := wa.Client; cli != nil {
cli.Disconnect()
wa.Client = nil

View file

@ -42,8 +42,9 @@ type Config struct {
ForceActiveDeliveryReceipts bool `yaml:"force_active_delivery_receipts"`
HistorySync struct {
RequestFullSync bool `yaml:"request_full_sync"`
FullSyncConfig struct {
MaxInitialConversations int `yaml:"max_initial_conversations"`
RequestFullSync bool `yaml:"request_full_sync"`
FullSyncConfig struct {
DaysLimit uint32 `yaml:"days_limit"`
SizeLimit uint32 `yaml:"size_mb_limit"`
StorageQuota uint32 `yaml:"storage_quota_mb"`
@ -94,6 +95,7 @@ func upgradeConfig(helper up.Helper) {
helper.Copy(up.Bool, "whatsapp_thumbnail")
helper.Copy(up.Bool, "url_previews")
helper.Copy(up.Int, "history_sync", "max_initial_conversations")
helper.Copy(up.Bool, "history_sync", "request_full_sync")
helper.Copy(up.Int|up.Null, "history_sync", "full_sync_config", "days_limit")
helper.Copy(up.Int|up.Null, "history_sync", "full_sync_config", "size_mb_limit")

View file

@ -52,6 +52,10 @@ force_active_delivery_receipts: false
# Settings for handling history sync payloads.
history_sync:
# How many conversations should the bridge create after login?
# If -1, all conversations received from history sync will be bridged.
# Other conversations will be backfilled on demand when receiving a message.
max_initial_conversations: -1
# Should the bridge request a full sync from the phone when logging in?
# This bumps the size of history syncs from 3 months to 1 year.
request_full_sync: false

View file

@ -87,7 +87,9 @@ func (wa *WhatsAppClient) handleWAEvent(rawEvt any) {
// TODO
case *events.HistorySync:
// TODO
if wa.Main.Bridge.Config.Backfill.Enabled {
wa.historySyncs <- evt.Data
}
case *events.MediaRetry:
// TODO

View file

@ -5,6 +5,7 @@ import (
"time"
"go.mau.fi/util/dbutil"
"go.mau.fi/util/ptr"
"go.mau.fi/whatsmeow/proto/waHistorySync"
"go.mau.fi/whatsmeow/types"
"maunium.net/go/mautrix/bridgev2/networkid"
@ -20,14 +21,41 @@ type Conversation struct {
UserLoginID networkid.UserLoginID
ChatJID types.JID
LastMessageTimestamp time.Time
Archived bool
Pinned bool
Archived *bool
Pinned *bool
MuteEndTime time.Time
EndOfHistoryTransferType waHistorySync.Conversation_EndOfHistoryTransferType
EphemeralExpiration time.Duration
EphemeralSettingTimestamp int64
MarkedAsUnread bool
UnreadCount uint32
EndOfHistoryTransferType *waHistorySync.Conversation_EndOfHistoryTransferType
EphemeralExpiration *uint32
EphemeralSettingTimestamp *int64
MarkedAsUnread *bool
UnreadCount *uint32
}
func parseHistoryTime(ts *uint64) time.Time {
if ts == nil || *ts == 0 {
return time.Time{}
}
return time.Unix(int64(*ts), 0)
}
func NewConversation(loginID networkid.UserLoginID, chatJID types.JID, conv *waHistorySync.Conversation) *Conversation {
var pinned *bool
if conv.Pinned != nil {
pinned = ptr.Ptr(*conv.Pinned > 0)
}
return &Conversation{
UserLoginID: loginID,
ChatJID: chatJID,
LastMessageTimestamp: parseHistoryTime(conv.LastMsgTimestamp),
Archived: conv.Archived,
Pinned: pinned,
MuteEndTime: parseHistoryTime(conv.MuteEndTime),
EndOfHistoryTransferType: conv.EndOfHistoryTransferType,
EphemeralExpiration: conv.EphemeralExpiration,
EphemeralSettingTimestamp: conv.EphemeralSettingTimestamp,
MarkedAsUnread: conv.MarkedAsUnread,
UnreadCount: conv.UnreadCount,
}
}
const (
@ -40,12 +68,19 @@ const (
ON CONFLICT (bridge_id, user_login_id, chat_jid)
DO UPDATE SET
last_message_timestamp=CASE
WHEN excluded.last_message_timestamp > whatsapp_history_sync_conversation.last_message_timestamp THEN excluded.last_message_timestamp
WHEN whatsapp_history_sync_conversation.last_message_timestamp IS NULL
OR excluded.last_message_timestamp > whatsapp_history_sync_conversation.last_message_timestamp
THEN excluded.last_message_timestamp
ELSE whatsapp_history_sync_conversation.last_message_timestamp
END,
archived=COALESCE(excluded.archived, whatsapp_history_sync_conversation.archived),
pinned=COALESCE(excluded.pinned, whatsapp_history_sync_conversation.pinned),
mute_end_time=COALESCE(excluded.mute_end_time, whatsapp_history_sync_conversation.mute_end_time),
end_of_history_transfer_type=COALESCE(excluded.end_of_history_transfer_type, whatsapp_history_sync_conversation.end_of_history_transfer_type),
ephemeral_expiration=COALESCE(excluded.ephemeral_expiration, whatsapp_history_sync_conversation.ephemeral_expiration),
ephemeral_setting_timestamp=COALESCE(excluded.ephemeral_setting_timestamp, whatsapp_history_sync_conversation.ephemeral_setting_timestamp),
end_of_history_transfer_type=excluded.end_of_history_transfer_type
marked_as_unread=COALESCE(excluded.marked_as_unread, whatsapp_history_sync_conversation.marked_as_unread),
unread_count=COALESCE(excluded.unread_count, whatsapp_history_sync_conversation.unread_count)
`
getRecentConversations = `
SELECT
@ -97,16 +132,23 @@ func (cq *ConversationQuery) Delete(ctx context.Context, loginID networkid.UserL
}
func (c *Conversation) sqlVariables() []any {
var lastMessageTS, muteEndTime int64
if !c.LastMessageTimestamp.IsZero() {
lastMessageTS = c.LastMessageTimestamp.Unix()
}
if !c.MuteEndTime.IsZero() {
muteEndTime = c.MuteEndTime.Unix()
}
return []any{
c.BridgeID,
c.UserLoginID,
c.ChatJID,
c.LastMessageTimestamp.Unix(),
lastMessageTS,
c.Archived,
c.Pinned,
c.MuteEndTime.Unix(),
muteEndTime,
c.EndOfHistoryTransferType,
int64(c.EphemeralExpiration.Seconds()),
c.EphemeralExpiration,
c.EphemeralSettingTimestamp,
c.MarkedAsUnread,
c.UnreadCount,
@ -114,7 +156,7 @@ func (c *Conversation) sqlVariables() []any {
}
func (c *Conversation) Scan(row dbutil.Scannable) (*Conversation, error) {
var lastMessageTS, muteEndTime, ephemeralExpiration int64
var lastMessageTS, muteEndTime int64
err := row.Scan(
&c.BridgeID,
&c.UserLoginID,
@ -124,7 +166,7 @@ func (c *Conversation) Scan(row dbutil.Scannable) (*Conversation, error) {
&c.Pinned,
&muteEndTime,
&c.EndOfHistoryTransferType,
&ephemeralExpiration,
&c.EphemeralExpiration,
&c.EphemeralSettingTimestamp,
&c.MarkedAsUnread,
&c.UnreadCount,
@ -138,6 +180,5 @@ func (c *Conversation) Scan(row dbutil.Scannable) (*Conversation, error) {
if muteEndTime != 0 {
c.MuteEndTime = time.Unix(muteEndTime, 0)
}
c.EphemeralExpiration = time.Duration(ephemeralExpiration) * time.Second
return c, nil
}

View file

@ -6,6 +6,7 @@ import (
"time"
"go.mau.fi/util/dbutil"
"go.mau.fi/util/exslices"
"go.mau.fi/whatsmeow/proto/waHistorySync"
"go.mau.fi/whatsmeow/proto/waWeb"
"go.mau.fi/whatsmeow/types"
@ -21,7 +22,7 @@ type MessageQuery struct {
const (
insertHistorySyncMessageQuery = `
INSERT INTO whatsapp_history_sync_message (bridge_id, user_login_id, chat_jid, sender_jid, message_id, timestamp, data, inserted_time)
VALUES ($1, $2, $3, $4, $5, $6, $7)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (bridge_id, user_login_id, chat_jid, sender_jid, message_id) DO NOTHING
`
getHistorySyncMessagesBetweenQueryTemplate = `
@ -31,9 +32,9 @@ const (
ORDER BY timestamp DESC
%s
`
deleteHistorySyncMessagesBetweenExclusiveQuery = `
deleteHistorySyncMessagesBetweenQuery = `
DELETE FROM whatsapp_history_sync_message
WHERE bridge_id=$1 AND user_login_id=$2 AND chat_jid=$3 AND timestamp<$4 AND timestamp>$5
WHERE bridge_id=$1 AND user_login_id=$2 AND chat_jid=$3 AND timestamp<=$4 AND timestamp>=$5
`
deleteAllHistorySyncMessagesQuery = "DELETE FROM whatsapp_history_sync_message WHERE bridge_id=$1 AND user_login_id=$2"
deleteHistorySyncMessagesForPortalQuery = `
@ -48,15 +49,30 @@ const (
`
)
func (mq *MessageQuery) Put(ctx context.Context, loginID networkid.UserLoginID, parsedInfo *types.MessageInfo, message *waHistorySync.HistorySyncMsg) error {
msgData, err := proto.Marshal(message)
if err != nil {
return err
}
_, err = mq.Exec(ctx, insertHistorySyncMessageQuery,
mq.BridgeID, loginID, parsedInfo.Chat, parsedInfo.Sender.ToNonAD(), parsedInfo.ID,
parsedInfo.Timestamp, msgData, time.Now())
return err
type HistorySyncMessageTuple struct {
Info *types.MessageInfo
Message []byte
}
func (t *HistorySyncMessageTuple) GetMassInsertValues() [4]any {
return [4]any{t.Info.Sender.ToNonAD(), t.Info.ID, t.Info.Timestamp.Unix(), t.Message}
}
var batchInsertHistorySyncMessage = dbutil.NewMassInsertBuilder[*HistorySyncMessageTuple, [4]any](
insertHistorySyncMessageQuery, "($1, $2, $3, $%d, $%d, $%d, $%d, $4)",
)
func (mq *MessageQuery) Put(ctx context.Context, loginID networkid.UserLoginID, chatJID types.JID, messages []*HistorySyncMessageTuple) error {
return mq.DoTxn(ctx, nil, func(ctx context.Context) error {
for _, chunk := range exslices.Chunk(messages, 50) {
query, params := batchInsertHistorySyncMessage.Build([4]any{mq.BridgeID, loginID, chatJID, time.Now().Unix()}, chunk)
_, err := mq.Exec(ctx, query, params...)
if err != nil {
return err
}
}
return nil
})
}
func scanWebMessageInfo(rows dbutil.Scannable) (*waWeb.WebMessageInfo, error) {
@ -100,8 +116,8 @@ func (mq *MessageQuery) GetBetween(ctx context.Context, loginID networkid.UserLo
AsList()
}
func (mq *MessageQuery) DeleteBetween(ctx context.Context, loginID networkid.UserLoginID, chatJID types.JID, before, after time.Time) error {
_, err := mq.Exec(ctx, deleteHistorySyncMessagesBetweenExclusiveQuery, mq.BridgeID, loginID, chatJID, before.Unix(), after.Unix())
func (mq *MessageQuery) DeleteBetween(ctx context.Context, loginID networkid.UserLoginID, chatJID types.JID, before, after uint64) error {
_, err := mq.Exec(ctx, deleteHistorySyncMessagesBetweenQuery, mq.BridgeID, loginID, chatJID, before, after)
return err
}

View file

@ -1,9 +1,9 @@
-- v0 -> v2 (compatible with v2+): Latest revision
CREATE TABLE whatsapp_poll_option_id (
bridge_id TEXT NOT NULL,
msg_mxid TEXT NOT NULL,
opt_id TEXT NOT NULL,
bridge_id TEXT NOT NULL,
msg_mxid TEXT NOT NULL,
opt_id TEXT NOT NULL,
opt_hash bytea NOT NULL CHECK ( length(opt_hash) = 32 ),
PRIMARY KEY (bridge_id, msg_mxid, opt_id),
@ -13,19 +13,19 @@ CREATE TABLE whatsapp_poll_option_id (
);
CREATE TABLE whatsapp_history_sync_conversation (
bridge_id TEXT NOT NULL,
user_login_id TEXT NOT NULL,
chat_jid TEXT NOT NULL,
bridge_id TEXT NOT NULL,
user_login_id TEXT NOT NULL,
chat_jid TEXT NOT NULL,
last_message_timestamp BIGINT NOT NULL,
archived BOOLEAN NOT NULL,
pinned BOOLEAN NOT NULL,
mute_end_time BIGINT NOT NULL,
end_of_history_transfer_type INTEGER NOT NULL,
ephemeral_expiration INTEGER NOT NULL,
ephemeral_setting_timestamp BIGINT NOT NULL,
marked_as_unread BOOLEAN NOT NULL,
unread_count INTEGER NOT NULL,
last_message_timestamp BIGINT,
archived BOOLEAN,
pinned BOOLEAN,
mute_end_time BIGINT,
end_of_history_transfer_type INTEGER,
ephemeral_expiration INTEGER,
ephemeral_setting_timestamp BIGINT,
marked_as_unread BOOLEAN,
unread_count INTEGER,
PRIMARY KEY (bridge_id, user_login_id, chat_jid),
CONSTRAINT whatsapp_history_sync_conversation_user_login_fkey FOREIGN KEY (bridge_id, user_login_id)

View file

@ -2,3 +2,5 @@
-- transaction: sqlite-fkey-off
ALTER TABLE whatsapp_history_sync_message ADD COLUMN sender_jid TEXT NOT NULL DEFAULT '';
ALTER TABLE whatsapp_history_sync_message ALTER COLUMN sender_jid DROP DEFAULT;
ALTER TABLE whatsapp_history_sync_message DROP CONSTRAINT whatsapp_history_sync_message_pkey;
ALTER TABLE whatsapp_history_sync_message ADD PRIMARY KEY (bridge_id, user_login_id, chat_jid, sender_jid, message_id);