signalmeow/storageservice: process updates in transaction

This commit is contained in:
Tulir Asokan 2025-01-20 19:55:51 +02:00
parent 3db54fd574
commit 121945a445
3 changed files with 20 additions and 14 deletions

View file

@ -767,7 +767,7 @@ func (cli *Client) StoreMasterKey(ctx context.Context, groupMasterKey types.Seri
}
err = cli.Store.GroupStore.StoreMasterKey(ctx, groupIdentifier, groupMasterKey)
if err != nil {
return "", fmt.Errorf("StoreMasterKey error: %w", err)
return groupIdentifier, fmt.Errorf("StoreMasterKey error: %w", err)
}
return groupIdentifier, nil
}

View file

@ -49,6 +49,16 @@ func (cli *Client) SyncStorage(ctx context.Context) {
log.Err(err).Msg("Failed to fetch storage")
return
}
err = cli.Store.DoTxn(ctx, func(ctx context.Context) error {
return cli.processStorageInTxn(ctx, update)
})
if err != nil {
log.Err(err).Msg("Failed to process storage update")
}
}
func (cli *Client) processStorageInTxn(ctx context.Context, update *StorageUpdate) error {
log := zerolog.Ctx(ctx)
for _, record := range update.NewRecords {
switch data := record.StorageRecord.GetRecord().(type) {
case *signalpb.StorageRecord_Contact:
@ -64,7 +74,7 @@ func (cli *Client) SyncStorage(ctx context.Context) {
continue
}
contact := data.Contact
_, err = cli.Store.RecipientStore.LoadAndUpdateRecipient(ctx, aci, pni, func(recipient *types.Recipient) (changed bool, err error) {
_, err := cli.Store.RecipientStore.LoadAndUpdateRecipient(ctx, aci, pni, func(recipient *types.Recipient) (changed bool, err error) {
if len(contact.ProfileKey) == libsignalgo.ProfileKeyLength {
newProfileKey := libsignalgo.ProfileKey(contact.ProfileKey)
changed = changed || recipient.Profile.Key != newProfileKey
@ -85,10 +95,7 @@ func (cli *Client) SyncStorage(ctx context.Context) {
return
})
if err != nil {
log.Err(err).
Stringer("aci", aci).
Stringer("pni", pni).
Msg("Failed to update contact")
return fmt.Errorf("failed to update contact %s/%s: %w", aci, pni, err)
}
case *signalpb.StorageRecord_GroupV2:
if len(data.GroupV2.MasterKey) != libsignalgo.GroupMasterKeyLength {
@ -98,25 +105,24 @@ func (cli *Client) SyncStorage(ctx context.Context) {
masterKey := libsignalgo.GroupMasterKey(data.GroupV2.MasterKey)
groupID, err := cli.StoreMasterKey(ctx, masterKeyFromBytes(masterKey))
if err != nil {
log.Err(err).Msg("Failed to store group master key from storage service")
} else {
log.Debug().Stringer("group_id", groupID).Msg("Stored group master key from storage service")
return fmt.Errorf("failed to store group master key for %s: %w", groupID, err)
}
log.Debug().Stringer("group_id", groupID).Msg("Stored group master key from storage service")
case *signalpb.StorageRecord_Account:
log.Trace().Any("account_record", data.Account).Msg("Found account record")
cli.Store.AccountRecord = data.Account
err = cli.Store.DeviceStore.PutDevice(ctx, &cli.Store.DeviceData)
err := cli.Store.DeviceStore.PutDevice(ctx, &cli.Store.DeviceData)
if err != nil {
log.Err(err).Msg("Failed to save device after receiving account record")
} else {
log.Debug().Msg("Saved device after receiving account record")
return fmt.Errorf("failed to save device after receiving account record: %w", err)
}
log.Debug().Msg("Saved device after receiving account record")
case *signalpb.StorageRecord_GroupV1, *signalpb.StorageRecord_StoryDistributionList:
// irrelevant data
default:
log.Warn().Type("type", data).Str("item_id", record.StorageID).Msg("Unknown storage record type")
}
}
return nil
}
type StorageUpdate struct {

View file

@ -109,7 +109,7 @@ func (c *Container) scanDevice(row dbutil.Scannable) (*Device, error) {
device.DeviceStore = baseStore
device.BackupStore = baseStore
device.DoTxn = func(ctx context.Context, fn func(context.Context) error) error {
return c.db.DoTxn(ctx, nil, fn)
return c.db.DoTxn(context.WithValue(ctx, dbutil.ContextKeyDoTxnCallerSkip, 1), nil, fn)
}
return &device, nil