From bc3a94ceae071fff799d482ed69eb65163066bbe Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Mon, 7 Nov 2022 08:50:40 +0100 Subject: [PATCH] Handle m.room.tombstone events in the UserAPI --- userapi/consumers/roomserver.go | 96 ++++++++++++++++++++++++++++++++- 1 file changed, 95 insertions(+), 1 deletion(-) diff --git a/userapi/consumers/roomserver.go b/userapi/consumers/roomserver.go index 97c17e188..19266f99d 100644 --- a/userapi/consumers/roomserver.go +++ b/userapi/consumers/roomserver.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/tidwall/gjson" + "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" @@ -185,13 +187,98 @@ func (s *OutputRoomEventConsumer) storeMessageStats(ctx context.Context, eventTy } } +func (s *OutputRoomEventConsumer) handleRoomUpgrade(ctx context.Context, oldRoomID, newRoomID string, localMembers []*localMembership, roomSize int) error { + for _, membership := range localMembers { + // Copy any existing push rules from old -> new room + if err := s.copyPushrules(ctx, oldRoomID, newRoomID, membership); err != nil { + return err + } + + // preserve m.direct room state + if err := s.updateMDirect(ctx, oldRoomID, newRoomID, membership, roomSize); err != nil { + return err + } + } + return nil +} + +func (s *OutputRoomEventConsumer) copyPushrules(ctx context.Context, oldRoomID string, newRoomID string, membership *localMembership) error { + pushRules, err := s.db.QueryPushRules(ctx, membership.Localpart) + if err != nil { + return fmt.Errorf("failed to query pushrules for user: %w", err) + } + if pushRules == nil { + return nil + } + + for _, roomRule := range pushRules.Global.Room { + if roomRule.RuleID != oldRoomID { + continue + } + cpRool := *roomRule + cpRool.RuleID = newRoomID + pushRules.Global.Room = append(pushRules.Global.Room, &cpRool) + rules, err := json.Marshal(pushRules) + if err != nil { + return err + } + if err := s.db.SaveAccountData(ctx, membership.Localpart, "", "m.push_rules", rules); err != nil { + return fmt.Errorf("failed to update pushrules: %w", err) + } + } + return nil +} + +// updateMDirect copies the "is_direct" flag from oldRoomID to newROomID +func (s *OutputRoomEventConsumer) updateMDirect(ctx context.Context, oldRoomID, newRoomID string, membership *localMembership, roomSize int) error { + // this is most likely not a DM, so skip updating m.direct state + if roomSize > 2 { + return nil + } + // Get direct message state + directChatsRaw, err := s.db.GetAccountDataByType(ctx, membership.Localpart, "", "m.direct") + if err != nil { + return fmt.Errorf("failed to get m.direct from database: %w", err) + } + directChats := gjson.ParseBytes(directChatsRaw) + newDirectChats := make(map[string][]string) + // iterate over all userID -> roomIDs + directChats.ForEach(func(userID, roomIDs gjson.Result) bool { + var found bool + for _, roomID := range roomIDs.Array() { + newDirectChats[userID.Str] = append(newDirectChats[userID.Str], roomID.Str) + // add the new roomID to m.direct + if roomID.Str == oldRoomID { + found = true + newDirectChats[userID.Str] = append(newDirectChats[userID.Str], newRoomID) + } + } + // Only hit the database if we found the old room as a DM for this user + if found { + data, err := json.Marshal(newDirectChats) + if err != nil { + return true + } + if err = s.db.SaveAccountData(ctx, membership.Localpart, "", "m.direct", data); err != nil { + return true + } + } + return true + }) + if err != nil { + return fmt.Errorf("failed to update m.direct state") + } + return nil +} + func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, streamPos uint64) error { members, roomSize, err := s.localRoomMembers(ctx, event.RoomID()) if err != nil { return fmt.Errorf("s.localRoomMembers: %w", err) } - if event.Type() == gomatrixserverlib.MRoomMember { + switch { + case event.Type() == gomatrixserverlib.MRoomMember: cevent := gomatrixserverlib.HeaderedToClientEvent(event, gomatrixserverlib.FormatAll) var member *localMembership member, err = newLocalMembership(&cevent) @@ -203,6 +290,13 @@ func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *gom // should also be pushed to the target user. members = append(members, member) } + case event.Type() == "m.room.tombstone" && event.StateKeyEquals(""): + // Handle room upgrades + oldRoomID := event.RoomID() + newRoomID := gjson.GetBytes(event.Content(), "replacement_room").Str + log.Debugf("XXX: received tombstone event: %s -> %s", oldRoomID, newRoomID) + s.handleRoomUpgrade(ctx, oldRoomID, newRoomID, members, roomSize) + } // TODO: run in parallel with localRoomMembers.