mirror of
https://github.com/mautrix/whatsapp.git
synced 2025-03-14 14:15:38 +00:00
wadb: add wrappers for backfill and poll option tables
This commit is contained in:
parent
7264c7fee2
commit
edf6b17b8c
13 changed files with 569 additions and 49 deletions
|
@ -228,13 +228,14 @@ LEFT JOIN user_login ON user_login.user_mxid = history_sync_conversation_old.use
|
|||
WHERE user_login.id IS NOT NULL;
|
||||
|
||||
INSERT INTO whatsapp_history_sync_message (
|
||||
bridge_id, user_login_id, chat_jid, message_id, timestamp, data, inserted_time
|
||||
bridge_id, user_login_id, chat_jid, sender_jid, message_id, timestamp, data, inserted_time
|
||||
)
|
||||
SELECT
|
||||
'',
|
||||
user_login.id,
|
||||
conversation_id,
|
||||
message_id,
|
||||
'',
|
||||
-- only: postgres
|
||||
CAST(EXTRACT(EPOCH FROM timestamp) AS BIGINT),
|
||||
-- only: sqlite (line commented)
|
||||
|
|
|
@ -36,7 +36,7 @@ func main() {
|
|||
"v0.11.0",
|
||||
m.LegacyMigrateWithAnotherUpgrader(
|
||||
legacyMigrateRenameTables, legacyMigrateCopyData, 17,
|
||||
upgrades.Table, "whatsapp_version", 1,
|
||||
upgrades.Table, "whatsapp_version", 2,
|
||||
),
|
||||
true,
|
||||
)
|
||||
|
|
4
go.mod
4
go.mod
|
@ -13,7 +13,7 @@ require (
|
|||
github.com/rs/zerolog v1.33.0
|
||||
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
|
||||
github.com/tidwall/gjson v1.17.3
|
||||
go.mau.fi/util v0.7.1-0.20240913091524-7617daa66719
|
||||
go.mau.fi/util v0.8.1-0.20240917114523-1ba4f6274db5
|
||||
go.mau.fi/webp v0.1.0
|
||||
go.mau.fi/whatsmeow v0.0.0-20240916205343-ea8c175b2e2c
|
||||
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0
|
||||
|
@ -22,7 +22,7 @@ require (
|
|||
golang.org/x/sync v0.8.0
|
||||
google.golang.org/protobuf v1.34.2
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
maunium.net/go/mautrix v0.20.1-0.20240914094516-d89dac594db0
|
||||
maunium.net/go/mautrix v0.21.0
|
||||
)
|
||||
|
||||
require (
|
||||
|
|
8
go.sum
8
go.sum
|
@ -78,8 +78,8 @@ github.com/yuin/goldmark v1.7.4 h1:BDXOHExt+A7gwPCJgPIIq7ENvceR7we7rOS9TNoLZeg=
|
|||
github.com/yuin/goldmark v1.7.4/go.mod h1:uzxRWxtg69N339t3louHJ7+O03ezfj6PlliRlaOzY1E=
|
||||
go.mau.fi/libsignal v0.1.1 h1:m/0PGBh4QKP/I1MQ44ti4C0fMbLMuHb95cmDw01FIpI=
|
||||
go.mau.fi/libsignal v0.1.1/go.mod h1:QLs89F/OA3ThdSL2Wz2p+o+fi8uuQUz0e1BRa6ExdBw=
|
||||
go.mau.fi/util v0.7.1-0.20240913091524-7617daa66719 h1:sg1P/f4RHY1JuAwsPOjTCsZr8ROzR9bRTtnvvBu42d4=
|
||||
go.mau.fi/util v0.7.1-0.20240913091524-7617daa66719/go.mod h1:1Ixb8HWoVbl3rT6nAX6nV4iMkzn7KU/KXwE0Rn5RmsQ=
|
||||
go.mau.fi/util v0.8.1-0.20240917114523-1ba4f6274db5 h1:UBob83/x5OS6nLUAGaZepQtaTcnHdRKpJswvoS3MQUs=
|
||||
go.mau.fi/util v0.8.1-0.20240917114523-1ba4f6274db5/go.mod h1:1Ixb8HWoVbl3rT6nAX6nV4iMkzn7KU/KXwE0Rn5RmsQ=
|
||||
go.mau.fi/webp v0.1.0 h1:BHObH/DcFntT9KYun5pDr0Ot4eUZO8k2C7eP7vF4ueA=
|
||||
go.mau.fi/webp v0.1.0/go.mod h1:e42Z+VMFrUMS9cpEwGRIor+lQWO8oUAyPyMtcL+NMt8=
|
||||
go.mau.fi/whatsmeow v0.0.0-20240916205343-ea8c175b2e2c h1:0pNAbeBNHdDmLbHG2bj+tnQnwE5YZVE83/QAfMlaYs4=
|
||||
|
@ -114,5 +114,5 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
|||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M=
|
||||
maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA=
|
||||
maunium.net/go/mautrix v0.20.1-0.20240914094516-d89dac594db0 h1:fTX1P8TPv+oUqHGu08jj6FYH+Q/fC9jtmvkXcAw+KTo=
|
||||
maunium.net/go/mautrix v0.20.1-0.20240914094516-d89dac594db0/go.mod h1:amzKPIZVO7v1piD2JhKG1RvGZoV+5wEZfoHaEXOjjqA=
|
||||
maunium.net/go/mautrix v0.21.0 h1:Z6nVu+clkJgj6ANwFYQQ1BtYeVXZPZ9lRgwuFN57gOY=
|
||||
maunium.net/go/mautrix v0.21.0/go.mod h1:qm9oDhcHxF/Xby5RUuONIGpXw1SXXqLZj/GgvMxJxu0=
|
||||
|
|
|
@ -45,7 +45,7 @@ func (wa *WhatsAppConnector) GetName() bridgev2.BridgeName {
|
|||
func (wa *WhatsAppConnector) Init(bridge *bridgev2.Bridge) {
|
||||
wa.Bridge = bridge
|
||||
wa.MsgConv = msgconv.New(bridge)
|
||||
wa.DB = wadb.New(bridge.DB.Database, bridge.Log.With().Str("db_section", "whatsapp").Logger())
|
||||
wa.DB = wadb.New(bridge.ID, bridge.DB.Database, bridge.Log.With().Str("db_section", "whatsapp").Logger())
|
||||
|
||||
wa.DeviceStore = sqlstore.NewWithDB(
|
||||
bridge.DB.RawDB,
|
||||
|
|
143
pkg/connector/wadb/conversation.go
Normal file
143
pkg/connector/wadb/conversation.go
Normal file
|
@ -0,0 +1,143 @@
|
|||
package wadb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"go.mau.fi/util/dbutil"
|
||||
"go.mau.fi/whatsmeow/proto/waHistorySync"
|
||||
"go.mau.fi/whatsmeow/types"
|
||||
"maunium.net/go/mautrix/bridgev2/networkid"
|
||||
)
|
||||
|
||||
type ConversationQuery struct {
|
||||
BridgeID networkid.BridgeID
|
||||
*dbutil.QueryHelper[*Conversation]
|
||||
}
|
||||
|
||||
type Conversation struct {
|
||||
BridgeID networkid.BridgeID
|
||||
UserLoginID networkid.UserLoginID
|
||||
ChatJID types.JID
|
||||
LastMessageTimestamp time.Time
|
||||
Archived bool
|
||||
Pinned bool
|
||||
MuteEndTime time.Time
|
||||
EndOfHistoryTransferType waHistorySync.Conversation_EndOfHistoryTransferType
|
||||
EphemeralExpiration time.Duration
|
||||
EphemeralSettingTimestamp int64
|
||||
MarkedAsUnread bool
|
||||
UnreadCount uint32
|
||||
}
|
||||
|
||||
const (
|
||||
upsertHistorySyncConversationQuery = `
|
||||
INSERT INTO whatsapp_history_sync_conversation (
|
||||
bridge_id, user_login_id, chat_jid, last_message_timestamp, archived, pinned, mute_end_time,
|
||||
end_of_history_transfer_type, ephemeral_expiration, ephemeral_setting_timestamp, marked_as_unread, unread_count
|
||||
)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
|
||||
ON CONFLICT (bridge_id, user_login_id, chat_jid)
|
||||
DO UPDATE SET
|
||||
last_message_timestamp=CASE
|
||||
WHEN excluded.last_message_timestamp > whatsapp_history_sync_conversation.last_message_timestamp THEN excluded.last_message_timestamp
|
||||
ELSE whatsapp_history_sync_conversation.last_message_timestamp
|
||||
END,
|
||||
ephemeral_expiration=COALESCE(excluded.ephemeral_expiration, whatsapp_history_sync_conversation.ephemeral_expiration),
|
||||
ephemeral_setting_timestamp=COALESCE(excluded.ephemeral_setting_timestamp, whatsapp_history_sync_conversation.ephemeral_setting_timestamp),
|
||||
end_of_history_transfer_type=excluded.end_of_history_transfer_type
|
||||
`
|
||||
getRecentConversations = `
|
||||
SELECT
|
||||
bridge_id, user_login_id, chat_jid, last_message_timestamp, archived, pinned, mute_end_time,
|
||||
end_of_history_transfer_type, ephemeral_expiration, ephemeral_setting_timestamp, marked_as_unread, unread_count
|
||||
FROM whatsapp_history_sync_conversation
|
||||
WHERE bridge_id=$1 AND user_login_id=$2
|
||||
ORDER BY last_message_timestamp DESC
|
||||
LIMIT $3
|
||||
`
|
||||
getConversationByJID = `
|
||||
SELECT
|
||||
bridge_id, user_login_id, chat_jid, last_message_timestamp, archived, pinned, mute_end_time,
|
||||
end_of_history_transfer_type, ephemeral_expiration, ephemeral_setting_timestamp, marked_as_unread, unread_count
|
||||
FROM whatsapp_history_sync_conversation
|
||||
WHERE bridge_id=$1 AND user_login_id=$2 AND chat_jid=$3
|
||||
`
|
||||
deleteAllConversationsQuery = "DELETE FROM whatsapp_history_sync_conversation WHERE bridge_id=$1 AND user_login_id=$2"
|
||||
deleteConversationQuery = `
|
||||
DELETE FROM whatsapp_history_sync_conversation
|
||||
WHERE bridge_id=$1 AND user_login_id=$2 AND chat_jid=$3
|
||||
`
|
||||
)
|
||||
|
||||
func (cq *ConversationQuery) Put(ctx context.Context, conv *Conversation) error {
|
||||
conv.BridgeID = cq.BridgeID
|
||||
return cq.Exec(ctx, upsertHistorySyncConversationQuery, conv.sqlVariables()...)
|
||||
}
|
||||
|
||||
func (cq *ConversationQuery) GetRecent(ctx context.Context, loginID networkid.UserLoginID, limit int) ([]*Conversation, error) {
|
||||
limitPtr := &limit
|
||||
// Negative limit on SQLite means unlimited, but Postgres prefers a NULL limit.
|
||||
if limit < 0 && cq.GetDB().Dialect == dbutil.Postgres {
|
||||
limitPtr = nil
|
||||
}
|
||||
return cq.QueryMany(ctx, getRecentConversations, cq.BridgeID, loginID, limitPtr)
|
||||
}
|
||||
|
||||
func (cq *ConversationQuery) Get(ctx context.Context, loginID networkid.UserLoginID, chatJID types.JID) (*Conversation, error) {
|
||||
return cq.QueryOne(ctx, getConversationByJID, cq.BridgeID, loginID, chatJID)
|
||||
}
|
||||
|
||||
func (cq *ConversationQuery) DeleteAll(ctx context.Context, loginID networkid.UserLoginID) error {
|
||||
return cq.Exec(ctx, deleteAllConversationsQuery, cq.BridgeID, loginID)
|
||||
}
|
||||
|
||||
func (cq *ConversationQuery) Delete(ctx context.Context, loginID networkid.UserLoginID, chatJID types.JID) error {
|
||||
return cq.Exec(ctx, deleteConversationQuery, cq.BridgeID, loginID, chatJID)
|
||||
}
|
||||
|
||||
func (c *Conversation) sqlVariables() []any {
|
||||
return []any{
|
||||
c.BridgeID,
|
||||
c.UserLoginID,
|
||||
c.ChatJID,
|
||||
c.LastMessageTimestamp.Unix(),
|
||||
c.Archived,
|
||||
c.Pinned,
|
||||
c.MuteEndTime.Unix(),
|
||||
c.EndOfHistoryTransferType,
|
||||
int64(c.EphemeralExpiration.Seconds()),
|
||||
c.EphemeralSettingTimestamp,
|
||||
c.MarkedAsUnread,
|
||||
c.UnreadCount,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Conversation) Scan(row dbutil.Scannable) (*Conversation, error) {
|
||||
var lastMessageTS, muteEndTime, ephemeralExpiration int64
|
||||
err := row.Scan(
|
||||
&c.BridgeID,
|
||||
&c.UserLoginID,
|
||||
&c.ChatJID,
|
||||
&lastMessageTS,
|
||||
&c.Archived,
|
||||
&c.Pinned,
|
||||
&muteEndTime,
|
||||
&c.EndOfHistoryTransferType,
|
||||
&ephemeralExpiration,
|
||||
&c.EphemeralSettingTimestamp,
|
||||
&c.MarkedAsUnread,
|
||||
&c.UnreadCount,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if lastMessageTS != 0 {
|
||||
c.LastMessageTimestamp = time.Unix(lastMessageTS, 0)
|
||||
}
|
||||
if muteEndTime != 0 {
|
||||
c.MuteEndTime = time.Unix(muteEndTime, 0)
|
||||
}
|
||||
c.EphemeralExpiration = time.Duration(ephemeralExpiration) * time.Second
|
||||
return c, nil
|
||||
}
|
|
@ -3,17 +3,42 @@ package wadb
|
|||
import (
|
||||
"github.com/rs/zerolog"
|
||||
"go.mau.fi/util/dbutil"
|
||||
"maunium.net/go/mautrix/bridgev2/networkid"
|
||||
|
||||
"maunium.net/go/mautrix-whatsapp/pkg/connector/wadb/upgrades"
|
||||
)
|
||||
|
||||
type Database struct {
|
||||
*dbutil.Database
|
||||
Conversation *ConversationQuery
|
||||
Message *MessageQuery
|
||||
PollOption *PollOptionQuery
|
||||
MediaRequest *MediaRequestQuery
|
||||
}
|
||||
|
||||
func New(db *dbutil.Database, log zerolog.Logger) *Database {
|
||||
func New(bridgeID networkid.BridgeID, db *dbutil.Database, log zerolog.Logger) *Database {
|
||||
db = db.Child("whatsapp_version", upgrades.Table, dbutil.ZeroLogger(log))
|
||||
return &Database{
|
||||
Database: db,
|
||||
Conversation: &ConversationQuery{
|
||||
BridgeID: bridgeID,
|
||||
QueryHelper: dbutil.MakeQueryHelper(db, func(_ *dbutil.QueryHelper[*Conversation]) *Conversation {
|
||||
return &Conversation{}
|
||||
}),
|
||||
},
|
||||
Message: &MessageQuery{
|
||||
BridgeID: bridgeID,
|
||||
Database: db,
|
||||
},
|
||||
PollOption: &PollOptionQuery{
|
||||
BridgeID: bridgeID,
|
||||
Database: db,
|
||||
},
|
||||
MediaRequest: &MediaRequestQuery{
|
||||
BridgeID: bridgeID,
|
||||
QueryHelper: dbutil.MakeQueryHelper(db, func(_ *dbutil.QueryHelper[*MediaRequest]) *MediaRequest {
|
||||
return &MediaRequest{}
|
||||
}),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
69
pkg/connector/wadb/mediarequest.go
Normal file
69
pkg/connector/wadb/mediarequest.go
Normal file
|
@ -0,0 +1,69 @@
|
|||
package wadb
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.mau.fi/util/dbutil"
|
||||
"maunium.net/go/mautrix/bridgev2/networkid"
|
||||
"maunium.net/go/mautrix/id"
|
||||
)
|
||||
|
||||
type MediaBackfillRequestStatus int
|
||||
|
||||
const (
|
||||
MediaBackfillRequestStatusNotRequested MediaBackfillRequestStatus = 0
|
||||
MediaBackfillRequestStatusRequested MediaBackfillRequestStatus = 1
|
||||
MediaBackfillRequestStatusRequestFailed MediaBackfillRequestStatus = 2
|
||||
)
|
||||
|
||||
type MediaRequestQuery struct {
|
||||
BridgeID networkid.BridgeID
|
||||
*dbutil.QueryHelper[*MediaRequest]
|
||||
}
|
||||
|
||||
type MediaRequest struct {
|
||||
BridgeID networkid.BridgeID
|
||||
UserLoginID networkid.UserLoginID
|
||||
PortalKey networkid.PortalKey
|
||||
EventID id.EventID
|
||||
MediaKey []byte
|
||||
Status MediaBackfillRequestStatus
|
||||
Error string
|
||||
}
|
||||
|
||||
const (
|
||||
upsertMediaRequestQuery = `
|
||||
INSERT INTO whatsapp_media_backfill_request (
|
||||
bridge_id, user_login_id, portal_id, portal_receiver, event_id, media_key, status, error
|
||||
)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
ON CONFLICT (bridge_id, user_login_id, portal_id, portal_receiver, event_id) DO UPDATE SET
|
||||
media_key=excluded.media_key, status=excluded.status, error=excluded.error
|
||||
`
|
||||
getAllUnrequestedMediaRequestsForUserLoginQuery = `
|
||||
SELECT bridge_id, user_login_id, portal_id, portal_receiver, event_id, media_key, status, error
|
||||
FROM whatsapp_media_backfill_request
|
||||
WHERE bridge_id=$1 AND user_login_id=$2 AND status=0
|
||||
`
|
||||
)
|
||||
|
||||
func (mrq *MediaRequestQuery) Put(ctx context.Context, mr *MediaRequest) error {
|
||||
mr.BridgeID = mrq.BridgeID
|
||||
return mrq.Exec(ctx, upsertMediaRequestQuery, mr.sqlVariables()...)
|
||||
}
|
||||
|
||||
func (mrq *MediaRequestQuery) GetUnrequestedForUserLogin(ctx context.Context, loginID networkid.UserLoginID) ([]*MediaRequest, error) {
|
||||
return mrq.QueryMany(ctx, getAllUnrequestedMediaRequestsForUserLoginQuery, mrq.BridgeID, loginID)
|
||||
}
|
||||
|
||||
func (mr *MediaRequest) Scan(row dbutil.Scannable) (*MediaRequest, error) {
|
||||
err := row.Scan(&mr.BridgeID, &mr.UserLoginID, &mr.PortalKey.ID, &mr.PortalKey.Receiver, &mr.EventID, &mr.MediaKey, &mr.Status, &mr.Error)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return mr, nil
|
||||
}
|
||||
|
||||
func (mr *MediaRequest) sqlVariables() []any {
|
||||
return []any{mr.BridgeID, mr.UserLoginID, mr.PortalKey.ID, mr.PortalKey.Receiver, mr.EventID, mr.MediaKey, mr.Status, mr.Error}
|
||||
}
|
121
pkg/connector/wadb/message.go
Normal file
121
pkg/connector/wadb/message.go
Normal file
|
@ -0,0 +1,121 @@
|
|||
package wadb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"go.mau.fi/util/dbutil"
|
||||
"go.mau.fi/whatsmeow/proto/waHistorySync"
|
||||
"go.mau.fi/whatsmeow/proto/waWeb"
|
||||
"go.mau.fi/whatsmeow/types"
|
||||
"google.golang.org/protobuf/proto"
|
||||
"maunium.net/go/mautrix/bridgev2/networkid"
|
||||
)
|
||||
|
||||
type MessageQuery struct {
|
||||
BridgeID networkid.BridgeID
|
||||
*dbutil.Database
|
||||
}
|
||||
|
||||
const (
|
||||
insertHistorySyncMessageQuery = `
|
||||
INSERT INTO whatsapp_history_sync_message (bridge_id, user_login_id, chat_jid, sender_jid, message_id, timestamp, data, inserted_time)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||
ON CONFLICT (bridge_id, user_login_id, chat_jid, sender_jid, message_id) DO NOTHING
|
||||
`
|
||||
getHistorySyncMessagesBetweenQueryTemplate = `
|
||||
SELECT data FROM whatsapp_history_sync_message
|
||||
WHERE bridge_id=$1 AND user_login_id=$2 AND chat_jid=$3
|
||||
%s
|
||||
ORDER BY timestamp DESC
|
||||
%s
|
||||
`
|
||||
deleteHistorySyncMessagesBetweenExclusiveQuery = `
|
||||
DELETE FROM whatsapp_history_sync_message
|
||||
WHERE bridge_id=$1 AND user_login_id=$2 AND chat_jid=$3 AND timestamp<$4 AND timestamp>$5
|
||||
`
|
||||
deleteAllHistorySyncMessagesQuery = "DELETE FROM whatsapp_history_sync_message WHERE bridge_id=$1 AND user_login_id=$2"
|
||||
deleteHistorySyncMessagesForPortalQuery = `
|
||||
DELETE FROM whatsapp_history_sync_message
|
||||
WHERE bridge_id=$1 AND user_login_id=$2 AND chat_jid=$3
|
||||
`
|
||||
conversationHasHistorySyncMessagesQuery = `
|
||||
SELECT EXISTS(
|
||||
SELECT 1 FROM whatsapp_history_sync_message
|
||||
WHERE bridge_id=$1 AND user_login_id=$2 AND chat_jid=$3
|
||||
)
|
||||
`
|
||||
)
|
||||
|
||||
func (mq *MessageQuery) Put(ctx context.Context, loginID networkid.UserLoginID, parsedInfo *types.MessageInfo, message *waHistorySync.HistorySyncMsg) error {
|
||||
msgData, err := proto.Marshal(message)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = mq.Exec(ctx, insertHistorySyncMessageQuery,
|
||||
mq.BridgeID, loginID, parsedInfo.Chat, parsedInfo.Sender.ToNonAD(), parsedInfo.ID,
|
||||
parsedInfo.Timestamp, msgData, time.Now())
|
||||
return err
|
||||
}
|
||||
|
||||
func scanWebMessageInfo(rows dbutil.Scannable) (*waWeb.WebMessageInfo, error) {
|
||||
var msgData []byte
|
||||
err := rows.Scan(&msgData)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var historySyncMsg waHistorySync.HistorySyncMsg
|
||||
err = proto.Unmarshal(msgData, &historySyncMsg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal message: %w", err)
|
||||
}
|
||||
return historySyncMsg.GetMessage(), nil
|
||||
}
|
||||
|
||||
var webMessageInfoConverter = dbutil.ConvertRowFn[*waWeb.WebMessageInfo](scanWebMessageInfo)
|
||||
|
||||
func (mq *MessageQuery) GetBetween(ctx context.Context, loginID networkid.UserLoginID, chatJID types.JID, startTime, endTime *time.Time, limit int) ([]*waWeb.WebMessageInfo, error) {
|
||||
whereClauses := ""
|
||||
args := []any{mq.BridgeID, loginID, chatJID}
|
||||
argNum := 4
|
||||
if startTime != nil {
|
||||
whereClauses += fmt.Sprintf(" AND timestamp >= $%d", argNum)
|
||||
args = append(args, startTime.Unix())
|
||||
argNum++
|
||||
}
|
||||
if endTime != nil {
|
||||
whereClauses += fmt.Sprintf(" AND timestamp <= $%d", argNum)
|
||||
args = append(args, endTime.Unix())
|
||||
}
|
||||
|
||||
limitClause := ""
|
||||
if limit > 0 {
|
||||
limitClause = fmt.Sprintf("LIMIT %d", limit)
|
||||
}
|
||||
query := fmt.Sprintf(getHistorySyncMessagesBetweenQueryTemplate, whereClauses, limitClause)
|
||||
|
||||
return webMessageInfoConverter.
|
||||
NewRowIter(mq.Query(ctx, query, args...)).
|
||||
AsList()
|
||||
}
|
||||
|
||||
func (mq *MessageQuery) DeleteBetween(ctx context.Context, loginID networkid.UserLoginID, chatJID types.JID, before, after time.Time) error {
|
||||
_, err := mq.Exec(ctx, deleteHistorySyncMessagesBetweenExclusiveQuery, mq.BridgeID, loginID, chatJID, before.Unix(), after.Unix())
|
||||
return err
|
||||
}
|
||||
|
||||
func (mq *MessageQuery) DeleteAll(ctx context.Context, loginID networkid.UserLoginID) error {
|
||||
_, err := mq.Exec(ctx, deleteAllHistorySyncMessagesQuery, mq.BridgeID, loginID)
|
||||
return err
|
||||
}
|
||||
|
||||
func (mq *MessageQuery) DeleteAllInChat(ctx context.Context, loginID networkid.UserLoginID, chatJID types.JID) error {
|
||||
_, err := mq.Exec(ctx, deleteHistorySyncMessagesForPortalQuery, mq.BridgeID, loginID, chatJID)
|
||||
return err
|
||||
}
|
||||
|
||||
func (mq *MessageQuery) ConversationHasMessages(ctx context.Context, loginID networkid.UserLoginID, chatJID types.JID) (exists bool, err error) {
|
||||
err = mq.QueryRow(ctx, conversationHasHistorySyncMessagesQuery, mq.BridgeID, loginID, chatJID).Scan(&exists)
|
||||
return
|
||||
}
|
120
pkg/connector/wadb/polloption.go
Normal file
120
pkg/connector/wadb/polloption.go
Normal file
|
@ -0,0 +1,120 @@
|
|||
package wadb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"go.mau.fi/util/dbutil"
|
||||
"maunium.net/go/mautrix/bridgev2/networkid"
|
||||
"maunium.net/go/mautrix/id"
|
||||
)
|
||||
|
||||
type PollOptionQuery struct {
|
||||
BridgeID networkid.BridgeID
|
||||
*dbutil.Database
|
||||
}
|
||||
|
||||
type pollOption struct {
|
||||
id string
|
||||
hash [32]byte
|
||||
}
|
||||
|
||||
const (
|
||||
putPollOptionBaseQuery = `
|
||||
INSERT INTO whatsapp_poll_option_id (bridge_id, msg_mxid, opt_id, opt_hash)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
`
|
||||
getPollOptionIDsByHashesQuery = "SELECT opt_id, opt_hash FROM whatsapp_poll_option_id WHERE bridge_id=$1 AND msg_mxid=$2 AND opt_hash = ANY($3)"
|
||||
getPollOptionHashesByIDsQuery = "SELECT opt_id, opt_hash FROM whatsapp_poll_option_id WHERE bridge_id=$1 AND msg_mxid=$2 AND opt_id = ANY($3)"
|
||||
getPollOptionQuerySQLiteArrayTemplate = " IN (%s)"
|
||||
getPollOptionQueryArrayPlaceholder = " = ANY($3)"
|
||||
)
|
||||
|
||||
func init() {
|
||||
if strings.ReplaceAll(getPollOptionIDsByHashesQuery, getPollOptionQueryArrayPlaceholder, "meow") == getPollOptionIDsByHashesQuery {
|
||||
panic("Array select query placeholder not found")
|
||||
}
|
||||
if strings.ReplaceAll(getPollOptionHashesByIDsQuery, getPollOptionQueryArrayPlaceholder, "meow") == getPollOptionIDsByHashesQuery {
|
||||
panic("Array select query placeholder not found")
|
||||
}
|
||||
}
|
||||
|
||||
var pollOptionInserter = dbutil.NewMassInsertBuilder[*pollOption, [2]any](
|
||||
putPollOptionBaseQuery, "($1, $2, $%d, $%d)",
|
||||
)
|
||||
|
||||
func (poq *PollOptionQuery) Put(ctx context.Context, mxid id.EventID, opts map[[32]byte]string) error {
|
||||
if len(opts) == 0 {
|
||||
return nil
|
||||
}
|
||||
optArray := make([]*pollOption, len(opts))
|
||||
i := 0
|
||||
for hash, optID := range opts {
|
||||
optArray[i] = &pollOption{id: optID, hash: hash}
|
||||
i++
|
||||
}
|
||||
query, args := pollOptionInserter.Build([2]any{poq.BridgeID, mxid}, optArray)
|
||||
_, err := poq.Exec(ctx, query, args...)
|
||||
return err
|
||||
}
|
||||
|
||||
func (poq *PollOptionQuery) GetIDs(ctx context.Context, mxid id.EventID, hashes [][]byte) (map[[32]byte]string, error) {
|
||||
return getPollOptions(
|
||||
poq, ctx, mxid, getPollOptionIDsByHashesQuery, hashes,
|
||||
func(t *pollOption) ([32]byte, string) { return t.hash, t.id },
|
||||
)
|
||||
}
|
||||
|
||||
func (poq *PollOptionQuery) GetHashes(ctx context.Context, mxid id.EventID, ids []string) (map[string][32]byte, error) {
|
||||
return getPollOptions(
|
||||
poq, ctx, mxid, getPollOptionHashesByIDsQuery, ids,
|
||||
func(t *pollOption) (string, [32]byte) { return t.id, t.hash },
|
||||
)
|
||||
}
|
||||
|
||||
func getPollOptions[LookupKey any, Key comparable, Value any](
|
||||
poq *PollOptionQuery,
|
||||
ctx context.Context,
|
||||
mxid id.EventID,
|
||||
query string,
|
||||
things []LookupKey,
|
||||
getKeyValue func(option *pollOption) (Key, Value),
|
||||
) (map[Key]Value, error) {
|
||||
var args []any
|
||||
if poq.Dialect == dbutil.Postgres {
|
||||
args = []any{poq.BridgeID, mxid, pq.Array(things)}
|
||||
} else {
|
||||
query = strings.ReplaceAll(query, getPollOptionQueryArrayPlaceholder, fmt.Sprintf(getPollOptionQuerySQLiteArrayTemplate, strings.TrimSuffix(strings.Repeat("?,", len(things)), ",")))
|
||||
args = make([]any, len(things)+2)
|
||||
args[0] = poq.BridgeID
|
||||
args[1] = mxid
|
||||
for i, thing := range things {
|
||||
args[i+2] = thing
|
||||
}
|
||||
}
|
||||
return dbutil.RowIterAsMap(
|
||||
dbutil.ConvertRowFn[*pollOption](scanPollOption).NewRowIter(poq.Query(ctx, query, args...)),
|
||||
getKeyValue,
|
||||
)
|
||||
}
|
||||
|
||||
func scanPollOption(row dbutil.Scannable) (*pollOption, error) {
|
||||
var optionID string
|
||||
var hash []byte
|
||||
err := row.Scan(&optionID, &hash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if len(hash) != 32 {
|
||||
return nil, fmt.Errorf("invalid hash length: %d", len(hash))
|
||||
}
|
||||
return &pollOption{
|
||||
id: optionID,
|
||||
hash: [32]byte(hash),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (po *pollOption) GetMassInsertValues() [2]any {
|
||||
return [2]any{po.id, po.hash[:]}
|
||||
}
|
|
@ -1,67 +1,68 @@
|
|||
-- v0 -> v1 (compatible with v1+): Latest revision
|
||||
-- v0 -> v2 (compatible with v2+): Latest revision
|
||||
|
||||
CREATE TABLE whatsapp_poll_option_id (
|
||||
bridge_id TEXT,
|
||||
msg_mxid TEXT,
|
||||
opt_id TEXT,
|
||||
opt_hash bytea CHECK ( length(opt_hash) = 32 ),
|
||||
bridge_id TEXT NOT NULL,
|
||||
msg_mxid TEXT NOT NULL,
|
||||
opt_id TEXT NOT NULL,
|
||||
opt_hash bytea NOT NULL CHECK ( length(opt_hash) = 32 ),
|
||||
|
||||
PRIMARY KEY (bridge_id, msg_mxid, opt_id),
|
||||
CONSTRAINT whatsapp_poll_option_unique_hash UNIQUE (bridge_id, msg_mxid, opt_hash),
|
||||
CONSTRAINT message_mxid_fkey FOREIGN KEY (bridge_id, msg_mxid)
|
||||
REFERENCES message(bridge_id, mxid) ON DELETE CASCADE ON UPDATE CASCADE
|
||||
REFERENCES message (bridge_id, mxid) ON DELETE CASCADE ON UPDATE CASCADE
|
||||
);
|
||||
|
||||
CREATE TABLE whatsapp_history_sync_conversation (
|
||||
bridge_id TEXT,
|
||||
user_login_id TEXT,
|
||||
chat_jid TEXT,
|
||||
bridge_id TEXT NOT NULL,
|
||||
user_login_id TEXT NOT NULL,
|
||||
chat_jid TEXT NOT NULL,
|
||||
|
||||
last_message_timestamp BIGINT,
|
||||
archived BOOLEAN,
|
||||
pinned BOOLEAN,
|
||||
mute_end_time BIGINT,
|
||||
end_of_history_transfer_type INTEGER,
|
||||
ephemeral_expiration INTEGER,
|
||||
ephemeral_setting_timestamp BIGINT,
|
||||
marked_as_unread BOOLEAN,
|
||||
unread_count INTEGER,
|
||||
last_message_timestamp BIGINT NOT NULL,
|
||||
archived BOOLEAN NOT NULL,
|
||||
pinned BOOLEAN NOT NULL,
|
||||
mute_end_time BIGINT NOT NULL,
|
||||
end_of_history_transfer_type INTEGER NOT NULL,
|
||||
ephemeral_expiration INTEGER NOT NULL,
|
||||
ephemeral_setting_timestamp BIGINT NOT NULL,
|
||||
marked_as_unread BOOLEAN NOT NULL,
|
||||
unread_count INTEGER NOT NULL,
|
||||
|
||||
PRIMARY KEY (bridge_id, user_login_id, chat_jid),
|
||||
CONSTRAINT whatsapp_history_sync_conversation_user_login_fkey FOREIGN KEY (bridge_id, user_login_id)
|
||||
REFERENCES user_login(bridge_id, id) ON UPDATE CASCADE ON DELETE CASCADE
|
||||
REFERENCES user_login (bridge_id, id) ON UPDATE CASCADE ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE TABLE whatsapp_history_sync_message (
|
||||
bridge_id TEXT,
|
||||
user_login_id TEXT,
|
||||
chat_jid TEXT,
|
||||
message_id TEXT,
|
||||
timestamp BIGINT,
|
||||
data bytea,
|
||||
inserted_time BIGINT,
|
||||
bridge_id TEXT NOT NULL,
|
||||
user_login_id TEXT NOT NULL,
|
||||
chat_jid TEXT NOT NULL,
|
||||
sender_jid TEXT NOT NULL,
|
||||
message_id TEXT NOT NULL,
|
||||
timestamp BIGINT NOT NULL,
|
||||
data bytea NOT NULL,
|
||||
inserted_time BIGINT NOT NULL,
|
||||
|
||||
PRIMARY KEY (bridge_id, user_login_id, chat_jid, message_id),
|
||||
PRIMARY KEY (bridge_id, user_login_id, chat_jid, sender_jid, message_id),
|
||||
CONSTRAINT whatsapp_history_sync_message_user_login_fkey FOREIGN KEY (bridge_id, user_login_id)
|
||||
REFERENCES user_login(bridge_id, id) ON UPDATE CASCADE ON DELETE CASCADE,
|
||||
REFERENCES user_login (bridge_id, id) ON UPDATE CASCADE ON DELETE CASCADE,
|
||||
CONSTRAINT whatsapp_history_sync_message_conversation_fkey FOREIGN KEY (bridge_id, user_login_id, chat_jid)
|
||||
REFERENCES whatsapp_history_sync_conversation(bridge_id, user_login_id, chat_jid) ON UPDATE CASCADE ON DELETE CASCADE
|
||||
REFERENCES whatsapp_history_sync_conversation (bridge_id, user_login_id, chat_jid) ON UPDATE CASCADE ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE TABLE whatsapp_media_backfill_request (
|
||||
bridge_id TEXT,
|
||||
user_login_id TEXT,
|
||||
portal_id TEXT,
|
||||
portal_receiver TEXT,
|
||||
bridge_id TEXT NOT NULL,
|
||||
user_login_id TEXT NOT NULL,
|
||||
portal_id TEXT NOT NULL,
|
||||
portal_receiver TEXT NOT NULL,
|
||||
|
||||
event_id TEXT,
|
||||
media_key bytea,
|
||||
status INTEGER,
|
||||
error TEXT,
|
||||
event_id TEXT NOT NULL,
|
||||
media_key bytea NOT NULL,
|
||||
status INTEGER NOT NULL,
|
||||
error TEXT NOT NULL,
|
||||
|
||||
PRIMARY KEY (bridge_id, user_login_id, portal_id, portal_receiver, event_id),
|
||||
CONSTRAINT whatsapp_media_backfill_request_user_login_fkey FOREIGN KEY (bridge_id, user_login_id)
|
||||
REFERENCES user_login(bridge_id, id) ON UPDATE CASCADE ON DELETE CASCADE,
|
||||
REFERENCES user_login (bridge_id, id) ON UPDATE CASCADE ON DELETE CASCADE,
|
||||
CONSTRAINT whatsapp_media_backfill_request_portal_fkey FOREIGN KEY (bridge_id, portal_id, portal_receiver)
|
||||
REFERENCES portal(bridge_id, id, receiver) ON UPDATE CASCADE ON DELETE CASCADE
|
||||
REFERENCES portal (bridge_id, id, receiver) ON UPDATE CASCADE ON DELETE CASCADE
|
||||
);
|
||||
|
|
|
@ -0,0 +1,4 @@
|
|||
-- v2: Add sender JID to history sync messages
|
||||
-- transaction: sqlite-fkey-off
|
||||
ALTER TABLE whatsapp_history_sync_message ADD COLUMN sender_jid TEXT NOT NULL DEFAULT '';
|
||||
ALTER TABLE whatsapp_history_sync_message ALTER COLUMN sender_jid DROP DEFAULT;
|
|
@ -0,0 +1,36 @@
|
|||
-- v2: Add sender JID to history sync messages
|
||||
-- transaction: sqlite-fkey-off
|
||||
|
||||
CREATE TABLE whatsapp_history_sync_message_new (
|
||||
bridge_id TEXT NOT NULL,
|
||||
user_login_id TEXT NOT NULL,
|
||||
chat_jid TEXT NOT NULL,
|
||||
sender_jid TEXT NOT NULL,
|
||||
message_id TEXT NOT NULL,
|
||||
timestamp BIGINT NOT NULL,
|
||||
data bytea NOT NULL,
|
||||
inserted_time BIGINT NOT NULL,
|
||||
|
||||
PRIMARY KEY (bridge_id, user_login_id, chat_jid, sender_jid, message_id),
|
||||
CONSTRAINT whatsapp_history_sync_message_user_login_fkey FOREIGN KEY (bridge_id, user_login_id)
|
||||
REFERENCES user_login (bridge_id, id) ON UPDATE CASCADE ON DELETE CASCADE,
|
||||
CONSTRAINT whatsapp_history_sync_message_conversation_fkey FOREIGN KEY (bridge_id, user_login_id, chat_jid)
|
||||
REFERENCES whatsapp_history_sync_conversation (bridge_id, user_login_id, chat_jid) ON UPDATE CASCADE ON DELETE CASCADE
|
||||
);
|
||||
|
||||
INSERT INTO whatsapp_history_sync_message_new (
|
||||
bridge_id, user_login_id, chat_jid, sender_jid, message_id, timestamp, data, inserted_time
|
||||
)
|
||||
SELECT
|
||||
bridge_id,
|
||||
user_login_id,
|
||||
chat_jid,
|
||||
message_id,
|
||||
'',
|
||||
timestamp,
|
||||
data,
|
||||
inserted_time
|
||||
FROM whatsapp_history_sync_message;
|
||||
|
||||
DROP TABLE whatsapp_history_sync_message;
|
||||
ALTER TABLE whatsapp_history_sync_message_new RENAME TO whatsapp_history_sync_message;
|
Loading…
Add table
Reference in a new issue