mirror of
https://github.com/mautrix/whatsapp.git
synced 2025-03-14 14:15:38 +00:00
mediarequest: implement requesting old media
This commit is contained in:
parent
efd898e557
commit
acb56e4ac4
9 changed files with 414 additions and 32 deletions
6
go.mod
6
go.mod
|
@ -11,13 +11,14 @@ require (
|
|||
github.com/rs/zerolog v1.33.0
|
||||
go.mau.fi/util v0.8.1-0.20240927174413-000d30f9a02a
|
||||
go.mau.fi/webp v0.1.0
|
||||
go.mau.fi/whatsmeow v0.0.0-20240930130510-b12cf9c9deda
|
||||
go.mau.fi/whatsmeow v0.0.0-20241001110941-382edde94d9f
|
||||
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0
|
||||
golang.org/x/image v0.20.0
|
||||
golang.org/x/net v0.29.0
|
||||
golang.org/x/sync v0.8.0
|
||||
google.golang.org/protobuf v1.34.2
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
maunium.net/go/mautrix v0.21.1-0.20240930142122-741b4e823ffb
|
||||
maunium.net/go/mautrix v0.21.1-0.20241001105237-37af19a01a61
|
||||
)
|
||||
|
||||
require (
|
||||
|
@ -40,7 +41,6 @@ require (
|
|||
go.mau.fi/libsignal v0.1.1 // indirect
|
||||
go.mau.fi/zeroconfig v0.1.3 // indirect
|
||||
golang.org/x/crypto v0.27.0 // indirect
|
||||
golang.org/x/sync v0.8.0 // indirect
|
||||
golang.org/x/sys v0.25.0 // indirect
|
||||
golang.org/x/text v0.18.0 // indirect
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
|
||||
|
|
8
go.sum
8
go.sum
|
@ -69,8 +69,8 @@ go.mau.fi/util v0.8.1-0.20240927174413-000d30f9a02a h1:4TrWJ0ooHT9YssDBUgXNU8FiR
|
|||
go.mau.fi/util v0.8.1-0.20240927174413-000d30f9a02a/go.mod h1:1Ixb8HWoVbl3rT6nAX6nV4iMkzn7KU/KXwE0Rn5RmsQ=
|
||||
go.mau.fi/webp v0.1.0 h1:BHObH/DcFntT9KYun5pDr0Ot4eUZO8k2C7eP7vF4ueA=
|
||||
go.mau.fi/webp v0.1.0/go.mod h1:e42Z+VMFrUMS9cpEwGRIor+lQWO8oUAyPyMtcL+NMt8=
|
||||
go.mau.fi/whatsmeow v0.0.0-20240930130510-b12cf9c9deda h1:e1wgeyVk5F/3Pec4/iBQGvOBkkrc5HSbVE57ay/9x+A=
|
||||
go.mau.fi/whatsmeow v0.0.0-20240930130510-b12cf9c9deda/go.mod h1:UvaXcdb8y5Mryj2LSXAMw7u4/exnWJIXn8Gvpmf6ndI=
|
||||
go.mau.fi/whatsmeow v0.0.0-20241001110941-382edde94d9f h1:L3aOZEtq5XJcJO5+/mxO3MW7e2ofXNFkVT6McpcpF5k=
|
||||
go.mau.fi/whatsmeow v0.0.0-20241001110941-382edde94d9f/go.mod h1:UvaXcdb8y5Mryj2LSXAMw7u4/exnWJIXn8Gvpmf6ndI=
|
||||
go.mau.fi/zeroconfig v0.1.3 h1:As9wYDKmktjmNZW5i1vn8zvJlmGKHeVxHVIBMXsm4kM=
|
||||
go.mau.fi/zeroconfig v0.1.3/go.mod h1:NcSJkf180JT+1IId76PcMuLTNa1CzsFFZ0nBygIQM70=
|
||||
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
|
||||
|
@ -101,5 +101,5 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
|||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M=
|
||||
maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA=
|
||||
maunium.net/go/mautrix v0.21.1-0.20240930142122-741b4e823ffb h1:X19nOFvy3GINE6TbSshH5OiOgqbZqEgfu8e0Nsi2iAk=
|
||||
maunium.net/go/mautrix v0.21.1-0.20240930142122-741b4e823ffb/go.mod h1:qN4yYKm3brOUWN8dlR0KPbKwSBGXQ4am/kzSQt/kLmY=
|
||||
maunium.net/go/mautrix v0.21.1-0.20241001105237-37af19a01a61 h1:acO4UUl2NQgzCURgUP6SL+05/Vx0Kr09J3iiJD0WCrM=
|
||||
maunium.net/go/mautrix v0.21.1-0.20241001105237-37af19a01a61/go.mod h1:qN4yYKm3brOUWN8dlR0KPbKwSBGXQ4am/kzSQt/kLmY=
|
||||
|
|
|
@ -269,13 +269,18 @@ func (wa *WhatsAppClient) FetchMessages(ctx context.Context, params bridgev2.Fet
|
|||
}
|
||||
}
|
||||
convertedMessages := make([]*bridgev2.BackfillMessage, len(messages))
|
||||
var mediaRequests []*wadb.MediaRequest
|
||||
for i, msg := range messages {
|
||||
evt, err := wa.Client.ParseWebMessage(portalJID, msg)
|
||||
if err != nil {
|
||||
// This should never happen because the info is already parsed once before being stored in the database
|
||||
return nil, fmt.Errorf("failed to parse info of message %s: %w", msg.GetKey().GetID(), err)
|
||||
}
|
||||
convertedMessages[i] = wa.convertHistorySyncMessage(ctx, params.Portal, &evt.Info, msg)
|
||||
var mediaReq *wadb.MediaRequest
|
||||
convertedMessages[i], mediaReq = wa.convertHistorySyncMessage(ctx, params.Portal, &evt.Info, msg)
|
||||
if mediaReq != nil {
|
||||
mediaRequests = append(mediaRequests, mediaReq)
|
||||
}
|
||||
}
|
||||
return &bridgev2.FetchMessagesResponse{
|
||||
Messages: convertedMessages,
|
||||
|
@ -298,13 +303,20 @@ func (wa *WhatsAppClient) FetchMessages(ctx context.Context, params bridgev2.Fet
|
|||
if err != nil {
|
||||
zerolog.Ctx(ctx).Warn().Err(err).Msg("Failed to delete messages from database after backfill")
|
||||
}
|
||||
if len(mediaRequests) > 0 {
|
||||
go func(ctx context.Context) {
|
||||
for _, req := range mediaRequests {
|
||||
wa.sendMediaRequest(ctx, req)
|
||||
}
|
||||
}(context.WithoutCancel(ctx))
|
||||
}
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (wa *WhatsAppClient) convertHistorySyncMessage(
|
||||
ctx context.Context, portal *bridgev2.Portal, info *types.MessageInfo, msg *waWeb.WebMessageInfo,
|
||||
) *bridgev2.BackfillMessage {
|
||||
) (*bridgev2.BackfillMessage, *wadb.MediaRequest) {
|
||||
// TODO use proper intent
|
||||
intent := wa.Main.Bridge.Bot
|
||||
wrapped := &bridgev2.BackfillMessage{
|
||||
|
@ -315,6 +327,7 @@ func (wa *WhatsAppClient) convertHistorySyncMessage(
|
|||
Timestamp: info.Timestamp,
|
||||
Reactions: make([]*bridgev2.BackfillReaction, len(msg.Reactions)),
|
||||
}
|
||||
mediaReq := wa.processFailedMedia(ctx, portal.PortalKey, wrapped.ID, wrapped.ConvertedMessage, true)
|
||||
for i, reaction := range msg.Reactions {
|
||||
var sender types.JID
|
||||
if reaction.GetKey().GetFromMe() {
|
||||
|
@ -334,5 +347,5 @@ func (wa *WhatsAppClient) convertHistorySyncMessage(
|
|||
Emoji: reaction.GetText(),
|
||||
}
|
||||
}
|
||||
return wrapped
|
||||
return wrapped, mediaReq
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import (
|
|||
"go.mau.fi/whatsmeow/store"
|
||||
"go.mau.fi/whatsmeow/types"
|
||||
waLog "go.mau.fi/whatsmeow/util/log"
|
||||
"golang.org/x/sync/semaphore"
|
||||
"maunium.net/go/mautrix/bridge/status"
|
||||
"maunium.net/go/mautrix/bridgev2"
|
||||
"maunium.net/go/mautrix/bridgev2/networkid"
|
||||
|
@ -41,8 +42,9 @@ func (wa *WhatsAppConnector) LoadUserLogin(_ context.Context, login *bridgev2.Us
|
|||
Main: wa,
|
||||
UserLogin: login,
|
||||
|
||||
historySyncs: make(chan *waHistorySync.HistorySync, 64),
|
||||
resyncQueue: make(map[types.JID]resyncQueueItem),
|
||||
historySyncs: make(chan *waHistorySync.HistorySync, 64),
|
||||
resyncQueue: make(map[types.JID]resyncQueueItem),
|
||||
mediaRetryLock: semaphore.NewWeighted(wa.Config.HistorySync.MediaRequests.MaxAsyncHandle),
|
||||
}
|
||||
login.Client = w
|
||||
|
||||
|
@ -90,6 +92,7 @@ type WhatsAppClient struct {
|
|||
resyncQueue map[types.JID]resyncQueueItem
|
||||
resyncQueueLock sync.Mutex
|
||||
nextResync time.Time
|
||||
mediaRetryLock *semaphore.Weighted
|
||||
|
||||
lastPhoneOfflineWarning time.Time
|
||||
}
|
||||
|
@ -147,6 +150,9 @@ func (wa *WhatsAppClient) startLoops() {
|
|||
go wa.historySyncLoop(ctx)
|
||||
go wa.ghostResyncLoop(ctx)
|
||||
go wa.disconnectWarningLoop(ctx)
|
||||
if mrc := wa.Main.Config.HistorySync.MediaRequests; mrc.AutoRequestMedia && mrc.RequestMethod == MediaRequestMethodLocalTime {
|
||||
go wa.mediaRequestLoop(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
func (wa *WhatsAppClient) Disconnect() {
|
||||
|
|
|
@ -18,10 +18,14 @@ package connector
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
"go.mau.fi/whatsmeow"
|
||||
"go.mau.fi/whatsmeow/proto/waE2E"
|
||||
"go.mau.fi/whatsmeow/proto/waMmsRetry"
|
||||
"go.mau.fi/whatsmeow/types"
|
||||
"go.mau.fi/whatsmeow/types/events"
|
||||
"maunium.net/go/mautrix/bridgev2"
|
||||
|
@ -30,6 +34,7 @@ import (
|
|||
"maunium.net/go/mautrix/event"
|
||||
"maunium.net/go/mautrix/format"
|
||||
|
||||
"maunium.net/go/mautrix-whatsapp/pkg/msgconv"
|
||||
"maunium.net/go/mautrix-whatsapp/pkg/waid"
|
||||
)
|
||||
|
||||
|
@ -85,6 +90,7 @@ type WAMessageEvent struct {
|
|||
|
||||
parsedMessageType string
|
||||
isUndecryptableUpsertSubEvent bool
|
||||
postHandle func()
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -98,12 +104,20 @@ var (
|
|||
_ bridgev2.RemoteReactionWithMeta = (*WAMessageEvent)(nil)
|
||||
_ bridgev2.RemoteEdit = (*WAMessageEvent)(nil)
|
||||
_ bridgev2.RemoteMessageRemove = (*WAMessageEvent)(nil)
|
||||
_ bridgev2.RemotePostHandler = (*WAMessageEvent)(nil)
|
||||
)
|
||||
|
||||
func (evt *WAMessageEvent) AddLogContext(c zerolog.Context) zerolog.Context {
|
||||
return evt.MessageInfoWrapper.AddLogContext(c).Str("parsed_message_type", evt.parsedMessageType)
|
||||
}
|
||||
|
||||
func (evt *WAMessageEvent) PostHandle(ctx context.Context, portal *bridgev2.Portal) {
|
||||
if ph := evt.postHandle; ph != nil {
|
||||
evt.postHandle = nil
|
||||
ph()
|
||||
}
|
||||
}
|
||||
|
||||
func (evt *WAMessageEvent) ConvertEdit(ctx context.Context, portal *bridgev2.Portal, intent bridgev2.MatrixAPI, existing []*database.Message) (*bridgev2.ConvertedEdit, error) {
|
||||
if len(existing) > 1 {
|
||||
zerolog.Ctx(ctx).Warn().Msg("Got edit to message with multiple parts")
|
||||
|
@ -118,6 +132,11 @@ func (evt *WAMessageEvent) ConvertEdit(ctx context.Context, portal *bridgev2.Por
|
|||
|
||||
// TODO edits to media captions may not contain the media
|
||||
cm := evt.wa.Main.MsgConv.ToMatrix(ctx, portal, evt.wa.Client, intent, editedMsg, &evt.Info)
|
||||
if evt.isUndecryptableUpsertSubEvent && isFailedMedia(cm) {
|
||||
evt.postHandle = func() {
|
||||
evt.wa.processFailedMedia(ctx, portal.PortalKey, evt.GetID(), cm, false)
|
||||
}
|
||||
}
|
||||
return &bridgev2.ConvertedEdit{
|
||||
ModifiedParts: []*bridgev2.ConvertedEditPart{cm.Parts[0].ToEditPart(existing[0])},
|
||||
}, nil
|
||||
|
@ -201,6 +220,11 @@ func (evt *WAMessageEvent) HandleExisting(ctx context.Context, portal *bridgev2.
|
|||
func (evt *WAMessageEvent) ConvertMessage(ctx context.Context, portal *bridgev2.Portal, intent bridgev2.MatrixAPI) (*bridgev2.ConvertedMessage, error) {
|
||||
evt.wa.EnqueuePortalResync(portal)
|
||||
converted := evt.wa.Main.MsgConv.ToMatrix(ctx, portal, evt.wa.Client, intent, evt.Message, &evt.Info)
|
||||
if isFailedMedia(converted) {
|
||||
evt.postHandle = func() {
|
||||
evt.wa.processFailedMedia(ctx, portal.PortalKey, evt.GetID(), converted, false)
|
||||
}
|
||||
}
|
||||
return converted, nil
|
||||
}
|
||||
|
||||
|
@ -278,3 +302,113 @@ func (evt *WAUndecryptableMessage) ConvertMessage(ctx context.Context, portal *b
|
|||
Disappear: portal.Disappear,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type WAMediaRetry struct {
|
||||
*events.MediaRetry
|
||||
wa *WhatsAppClient
|
||||
}
|
||||
|
||||
func (evt *WAMediaRetry) GetType() bridgev2.RemoteEventType {
|
||||
return bridgev2.RemoteEventEdit
|
||||
}
|
||||
|
||||
func (evt *WAMediaRetry) GetPortalKey() networkid.PortalKey {
|
||||
return evt.wa.makeWAPortalKey(evt.ChatID)
|
||||
}
|
||||
|
||||
func (evt *WAMediaRetry) AddLogContext(c zerolog.Context) zerolog.Context {
|
||||
return c.
|
||||
Str("message_id", evt.MessageID).
|
||||
Stringer("sender_id", evt.SenderID).
|
||||
Stringer("chat_id", evt.ChatID).
|
||||
Bool("from_me", evt.FromMe).
|
||||
Str("wa_event_type", "media retry")
|
||||
}
|
||||
|
||||
func (evt *WAMediaRetry) getRealSender() types.JID {
|
||||
sender := evt.SenderID
|
||||
if evt.FromMe {
|
||||
sender = evt.wa.JID.ToNonAD()
|
||||
} else if sender.IsEmpty() && evt.ChatID.Server == types.DefaultUserServer {
|
||||
sender = evt.ChatID.ToNonAD()
|
||||
}
|
||||
return sender
|
||||
}
|
||||
|
||||
func (evt *WAMediaRetry) GetSender() bridgev2.EventSender {
|
||||
return evt.wa.makeEventSender(evt.getRealSender())
|
||||
}
|
||||
|
||||
func (evt *WAMediaRetry) GetTargetMessage() networkid.MessageID {
|
||||
return waid.MakeMessageID(evt.ChatID, evt.getRealSender(), evt.MessageID)
|
||||
}
|
||||
|
||||
func (evt *WAMediaRetry) GetTimestamp() time.Time {
|
||||
return evt.Timestamp
|
||||
}
|
||||
|
||||
func (evt *WAMediaRetry) makeErrorEdit(part *database.Message, meta *msgconv.PreparedMedia, err error) *bridgev2.ConvertedEdit {
|
||||
content := &event.MessageEventContent{
|
||||
MsgType: event.MsgNotice,
|
||||
Body: fmt.Sprintf("Failed to bridge media after re-requesting it from your phone: %v", err),
|
||||
}
|
||||
if meta.FormattedBody != "" {
|
||||
content.EnsureHasHTML()
|
||||
content.Body += "\n\n" + meta.Body
|
||||
content.FormattedBody += "<br><br>" + meta.FormattedBody
|
||||
} else if meta.Body != meta.FileName && meta.FileName != "" {
|
||||
content.Body += "\n\n" + meta.Body
|
||||
}
|
||||
return &bridgev2.ConvertedEdit{
|
||||
ModifiedParts: []*bridgev2.ConvertedEditPart{{
|
||||
Part: part,
|
||||
Type: event.EventMessage,
|
||||
Content: content,
|
||||
}},
|
||||
}
|
||||
}
|
||||
|
||||
func (evt *WAMediaRetry) ConvertEdit(ctx context.Context, portal *bridgev2.Portal, intent bridgev2.MatrixAPI, existing []*database.Message) (*bridgev2.ConvertedEdit, error) {
|
||||
meta := existing[0].Metadata.(*waid.MessageMetadata)
|
||||
if meta.Error != waid.MsgErrMediaNotFound {
|
||||
return nil, fmt.Errorf("%w: message doesn't have media error", bridgev2.ErrIgnoringRemoteEvent)
|
||||
} else if meta.MediaMeta == nil {
|
||||
return nil, fmt.Errorf("%w: message doesn't have media metadata", bridgev2.ErrIgnoringRemoteEvent)
|
||||
}
|
||||
var mediaMeta msgconv.PreparedMedia
|
||||
err := json.Unmarshal(meta.MediaMeta, &mediaMeta)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal media metadata: %w", err)
|
||||
}
|
||||
log := zerolog.Ctx(ctx)
|
||||
retryData, err := whatsmeow.DecryptMediaRetryNotification(evt.MediaRetry, mediaMeta.FailedKeys.Key)
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Msg("Failed to decrypt media retry notification")
|
||||
return evt.makeErrorEdit(existing[0], &mediaMeta, err), nil
|
||||
} else if retryData.GetResult() != waMmsRetry.MediaRetryNotification_SUCCESS {
|
||||
errorName := waMmsRetry.MediaRetryNotification_ResultType_name[int32(retryData.GetResult())]
|
||||
if retryData.GetDirectPath() == "" {
|
||||
log.Warn().Str("error_name", errorName).Msg("Got error response in media retry notification")
|
||||
log.Debug().Any("error_content", retryData).Msg("Full error response content")
|
||||
if retryData.GetResult() == waMmsRetry.MediaRetryNotification_NOT_FOUND {
|
||||
return evt.makeErrorEdit(existing[0], &mediaMeta, whatsmeow.ErrMediaNotAvailableOnPhone), nil
|
||||
}
|
||||
return evt.makeErrorEdit(existing[0], &mediaMeta, fmt.Errorf("phone sent error response: %s", errorName)), nil
|
||||
} else {
|
||||
log.Debug().Msg("Got error response in media retry notification, but response also contains a new download URL - trying to download")
|
||||
}
|
||||
}
|
||||
err = evt.wa.mediaRetryLock.Acquire(ctx, 1)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to acquire media retry lock: %w", err)
|
||||
}
|
||||
defer evt.wa.mediaRetryLock.Release(1)
|
||||
|
||||
mediaMeta.FailedKeys.DirectPath = retryData.GetDirectPath()
|
||||
return evt.wa.Main.MsgConv.MediaRetryToMatrix(ctx, &mediaMeta, evt.wa.Client, intent, portal, existing[0]), nil
|
||||
}
|
||||
|
||||
var (
|
||||
_ bridgev2.RemoteEdit = (*WAMediaRetry)(nil)
|
||||
_ bridgev2.RemoteEventWithTimestamp = (*WAMediaRetry)(nil)
|
||||
)
|
||||
|
|
|
@ -110,7 +110,7 @@ func (wa *WhatsAppClient) handleWAEvent(rawEvt any) {
|
|||
}
|
||||
case *events.MediaRetry:
|
||||
wa.phoneSeen(evt.Timestamp)
|
||||
// TODO
|
||||
wa.UserLogin.QueueRemoteEvent(&WAMediaRetry{MediaRetry: evt, wa: wa})
|
||||
|
||||
case *events.GroupInfo:
|
||||
wa.handleWAGroupInfoChange(evt)
|
||||
|
|
164
pkg/connector/mediarequest.go
Normal file
164
pkg/connector/mediarequest.go
Normal file
|
@ -0,0 +1,164 @@
|
|||
// mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
|
||||
// Copyright (C) 2024 Tulir Asokan
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
package connector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
"go.mau.fi/whatsmeow/types"
|
||||
"maunium.net/go/mautrix/bridgev2"
|
||||
"maunium.net/go/mautrix/bridgev2/networkid"
|
||||
|
||||
"maunium.net/go/mautrix-whatsapp/pkg/connector/wadb"
|
||||
"maunium.net/go/mautrix-whatsapp/pkg/msgconv"
|
||||
"maunium.net/go/mautrix-whatsapp/pkg/waid"
|
||||
)
|
||||
|
||||
func isFailedMedia(converted *bridgev2.ConvertedMessage) bool {
|
||||
if len(converted.Parts) == 0 || converted.Parts[0].Extra == nil {
|
||||
return false
|
||||
}
|
||||
_, ok := converted.Parts[0].Extra[msgconv.FailedMediaField].(*msgconv.PreparedMedia)
|
||||
return ok
|
||||
}
|
||||
|
||||
func (wa *WhatsAppClient) processFailedMedia(ctx context.Context, portalKey networkid.PortalKey, msgID networkid.MessageID, converted *bridgev2.ConvertedMessage, isBackfill bool) *wadb.MediaRequest {
|
||||
if len(converted.Parts) == 0 || converted.Parts[0].Extra == nil {
|
||||
return nil
|
||||
}
|
||||
field, ok := converted.Parts[0].Extra[msgconv.FailedMediaField].(*msgconv.PreparedMedia)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
req := &wadb.MediaRequest{
|
||||
UserLoginID: wa.UserLogin.ID,
|
||||
MessageID: msgID,
|
||||
PortalKey: portalKey,
|
||||
MediaKey: field.FailedKeys.Key,
|
||||
Status: wadb.MediaBackfillRequestStatusNotRequested,
|
||||
}
|
||||
err := wa.Main.DB.MediaRequest.Put(ctx, req)
|
||||
if err != nil {
|
||||
zerolog.Ctx(ctx).Err(err).Msg("Failed to save failed media request")
|
||||
}
|
||||
if wa.Main.Config.HistorySync.MediaRequests.AutoRequestMedia && wa.Main.Config.HistorySync.MediaRequests.RequestMethod == MediaRequestMethodImmediate {
|
||||
if isBackfill {
|
||||
return req
|
||||
}
|
||||
go wa.sendMediaRequest(context.WithoutCancel(ctx), req)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (wa *WhatsAppClient) mediaRequestLoop(ctx context.Context) {
|
||||
log := wa.UserLogin.Log.With().Str("loop", "media requests").Logger()
|
||||
ctx = log.WithContext(ctx)
|
||||
tzName := wa.UserLogin.Metadata.(*waid.UserLoginMetadata).Timezone
|
||||
userTz, err := time.LoadLocation(tzName)
|
||||
var startIn time.Duration
|
||||
if tzName != "" && err == nil && userTz != nil {
|
||||
now := time.Now()
|
||||
startAt := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, userTz)
|
||||
startAt = startAt.Add(time.Duration(wa.Main.Config.HistorySync.MediaRequests.RequestLocalTime) * time.Minute)
|
||||
if startAt.Before(now) {
|
||||
startAt = startAt.AddDate(0, 0, 1)
|
||||
}
|
||||
startIn = time.Until(startAt)
|
||||
} else {
|
||||
startIn = 8 * time.Hour
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(startIn):
|
||||
}
|
||||
ticker := time.NewTicker(24 * time.Hour)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
wa.sendMediaRequests(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (wa *WhatsAppClient) sendMediaRequests(ctx context.Context) {
|
||||
reqs, err := wa.Main.DB.MediaRequest.GetUnrequestedForUserLogin(ctx, wa.UserLogin.ID)
|
||||
if err != nil {
|
||||
zerolog.Ctx(ctx).Err(err).Msg("Failed to get media requests from database")
|
||||
return
|
||||
} else if len(reqs) == 0 {
|
||||
return
|
||||
}
|
||||
zerolog.Ctx(ctx).Info().Int("request_count", len(reqs)).Msg("Sending media requests")
|
||||
for _, req := range reqs {
|
||||
wa.sendMediaRequest(ctx, req)
|
||||
}
|
||||
}
|
||||
|
||||
func (wa *WhatsAppClient) sendMediaRequest(ctx context.Context, req *wadb.MediaRequest) {
|
||||
msgID, err := waid.ParseMessageID(req.MessageID)
|
||||
if err != nil {
|
||||
err = wa.Main.DB.MediaRequest.Delete(ctx, wa.UserLogin.ID, req.MessageID)
|
||||
if err != nil {
|
||||
zerolog.Ctx(ctx).Err(err).Str("message_id", string(req.MessageID)).Msg("Failed to delete invalid media request")
|
||||
}
|
||||
return
|
||||
}
|
||||
log := zerolog.Ctx(ctx).With().Str("action", "send media request").Str("message_id", string(req.MessageID)).Logger()
|
||||
defer func() {
|
||||
err = wa.Main.DB.MediaRequest.Put(ctx, req)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to save media request status")
|
||||
}
|
||||
}()
|
||||
msg, err := wa.Main.Bridge.DB.Message.GetPartByID(ctx, wa.UserLogin.ID, req.MessageID, "")
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to get media retry target message from database")
|
||||
req.Status = wadb.MediaBackfillRequestStatusRequestSkipped
|
||||
return
|
||||
} else if msg == nil {
|
||||
log.Warn().Msg("Media retry target message not found in database")
|
||||
req.Status = wadb.MediaBackfillRequestStatusRequestSkipped
|
||||
return
|
||||
} else if msg.Metadata.(*waid.MessageMetadata).Error != waid.MsgErrMediaNotFound {
|
||||
log.Debug().Msg("Not sending media retry for message that doesn't have media error")
|
||||
req.Status = wadb.MediaBackfillRequestStatusRequestSkipped
|
||||
return
|
||||
}
|
||||
err = wa.Client.SendMediaRetryReceipt(&types.MessageInfo{
|
||||
ID: msgID.ID,
|
||||
MessageSource: types.MessageSource{
|
||||
IsFromMe: msgID.Sender.User == wa.JID.User,
|
||||
IsGroup: msgID.Chat.Server != types.DefaultUserServer,
|
||||
Sender: msgID.Sender,
|
||||
Chat: msgID.Chat,
|
||||
},
|
||||
}, req.MediaKey)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to send media retry request")
|
||||
req.Status = wadb.MediaBackfillRequestStatusRequestFailed
|
||||
req.Error = err.Error()
|
||||
} else {
|
||||
log.Debug().Msg("Sent media retry request")
|
||||
req.Status = wadb.MediaBackfillRequestStatusRequested
|
||||
}
|
||||
}
|
|
@ -10,9 +10,10 @@ import (
|
|||
type MediaBackfillRequestStatus int
|
||||
|
||||
const (
|
||||
MediaBackfillRequestStatusNotRequested MediaBackfillRequestStatus = 0
|
||||
MediaBackfillRequestStatusRequested MediaBackfillRequestStatus = 1
|
||||
MediaBackfillRequestStatusRequestFailed MediaBackfillRequestStatus = 2
|
||||
MediaBackfillRequestStatusNotRequested MediaBackfillRequestStatus = 0
|
||||
MediaBackfillRequestStatusRequested MediaBackfillRequestStatus = 1
|
||||
MediaBackfillRequestStatusRequestFailed MediaBackfillRequestStatus = 2
|
||||
MediaBackfillRequestStatusRequestSkipped MediaBackfillRequestStatus = 3
|
||||
)
|
||||
|
||||
type MediaRequestQuery struct {
|
||||
|
|
|
@ -37,8 +37,8 @@ import (
|
|||
"go.mau.fi/util/random"
|
||||
"go.mau.fi/whatsmeow"
|
||||
"go.mau.fi/whatsmeow/proto/waE2E"
|
||||
|
||||
"maunium.net/go/mautrix/bridgev2"
|
||||
"maunium.net/go/mautrix/bridgev2/database"
|
||||
"maunium.net/go/mautrix/event"
|
||||
|
||||
"maunium.net/go/mautrix-whatsapp/pkg/waid"
|
||||
|
@ -46,13 +46,14 @@ import (
|
|||
|
||||
func (mc *MessageConverter) convertMediaMessage(ctx context.Context, msg MediaMessage, typeName string) (part *bridgev2.ConvertedMessagePart, contextInfo *waE2E.ContextInfo) {
|
||||
preparedMedia := prepareMediaMessage(msg)
|
||||
preparedMedia.TypeDescription = typeName
|
||||
if preparedMedia.FileName != "" && preparedMedia.Body != preparedMedia.FileName {
|
||||
mc.parseFormatting(preparedMedia.MessageEventContent, false, false)
|
||||
}
|
||||
contextInfo = preparedMedia.ContextInfo
|
||||
err := mc.reuploadWhatsAppAttachment(ctx, msg, preparedMedia)
|
||||
if err != nil {
|
||||
part = mc.makeMediaFailure(ctx, typeName, preparedMedia, &FailedMediaKeys{
|
||||
part = mc.makeMediaFailure(ctx, preparedMedia, &FailedMediaKeys{
|
||||
Key: msg.GetMediaKey(),
|
||||
Length: msg.GetFileLength(),
|
||||
Type: whatsmeow.GetMediaType(msg),
|
||||
|
@ -69,22 +70,53 @@ func (mc *MessageConverter) convertMediaMessage(ctx context.Context, msg MediaMe
|
|||
return
|
||||
}
|
||||
|
||||
const failedMediaField = "fi.mau.whatsapp.failed_media"
|
||||
const FailedMediaField = "fi.mau.whatsapp.failed_media"
|
||||
|
||||
type FailedMediaKeys struct {
|
||||
Key []byte `json:"key"`
|
||||
Length uint64 `json:"length"`
|
||||
Type whatsmeow.MediaType `json:"type"`
|
||||
SHA256 []byte `json:"sha256"`
|
||||
EncSHA256 []byte `json:"enc_sha256"`
|
||||
Key []byte `json:"key"`
|
||||
Length uint64 `json:"length"`
|
||||
Type whatsmeow.MediaType `json:"type"`
|
||||
SHA256 []byte `json:"sha256"`
|
||||
EncSHA256 []byte `json:"enc_sha256"`
|
||||
DirectPath string `json:"-"`
|
||||
}
|
||||
|
||||
func (f *FailedMediaKeys) GetDirectPath() string {
|
||||
return f.DirectPath
|
||||
}
|
||||
|
||||
func (f *FailedMediaKeys) GetMediaType() whatsmeow.MediaType {
|
||||
return f.Type
|
||||
}
|
||||
|
||||
func (f *FailedMediaKeys) GetFileLength() uint64 {
|
||||
return f.Length
|
||||
}
|
||||
|
||||
func (f *FailedMediaKeys) GetMediaKey() []byte {
|
||||
return f.Key
|
||||
}
|
||||
|
||||
func (f *FailedMediaKeys) GetFileSHA256() []byte {
|
||||
return f.SHA256
|
||||
}
|
||||
|
||||
func (f *FailedMediaKeys) GetFileEncSHA256() []byte {
|
||||
return f.EncSHA256
|
||||
}
|
||||
|
||||
var (
|
||||
_ whatsmeow.DownloadableMessage = (*FailedMediaKeys)(nil)
|
||||
_ whatsmeow.MediaTypeable = (*FailedMediaKeys)(nil)
|
||||
)
|
||||
|
||||
type PreparedMedia struct {
|
||||
Type event.Type `json:"type"`
|
||||
*event.MessageEventContent `json:"content"`
|
||||
Extra map[string]any `json:"extra"`
|
||||
FailedKeys *FailedMediaKeys `json:"whatsapp_media"` // only for failed media
|
||||
MentionedJID []string `json:"mentioned_jid,omitempty"` // only for failed media
|
||||
TypeDescription string `json:"type_description"`
|
||||
ContextInfo *waE2E.ContextInfo `json:"-"`
|
||||
}
|
||||
|
||||
|
@ -188,9 +220,36 @@ func prepareMediaMessage(rawMsg MediaMessage) *PreparedMedia {
|
|||
// TODO read this from config?
|
||||
const uploadFileThreshold = 5 * 1024 * 1024
|
||||
|
||||
func (mc *MessageConverter) MediaRetryToMatrix(
|
||||
ctx context.Context,
|
||||
part *PreparedMedia,
|
||||
client *whatsmeow.Client,
|
||||
intent bridgev2.MatrixAPI,
|
||||
portal *bridgev2.Portal,
|
||||
existingPart *database.Message,
|
||||
) *bridgev2.ConvertedEdit {
|
||||
ctx = context.WithValue(ctx, contextKeyClient, client)
|
||||
ctx = context.WithValue(ctx, contextKeyIntent, intent)
|
||||
ctx = context.WithValue(ctx, contextKeyPortal, portal)
|
||||
err := mc.reuploadWhatsAppAttachment(ctx, part.FailedKeys, part)
|
||||
var updatedPart *bridgev2.ConvertedMessagePart
|
||||
if err != nil {
|
||||
updatedPart = mc.makeMediaFailure(ctx, part, nil, err)
|
||||
} else {
|
||||
updatedPart = &bridgev2.ConvertedMessagePart{
|
||||
Type: event.EventMessage,
|
||||
Content: part.MessageEventContent,
|
||||
Extra: part.Extra,
|
||||
}
|
||||
}
|
||||
return &bridgev2.ConvertedEdit{
|
||||
ModifiedParts: []*bridgev2.ConvertedEditPart{updatedPart.ToEditPart(existingPart)},
|
||||
}
|
||||
}
|
||||
|
||||
func (mc *MessageConverter) reuploadWhatsAppAttachment(
|
||||
ctx context.Context,
|
||||
message MediaMessage,
|
||||
message whatsmeow.DownloadableMessage,
|
||||
part *PreparedMedia,
|
||||
) error {
|
||||
client := getClient(ctx)
|
||||
|
@ -340,12 +399,12 @@ func (mc *MessageConverter) convertAnimatedSticker(ctx context.Context, fileInfo
|
|||
}
|
||||
}
|
||||
|
||||
func (mc *MessageConverter) makeMediaFailure(ctx context.Context, mediaType string, mediaInfo *PreparedMedia, keys *FailedMediaKeys, err error) *bridgev2.ConvertedMessagePart {
|
||||
func (mc *MessageConverter) makeMediaFailure(ctx context.Context, mediaInfo *PreparedMedia, keys *FailedMediaKeys, err error) *bridgev2.ConvertedMessagePart {
|
||||
logLevel := zerolog.ErrorLevel
|
||||
var extra map[string]any
|
||||
var dbMeta *waid.MessageMetadata
|
||||
errorMsg := fmt.Sprintf("Failed to bridge %s, please view it on the WhatsApp app", mediaType)
|
||||
if errors.Is(err, whatsmeow.ErrMediaDownloadFailedWith403) || errors.Is(err, whatsmeow.ErrMediaDownloadFailedWith404) || errors.Is(err, whatsmeow.ErrMediaDownloadFailedWith410) {
|
||||
errorMsg := fmt.Sprintf("Failed to bridge %s, please view it on the WhatsApp app", mediaInfo.TypeDescription)
|
||||
if keys != nil && (errors.Is(err, whatsmeow.ErrMediaDownloadFailedWith403) || errors.Is(err, whatsmeow.ErrMediaDownloadFailedWith404) || errors.Is(err, whatsmeow.ErrMediaDownloadFailedWith410)) {
|
||||
logLevel = zerolog.DebugLevel
|
||||
mediaInfo.FailedKeys = keys
|
||||
mediaInfo.MentionedJID = mediaInfo.ContextInfo.GetMentionedJID()
|
||||
|
@ -354,15 +413,16 @@ func (mc *MessageConverter) makeMediaFailure(ctx context.Context, mediaType stri
|
|||
zerolog.Ctx(ctx).Err(serializerErr).Msg("Failed to serialize media info")
|
||||
}
|
||||
extra = map[string]any{
|
||||
failedMediaField: mediaInfo,
|
||||
FailedMediaField: mediaInfo,
|
||||
}
|
||||
dbMeta = &waid.MessageMetadata{
|
||||
Error: waid.MsgErrMediaNotFound,
|
||||
MediaMeta: serializedMedia,
|
||||
}
|
||||
errorMsg = fmt.Sprintf("Old %s. Viewing old media is not currently supported.", mediaType)
|
||||
errorMsg = fmt.Sprintf("Old %s. Viewing old media is not currently supported.", mediaInfo.TypeDescription)
|
||||
}
|
||||
zerolog.Ctx(ctx).WithLevel(logLevel).Err(err).Str("media_type", mediaType).
|
||||
zerolog.Ctx(ctx).WithLevel(logLevel).Err(err).
|
||||
Str("media_type", mediaInfo.TypeDescription).
|
||||
Msg("Failed to reupload WhatsApp attachment")
|
||||
part := &bridgev2.ConvertedMessagePart{
|
||||
Type: event.EventMessage,
|
||||
|
@ -373,7 +433,11 @@ func (mc *MessageConverter) makeMediaFailure(ctx context.Context, mediaType stri
|
|||
Extra: extra,
|
||||
DBMetadata: dbMeta,
|
||||
}
|
||||
if mediaInfo.Body != "" && mediaInfo.FileName != "" && mediaInfo.Body != mediaInfo.FileName {
|
||||
if mediaInfo.FormattedBody != "" {
|
||||
part.Content.EnsureHasHTML()
|
||||
part.Content.FormattedBody += "<br><br>" + mediaInfo.FormattedBody
|
||||
part.Content.Body += "\n\n" + mediaInfo.Body
|
||||
} else if mediaInfo.Body != "" && mediaInfo.FileName != "" && mediaInfo.Body != mediaInfo.FileName {
|
||||
part.Content.Body += "\n\n" + mediaInfo.Body
|
||||
}
|
||||
return part
|
||||
|
|
Loading…
Add table
Reference in a new issue