connector: implement chat list sync and backfill

Fixes #1
This commit is contained in:
Tulir Asokan 2025-01-19 13:26:23 +02:00
parent dd3aab051f
commit dd0f10ab70
10 changed files with 440 additions and 19 deletions

192
pkg/connector/backfill.go Normal file
View file

@ -0,0 +1,192 @@
// mautrix-signal - A Matrix-Signal puppeting bridge.
// Copyright (C) 2025 Tulir Asokan
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package connector
import (
"context"
"fmt"
"slices"
"time"
"github.com/google/uuid"
"github.com/rs/zerolog"
"go.mau.fi/util/ptr"
"maunium.net/go/mautrix/bridgev2"
"maunium.net/go/mautrix/bridgev2/networkid"
"go.mau.fi/mautrix-signal/pkg/msgconv"
"go.mau.fi/mautrix-signal/pkg/signalid"
"go.mau.fi/mautrix-signal/pkg/signalmeow/protobuf/backuppb"
"go.mau.fi/mautrix-signal/pkg/signalmeow/store"
)
var _ bridgev2.BackfillingNetworkAPI = (*SignalClient)(nil)
func tryCastUUID(b []byte) uuid.UUID {
if len(b) == 16 {
return uuid.UUID(b)
}
return uuid.Nil
}
func (s *SignalClient) FetchMessages(ctx context.Context, params bridgev2.FetchMessagesParams) (*bridgev2.FetchMessagesResponse, error) {
if !s.IsLoggedIn() {
return nil, bridgev2.ErrNotLoggedIn
}
userID, groupID, err := signalid.ParsePortalID(params.Portal.ID)
if err != nil {
return nil, fmt.Errorf("failed to parse portal ID: %w", err)
}
var chat *store.BackupChat
if groupID != "" {
chat, err = s.Client.Store.BackupStore.GetBackupChatByGroupID(ctx, groupID)
} else {
chat, err = s.Client.Store.BackupStore.GetBackupChatByUserID(ctx, userID)
}
if err != nil {
return nil, fmt.Errorf("failed to get chat: %w", err)
} else if chat == nil {
zerolog.Ctx(ctx).Debug().Msg("Chat not found, returning nil response for backfill")
return nil, nil
}
var anchorTS time.Time
if params.AnchorMessage != nil {
anchorTS = params.AnchorMessage.Timestamp
}
minTS := anchorTS
items, err := s.Client.Store.BackupStore.GetBackupChatItems(ctx, chat.Id, anchorTS, params.Forward, params.Count)
if err != nil {
return nil, fmt.Errorf("failed to get chat items: %w", err)
}
if len(items) > 0 {
minTS = time.UnixMilli(int64(items[0].DateSent))
}
// GetBackupChatItems returns in reverse chronological order, so flip the list
slices.Reverse(items)
var firstDirectionfulProcessed bool
var isRead bool
convertedMessages := make([]*bridgev2.BackfillMessage, 0, len(items))
attMap := make(msgconv.AttachmentMap)
recipientMap := make(map[uint64]*backuppb.Recipient)
getRecipientACI := func(id uint64) (uuid.UUID, error) {
recipient, ok := recipientMap[id]
if !ok {
recipient, err = s.Client.Store.BackupStore.GetBackupRecipient(ctx, id)
if err != nil {
return uuid.Nil, fmt.Errorf("failed to get recipient %d: %w", id, err)
} else if len(recipient.GetContact().GetAci()) != 16 && recipient.GetSelf() == nil {
zerolog.Ctx(ctx).Warn().
Uint64("recipient_id", id).
Type("recipient_type", recipient.GetDestination()).
Msg("ACI not found for recipient")
}
recipientMap[id] = recipient
}
switch dest := recipient.Destination.(type) {
case *backuppb.Recipient_Self:
return s.Client.Store.ACI, nil
case *backuppb.Recipient_Contact:
if len(dest.Contact.GetAci()) == 16 {
return uuid.UUID(dest.Contact.GetAci()), nil
}
}
return uuid.Nil, nil
}
for _, item := range items {
var streamOrder int64
switch dt := item.DirectionalDetails.(type) {
case *backuppb.ChatItem_Incoming:
streamOrder = int64(dt.Incoming.GetDateServerSent())
if !firstDirectionfulProcessed {
firstDirectionfulProcessed = true
isRead = dt.Incoming.Read
}
case *backuppb.ChatItem_Outgoing:
// TODO stream order?
if !firstDirectionfulProcessed {
firstDirectionfulProcessed = true
isRead = true
}
}
if len(attMap) > 0 {
clear(attMap)
}
senderACI, err := getRecipientACI(item.AuthorId)
if err != nil {
return nil, err
} else if senderACI == uuid.Nil {
continue
}
dm, reactions := msgconv.BackupToDataMessage(item, attMap)
if dm == nil {
continue
}
cm := s.Main.MsgConv.ToMatrix(ctx, s.Client, params.Portal, s.Main.Bridge.Bot, dm, attMap)
convertedReactions := make([]*bridgev2.BackfillReaction, 0, len(reactions))
for _, reaction := range reactions {
reactionSenderACI, err := getRecipientACI(reaction.AuthorId)
if err != nil {
return nil, err
} else if reactionSenderACI == uuid.Nil {
continue
}
convertedReactions = append(convertedReactions, &bridgev2.BackfillReaction{
TargetPart: ptr.Ptr(networkid.PartID("")),
Timestamp: time.UnixMilli(int64(reaction.SentTimestamp)),
Sender: s.makeEventSender(reactionSenderACI),
Emoji: reaction.GetEmoji(),
})
}
msgID := signalid.MakeMessageID(senderACI, item.DateSent)
convertedMessages = append(convertedMessages, &bridgev2.BackfillMessage{
ConvertedMessage: cm,
Sender: s.makeEventSender(senderACI),
ID: msgID,
TxnID: networkid.TransactionID(msgID),
Timestamp: time.UnixMilli(int64(item.DateSent)),
StreamOrder: streamOrder,
Reactions: convertedReactions,
})
}
return &bridgev2.FetchMessagesResponse{
Messages: convertedMessages,
HasMore: len(items) >= params.Count,
Forward: params.Forward,
MarkRead: isRead,
ApproxTotalCount: chat.TotalMessages,
CompleteCallback: func() {
// When reaching the last backwards backfill batch, delete the chat from the backup store.
// If backwards backfilling isn't enabled, delete immediately after the first backfill request.
if (!params.Forward && len(items) < params.Count) || !s.Main.Bridge.Config.Backfill.Queue.Enabled {
err := s.Client.Store.BackupStore.DeleteBackupChat(ctx, chat.Id)
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to delete chat from backup store")
} else {
zerolog.Ctx(ctx).Debug().Msg("Deleted chat from backup store as backfill seems finished")
}
} else {
err := s.Client.Store.BackupStore.DeleteBackupChatItems(ctx, chat.Id, minTS)
if err != nil {
zerolog.Ctx(ctx).Err(err).Time("min_ts", minTS).Msg("Failed to delete messages from backup store")
} else {
zerolog.Ctx(ctx).Debug().Time("min_ts", minTS).Msg("Deleted messages from backup store")
}
}
},
}, nil
}

View file

@ -34,6 +34,7 @@ import (
"go.mau.fi/mautrix-signal/pkg/libsignalgo"
"go.mau.fi/mautrix-signal/pkg/signalid"
"go.mau.fi/mautrix-signal/pkg/signalmeow/store"
"go.mau.fi/mautrix-signal/pkg/signalmeow/types"
)
@ -62,14 +63,14 @@ func (s *SignalClient) GetChatInfo(ctx context.Context, portal *bridgev2.Portal)
return nil, err
}
if groupID != "" {
return s.getGroupInfo(ctx, groupID, 0)
return s.getGroupInfo(ctx, groupID, 0, nil)
} else {
aci, pni := userID.ToACIAndPNI()
contact, err := s.Client.Store.RecipientStore.LoadAndUpdateRecipient(ctx, aci, pni, nil)
if err != nil {
return nil, err
}
return s.makeCreateDMResponse(contact).PortalInfo, nil
return s.makeCreateDMResponse(ctx, contact, nil).PortalInfo, nil
}
}
@ -182,13 +183,13 @@ func (s *SignalClient) ResolveIdentifier(ctx context.Context, number string, cre
UserID: signalid.MakeUserID(aci),
UserInfo: s.contactToUserInfo(recipient),
Ghost: ghost,
Chat: s.makeCreateDMResponse(recipient),
Chat: s.makeCreateDMResponse(ctx, recipient, nil),
}, nil
} else {
return &bridgev2.ResolveIdentifierResponse{
UserID: signalid.MakeUserIDFromServiceID(libsignalgo.NewPNIServiceID(pni)),
UserInfo: s.contactToUserInfo(recipient),
Chat: s.makeCreateDMResponse(recipient),
Chat: s.makeCreateDMResponse(ctx, recipient, nil),
}, nil
}
}
@ -207,7 +208,7 @@ func (s *SignalClient) GetContactList(ctx context.Context) ([]*bridgev2.ResolveI
for i, recipient := range recipients {
recipientResp := &bridgev2.ResolveIdentifierResponse{
UserInfo: s.contactToUserInfo(recipient),
Chat: s.makeCreateDMResponse(recipient),
Chat: s.makeCreateDMResponse(ctx, recipient, nil),
}
if recipient.ACI != uuid.Nil {
recipientResp.UserID = signalid.MakeUserID(recipient.ACI)
@ -224,7 +225,7 @@ func (s *SignalClient) GetContactList(ctx context.Context) ([]*bridgev2.ResolveI
return resp, nil
}
func (s *SignalClient) makeCreateDMResponse(recipient *types.Recipient) *bridgev2.CreateChatResponse {
func (s *SignalClient) makeCreateDMResponse(ctx context.Context, recipient *types.Recipient, backupChat *store.BackupChat) *bridgev2.CreateChatResponse {
name := ""
topic := PrivateChatTopic
selfUser := s.makeEventSender(s.Client.Store.ACI)
@ -247,6 +248,13 @@ func (s *SignalClient) makeCreateDMResponse(recipient *types.Recipient) *bridgev
name = s.Main.Config.FormatDisplayname(recipient)
serviceID = libsignalgo.NewPNIServiceID(recipient.PNI)
} else {
if backupChat == nil {
var err error
backupChat, err = s.Client.Store.BackupStore.GetBackupChatByUserID(ctx, libsignalgo.NewACIServiceID(recipient.ACI))
if err != nil {
zerolog.Ctx(ctx).Warn().Err(err).Msg("Failed to get backup chat for recipient")
}
}
members.OtherUserID = signalid.MakeUserID(recipient.ACI)
if recipient.ACI == s.Client.Store.ACI {
name = NoteToSelfName
@ -275,6 +283,8 @@ func (s *SignalClient) makeCreateDMResponse(recipient *types.Recipient) *bridgev
Topic: &topic,
Members: members,
Type: ptr.Ptr(database.RoomTypeDM),
CanBackfill: backupChat != nil,
},
}
}

157
pkg/connector/chatsync.go Normal file
View file

@ -0,0 +1,157 @@
// mautrix-signal - A Matrix-Signal puppeting bridge.
// Copyright (C) 2025 Tulir Asokan
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package connector
import (
"context"
"encoding/base64"
"time"
"github.com/google/uuid"
"github.com/rs/zerolog"
"maunium.net/go/mautrix/bridgev2"
"maunium.net/go/mautrix/bridgev2/simplevent"
"go.mau.fi/mautrix-signal/pkg/libsignalgo"
"go.mau.fi/mautrix-signal/pkg/signalid"
"go.mau.fi/mautrix-signal/pkg/signalmeow/protobuf/backuppb"
"go.mau.fi/mautrix-signal/pkg/signalmeow/types"
)
func (s *SignalClient) syncChats(ctx context.Context) {
if s.UserLogin.Metadata.(*signalid.UserLoginMetadata).ChatsSynced {
return
}
if s.Client.Store.EphemeralBackupKey != nil {
zerolog.Ctx(ctx).Info().Msg("Fetching transfer archive before syncing chats")
meta, err := s.Client.WaitForTransfer(ctx)
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to request transfer archive")
return
} else if meta.Error != "" {
zerolog.Ctx(ctx).Error().Str("error_type", meta.Error).Msg("Transfer archive request was rejected")
s.UserLogin.Metadata.(*signalid.UserLoginMetadata).ChatsSynced = true
err = s.UserLogin.Save(ctx)
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to save user login metadata after transfer archive request was rejected")
}
return
}
err = s.Client.FetchAndProcessTransfer(ctx, meta)
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to fetch and process transfer archive")
return
}
zerolog.Ctx(ctx).Info().Msg("Transfer archive fetched and processed, syncing chats")
}
chats, err := s.Client.Store.BackupStore.GetBackupChats(ctx)
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to get chats from backup store")
return
}
zerolog.Ctx(ctx).Info().Int("chat_count", len(chats)).Msg("Fetched chats to sync from database")
for _, chat := range chats {
recipient, err := s.Client.Store.BackupStore.GetBackupRecipient(ctx, chat.RecipientId)
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to get recipient for chat")
continue
}
resyncEvt := &simplevent.ChatResync{
EventMeta: simplevent.EventMeta{
Type: bridgev2.RemoteEventChatResync,
LogContext: func(c zerolog.Context) zerolog.Context {
return c.
Int("message_count", chat.TotalMessages).
Uint64("backup_chat_id", chat.Id).
Uint64("backup_recipient_id", chat.RecipientId)
},
CreatePortal: true,
},
LatestMessageTS: time.UnixMilli(int64(chat.LatestMessageID)),
}
switch dest := recipient.Destination.(type) {
case *backuppb.Recipient_Contact:
aci := tryCastUUID(dest.Contact.GetAci())
pni := tryCastUUID(dest.Contact.GetPni())
if chat.TotalMessages == 0 {
zerolog.Ctx(ctx).Debug().
Stringer("aci", aci).
Stringer("pni", pni).
Uint64("e164", dest.Contact.GetE164()).
Msg("Skipping direct chat with no messages and deleting data")
err = s.Client.Store.BackupStore.DeleteBackupChat(ctx, chat.Id)
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to delete chat from backup store")
}
continue
}
processedRecipient, err := s.Client.Store.RecipientStore.LoadAndUpdateRecipient(ctx, aci, pni, nil)
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to get full recipient data")
continue
}
dmInfo := s.makeCreateDMResponse(ctx, processedRecipient, chat)
resyncEvt.PortalKey = dmInfo.PortalKey
resyncEvt.ChatInfo = dmInfo.PortalInfo
case *backuppb.Recipient_Self:
processedRecipient, err := s.Client.Store.RecipientStore.LoadAndUpdateRecipient(ctx, s.Client.Store.ACI, uuid.Nil, nil)
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to get full recipient data")
continue
}
dmInfo := s.makeCreateDMResponse(ctx, processedRecipient, chat)
resyncEvt.PortalKey = dmInfo.PortalKey
resyncEvt.ChatInfo = dmInfo.PortalInfo
case *backuppb.Recipient_Group:
if len(dest.Group.MasterKey) != libsignalgo.GroupMasterKeyLength {
continue
}
rawGroupID, err := libsignalgo.GroupMasterKey(dest.Group.MasterKey).GroupIdentifier()
if err != nil {
zerolog.Ctx(ctx).Err(err).
Uint64("recipient_id", recipient.Id).
Msg("Failed to get group identifier from master key")
continue
}
groupID := types.GroupIdentifier(base64.StdEncoding.EncodeToString(rawGroupID[:]))
groupInfo, err := s.getGroupInfo(ctx, groupID, dest.Group.GetSnapshot().GetVersion(), chat)
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to get full group info")
continue
}
resyncEvt.PortalKey = s.makePortalKey(string(groupID))
resyncEvt.ChatInfo = groupInfo
default:
zerolog.Ctx(ctx).Debug().
Type("destination_type", dest).
Uint64("backup_chat_id", chat.Id).
Uint64("backup_recipient_id", chat.RecipientId).
Msg("Ignoring and deleting chat with unsupported destination type")
err = s.Client.Store.BackupStore.DeleteBackupChat(ctx, chat.Id)
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to delete chat from backup store")
}
continue
}
s.UserLogin.QueueRemoteEvent(resyncEvt)
}
s.UserLogin.Metadata.(*signalid.UserLoginMetadata).ChatsSynced = true
err = s.UserLogin.Save(ctx)
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to save user login metadata after syncing chats")
}
}

View file

@ -206,6 +206,16 @@ func (s *SignalClient) bridgeStateLoop(statusChan <-chan signalmeow.SignalConnec
}
}
func (s *SignalClient) postLoginConnect(ctx context.Context) {
// TODO it would be more proper to only connect after syncing,
// but currently syncing will fetch group info online, so it has to be connected.
s.Connect(ctx)
s.syncChats(ctx)
if s.Client.Store.MasterKey != nil {
s.Client.SyncStorage(ctx)
}
}
func (s *SignalClient) Connect(ctx context.Context) {
if s.Client == nil {
s.UserLogin.BridgeState.Send(status.BridgeState{StateEvent: status.StateBadCredentials, Message: "You're not logged into Signal"})
@ -283,6 +293,7 @@ func (s *SignalClient) tryConnect(ctx context.Context, retryCount int) {
}
} else {
go s.bridgeStateLoop(ch)
go s.syncChats(ctx)
}
}

View file

@ -33,7 +33,9 @@ func (s *SignalConnector) GetDBMetaTypes() database.MetaTypes {
Message: func() any {
return &signalid.MessageMetadata{}
},
Reaction: nil,
UserLogin: nil,
Reaction: nil,
UserLogin: func() any {
return &signalid.UserLoginMetadata{}
},
}
}

View file

@ -31,6 +31,7 @@ import (
"go.mau.fi/mautrix-signal/pkg/libsignalgo"
"go.mau.fi/mautrix-signal/pkg/signalid"
"go.mau.fi/mautrix-signal/pkg/signalmeow"
"go.mau.fi/mautrix-signal/pkg/signalmeow/store"
"go.mau.fi/mautrix-signal/pkg/signalmeow/types"
)
@ -93,7 +94,7 @@ func inviteLinkToJoinRule(inviteLinkAccess signalmeow.AccessControl) event.JoinR
}
}
func (s *SignalClient) getGroupInfo(ctx context.Context, groupID types.GroupIdentifier, minRevision uint32) (*bridgev2.ChatInfo, error) {
func (s *SignalClient) getGroupInfo(ctx context.Context, groupID types.GroupIdentifier, minRevision uint32, backupChat *store.BackupChat) (*bridgev2.ChatInfo, error) {
groupInfo, err := s.Client.RetrieveGroupByID(ctx, groupID, minRevision)
if err != nil {
return nil, err
@ -152,6 +153,13 @@ func (s *SignalClient) getGroupInfo(ctx context.Context, groupID types.GroupIden
Membership: event.MembershipBan,
}
}
if backupChat == nil {
// TODO allow using backup chat for data too instead of asking server?
backupChat, err = s.Client.Store.BackupStore.GetBackupChatByGroupID(ctx, groupID)
if err != nil {
zerolog.Ctx(ctx).Warn().Err(err).Msg("Failed to get backup chat for group")
}
}
return &bridgev2.ChatInfo{
Name: &groupInfo.Title,
Topic: &groupInfo.Description,
@ -164,6 +172,7 @@ func (s *SignalClient) getGroupInfo(ctx context.Context, groupID types.GroupIden
Type: ptr.Ptr(database.RoomTypeDefault),
JoinRule: &event.JoinRulesEventContent{JoinRule: joinRule},
ExtraUpdates: makeRevisionUpdater(groupInfo.Revision),
CanBackfill: backupChat != nil,
}, nil
}
@ -366,7 +375,7 @@ func (s *SignalClient) catchUpGroup(ctx context.Context, portal *bridgev2.Portal
Logger()
if fromRevision == 0 {
log.Info().Msg("Syncing full group info")
info, err := s.getGroupInfo(ctx, types.GroupIdentifier(portal.ID), toRevision)
info, err := s.getGroupInfo(ctx, types.GroupIdentifier(portal.ID), toRevision, nil)
if err != nil {
log.Err(err).Msg("Failed to get group info")
} else {

View file

@ -79,7 +79,9 @@ func (qr *QRLogin) Start(ctx context.Context) (*bridgev2.LoginStep, error) {
provCtx, cancel := context.WithCancel(log.WithContext(context.Background()))
qr.cancelChan = cancel
// Don't use the start context here: the channel will outlive the start request.
qr.ProvChan = signalmeow.PerformProvisioning(provCtx, qr.Main.Store, qr.Main.Config.DeviceName, false)
qr.ProvChan = signalmeow.PerformProvisioning(
provCtx, qr.Main.Store, qr.Main.Config.DeviceName, qr.Main.Bridge.Config.Backfill.Enabled,
)
var resp signalmeow.ProvisioningResponse
select {
case resp = <-qr.ProvChan:
@ -165,6 +167,7 @@ func (qr *QRLogin) processingWait(ctx context.Context) (*bridgev2.LoginStep, err
RemoteProfile: status.RemoteProfile{
Phone: qr.ProvData.Number,
},
Metadata: &signalid.UserLoginMetadata{},
}, &bridgev2.NewLoginParams{
DeleteOnConflict: true,
})
@ -172,10 +175,16 @@ func (qr *QRLogin) processingWait(ctx context.Context) (*bridgev2.LoginStep, err
return nil, fmt.Errorf("failed to create user login: %w", err)
}
backgroundCtx := ul.Log.WithContext(context.Background())
ul.Client.Connect(backgroundCtx)
if signalClient := ul.Client.(*SignalClient).Client; signalClient.Store.MasterKey != nil {
zerolog.Ctx(ctx).Info().Msg("Received master key in login, syncing storage immediately")
go signalClient.SyncStorage(backgroundCtx)
signalClient := ul.Client.(*SignalClient).Client
if signalClient.Store.EphemeralBackupKey != nil {
zerolog.Ctx(ctx).Info().Msg("Received ephemeral backup key in login, syncing chats before connecting")
go ul.Client.(*SignalClient).postLoginConnect(backgroundCtx)
} else {
ul.Client.Connect(backgroundCtx)
if signalClient.Store.MasterKey != nil {
zerolog.Ctx(ctx).Info().Msg("Received master key in login, syncing storage immediately")
go signalClient.SyncStorage(backgroundCtx)
}
}
return &bridgev2.LoginStep{
Type: bridgev2.LoginStepTypeComplete,

View file

@ -20,6 +20,7 @@ import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"net/http"
"strings"
@ -41,6 +42,11 @@ import (
signalpb "go.mau.fi/mautrix-signal/pkg/signalmeow/protobuf"
)
var (
ErrAttachmentNotInBackup = errors.New("attachment not found in backup")
ErrBackupNotSupported = errors.New("downloading attachments from server-side backup is not yet supported")
)
func calculateLength(dm *signalpb.DataMessage) int {
if dm.GetFlags()&uint32(signalpb.DataMessage_EXPIRATION_TIMER_UPDATE) != 0 {
return 1
@ -384,6 +390,23 @@ func (mc *MessageConverter) convertContactToMatrix(ctx context.Context, contact
func (mc *MessageConverter) convertAttachmentToMatrix(ctx context.Context, index int, att *signalpb.AttachmentPointer, attMap AttachmentMap) *bridgev2.ConvertedMessagePart {
part, err := mc.reuploadAttachment(ctx, att, attMap)
if err != nil {
if (errors.Is(err, signalmeow.ErrAttachmentNotFound) || errors.Is(err, ErrAttachmentNotInBackup)) && attMap != nil {
return &bridgev2.ConvertedMessagePart{
Type: event.EventMessage,
Content: &event.MessageEventContent{
MsgType: event.MsgNotice,
Body: fmt.Sprintf("Attachment no longer available %s", att.GetFileName()),
},
}
} else if errors.Is(err, ErrBackupNotSupported) {
return &bridgev2.ConvertedMessagePart{
Type: event.EventMessage,
Content: &event.MessageEventContent{
MsgType: event.MsgNotice,
Body: "Downloading attachments from backup is not yet supported",
},
}
}
zerolog.Ctx(ctx).Err(err).Int("attachment_index", index).Msg("Failed to handle attachment")
return &bridgev2.ConvertedMessagePart{
Type: event.EventMessage,
@ -434,7 +457,7 @@ func (mc *MessageConverter) convertStickerToMatrix(ctx context.Context, sticker
func (mc *MessageConverter) downloadSignalLongText(ctx context.Context, att *signalpb.AttachmentPointer, attMap AttachmentMap) (*string, error) {
data, err := mc.downloadAttachment(ctx, att, attMap)
if err != nil {
return nil, fmt.Errorf("failed to download attachment: %w", err)
return nil, err
}
longBody := string(data)
return &longBody, nil
@ -449,10 +472,10 @@ func (mc *MessageConverter) downloadAttachment(ctx context.Context, att *signalp
if !ok {
return nil, fmt.Errorf("no attachment identifier and attachment not found in map")
} else if target == nil {
return nil, fmt.Errorf("attachment not available in backup")
return nil, ErrAttachmentNotInBackup
} else {
// TODO add support for downloading attachments from backup
return nil, fmt.Errorf("downloading attachments from backup is not yet supported")
return nil, ErrBackupNotSupported
}
}
return signalmeow.DownloadAttachment(ctx, att)
@ -461,7 +484,7 @@ func (mc *MessageConverter) downloadAttachment(ctx context.Context, att *signalp
func (mc *MessageConverter) reuploadAttachment(ctx context.Context, att *signalpb.AttachmentPointer, attMap AttachmentMap) (*bridgev2.ConvertedMessagePart, error) {
data, err := mc.downloadAttachment(ctx, att, attMap)
if err != nil {
return nil, fmt.Errorf("failed to download attachment: %w", err)
return nil, err
}
mimeType := att.GetContentType()
if mimeType == "" {

View file

@ -29,6 +29,10 @@ type MessageMetadata struct {
ContainsAttachments bool `json:"contains_attachments,omitempty"`
}
type UserLoginMetadata struct {
ChatsSynced bool `json:"chats_synced,omitempty"`
}
type GhostMetadata struct {
ProfileFetchedAt jsontime.UnixMilli `json:"profile_fetched_at"`
}

View file

@ -57,6 +57,7 @@ func getAttachmentPath(id uint64, key string) string {
// ErrInvalidMACForAttachment signals that the downloaded attachment has an invalid MAC.
var ErrInvalidMACForAttachment = errors.New("invalid MAC for attachment")
var ErrInvalidDigestForAttachment = errors.New("invalid digest for attachment")
var ErrAttachmentNotFound = errors.New("attachment not found on server")
func DownloadAttachment(ctx context.Context, a *signalpb.AttachmentPointer) ([]byte, error) {
path := getAttachmentPath(a.GetCdnId(), a.GetCdnKey())
@ -77,6 +78,9 @@ func DownloadAttachment(ctx context.Context, a *signalpb.AttachmentPointer) ([]b
} else if len(body) < 1024 {
zerolog.Ctx(ctx).Debug().Bytes("response_data", body).Msg("Failed download response data")
}
if resp.StatusCode == http.StatusNotFound {
return nil, ErrAttachmentNotFound
}
return nil, fmt.Errorf("unexpected status code %d", resp.StatusCode)
}