mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-20 12:33:09 -06:00
Handle m.room.tombstone events in the UserAPI
This commit is contained in:
parent
a7b74176e3
commit
bc3a94ceae
|
|
@ -8,6 +8,8 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/tidwall/gjson"
|
||||||
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
@ -185,13 +187,98 @@ func (s *OutputRoomEventConsumer) storeMessageStats(ctx context.Context, eventTy
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *OutputRoomEventConsumer) handleRoomUpgrade(ctx context.Context, oldRoomID, newRoomID string, localMembers []*localMembership, roomSize int) error {
|
||||||
|
for _, membership := range localMembers {
|
||||||
|
// Copy any existing push rules from old -> new room
|
||||||
|
if err := s.copyPushrules(ctx, oldRoomID, newRoomID, membership); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// preserve m.direct room state
|
||||||
|
if err := s.updateMDirect(ctx, oldRoomID, newRoomID, membership, roomSize); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *OutputRoomEventConsumer) copyPushrules(ctx context.Context, oldRoomID string, newRoomID string, membership *localMembership) error {
|
||||||
|
pushRules, err := s.db.QueryPushRules(ctx, membership.Localpart)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to query pushrules for user: %w", err)
|
||||||
|
}
|
||||||
|
if pushRules == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, roomRule := range pushRules.Global.Room {
|
||||||
|
if roomRule.RuleID != oldRoomID {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
cpRool := *roomRule
|
||||||
|
cpRool.RuleID = newRoomID
|
||||||
|
pushRules.Global.Room = append(pushRules.Global.Room, &cpRool)
|
||||||
|
rules, err := json.Marshal(pushRules)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := s.db.SaveAccountData(ctx, membership.Localpart, "", "m.push_rules", rules); err != nil {
|
||||||
|
return fmt.Errorf("failed to update pushrules: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateMDirect copies the "is_direct" flag from oldRoomID to newROomID
|
||||||
|
func (s *OutputRoomEventConsumer) updateMDirect(ctx context.Context, oldRoomID, newRoomID string, membership *localMembership, roomSize int) error {
|
||||||
|
// this is most likely not a DM, so skip updating m.direct state
|
||||||
|
if roomSize > 2 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// Get direct message state
|
||||||
|
directChatsRaw, err := s.db.GetAccountDataByType(ctx, membership.Localpart, "", "m.direct")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get m.direct from database: %w", err)
|
||||||
|
}
|
||||||
|
directChats := gjson.ParseBytes(directChatsRaw)
|
||||||
|
newDirectChats := make(map[string][]string)
|
||||||
|
// iterate over all userID -> roomIDs
|
||||||
|
directChats.ForEach(func(userID, roomIDs gjson.Result) bool {
|
||||||
|
var found bool
|
||||||
|
for _, roomID := range roomIDs.Array() {
|
||||||
|
newDirectChats[userID.Str] = append(newDirectChats[userID.Str], roomID.Str)
|
||||||
|
// add the new roomID to m.direct
|
||||||
|
if roomID.Str == oldRoomID {
|
||||||
|
found = true
|
||||||
|
newDirectChats[userID.Str] = append(newDirectChats[userID.Str], newRoomID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Only hit the database if we found the old room as a DM for this user
|
||||||
|
if found {
|
||||||
|
data, err := json.Marshal(newDirectChats)
|
||||||
|
if err != nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if err = s.db.SaveAccountData(ctx, membership.Localpart, "", "m.direct", data); err != nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to update m.direct state")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, streamPos uint64) error {
|
func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, streamPos uint64) error {
|
||||||
members, roomSize, err := s.localRoomMembers(ctx, event.RoomID())
|
members, roomSize, err := s.localRoomMembers(ctx, event.RoomID())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("s.localRoomMembers: %w", err)
|
return fmt.Errorf("s.localRoomMembers: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if event.Type() == gomatrixserverlib.MRoomMember {
|
switch {
|
||||||
|
case event.Type() == gomatrixserverlib.MRoomMember:
|
||||||
cevent := gomatrixserverlib.HeaderedToClientEvent(event, gomatrixserverlib.FormatAll)
|
cevent := gomatrixserverlib.HeaderedToClientEvent(event, gomatrixserverlib.FormatAll)
|
||||||
var member *localMembership
|
var member *localMembership
|
||||||
member, err = newLocalMembership(&cevent)
|
member, err = newLocalMembership(&cevent)
|
||||||
|
|
@ -203,6 +290,13 @@ func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *gom
|
||||||
// should also be pushed to the target user.
|
// should also be pushed to the target user.
|
||||||
members = append(members, member)
|
members = append(members, member)
|
||||||
}
|
}
|
||||||
|
case event.Type() == "m.room.tombstone" && event.StateKeyEquals(""):
|
||||||
|
// Handle room upgrades
|
||||||
|
oldRoomID := event.RoomID()
|
||||||
|
newRoomID := gjson.GetBytes(event.Content(), "replacement_room").Str
|
||||||
|
log.Debugf("XXX: received tombstone event: %s -> %s", oldRoomID, newRoomID)
|
||||||
|
s.handleRoomUpgrade(ctx, oldRoomID, newRoomID, members, roomSize)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: run in parallel with localRoomMembers.
|
// TODO: run in parallel with localRoomMembers.
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue