diff --git a/syncapi/streams/stream_invite.go b/syncapi/streams/stream_invite.go index 70374c6a7..147f33fc7 100644 --- a/syncapi/streams/stream_invite.go +++ b/syncapi/streams/stream_invite.go @@ -54,6 +54,10 @@ func (p *InviteStreamProvider) IncrementalSync( } for roomID, inviteEvent := range invites { + // skip ignored user events + if _, ok := req.IgnoredUsers[inviteEvent.Sender()]; ok { + continue + } ir := types.NewInviteResponse(inviteEvent) req.Response.Rooms.Invite[roomID] = *ir } diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index d23209af3..ae3e7b5fa 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -2,6 +2,7 @@ package streams import ( "context" + "encoding/json" "sync" "time" @@ -25,6 +26,7 @@ type PDUStreamProvider struct { tasks chan func() workers atomic.Int32 + userAPI userapi.UserInternalAPI } func (p *PDUStreamProvider) worker() { @@ -87,6 +89,10 @@ func (p *PDUStreamProvider) CompleteSync( stateFilter := req.Filter.Room.State eventFilter := req.Filter.Room.Timeline + if err = p.addIgnoredUsersToFilter(ctx, req, &eventFilter); err != nil { + req.Log.WithError(err).Error("unable to update event filter with ignored users") + } + // Build up a /sync response. Add joined rooms. var reqMutex sync.Mutex var reqWaitGroup sync.WaitGroup @@ -175,6 +181,10 @@ func (p *PDUStreamProvider) IncrementalSync( return to } + if err = p.addIgnoredUsersToFilter(ctx, req, &eventFilter); err != nil { + req.Log.WithError(err).Error("unable to update event filter with ignored users") + } + newPos = from for _, delta := range stateDeltas { var pos types.StreamPosition @@ -402,6 +412,30 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( return jr, nil } +// addIgnoredUsersToFilter adds ignored users to the eventfilter and +// the syncreq itself for further use in streams. +func (p *PDUStreamProvider) addIgnoredUsersToFilter(ctx context.Context, req *types.SyncRequest, eventFilter *gomatrixserverlib.RoomEventFilter) error { + accountData := userapi.QueryAccountDataResponse{} + err := p.userAPI.QueryAccountData(ctx, &userapi.QueryAccountDataRequest{ + UserID: req.Device.UserID, RoomID: "", DataType: "m.ignored_user_list", + }, &accountData) + if err != nil { + req.Log.WithError(err).Error("unable to query ignored users") + return err + } + if data, ok := accountData.GlobalAccountData["m.ignored_user_list"]; ok { + err = json.Unmarshal(data, &req) + if err != nil { + req.Log.WithError(err).Error("unable to parse json") + return err + } + for userID := range req.IgnoredUsers { + eventFilter.NotSenders = append(eventFilter.NotSenders, userID) + } + } + return nil +} + func removeDuplicates(stateEvents, recentEvents []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent { for _, recentEv := range recentEvents { if recentEv.StateKey() == nil { diff --git a/syncapi/streams/stream_receipt.go b/syncapi/streams/stream_receipt.go index 680f8cd8e..d65459530 100644 --- a/syncapi/streams/stream_receipt.go +++ b/syncapi/streams/stream_receipt.go @@ -54,6 +54,10 @@ func (p *ReceiptStreamProvider) IncrementalSync( // Group receipts by room, so we can create one ClientEvent for every room receiptsByRoom := make(map[string][]types.OutputReceiptEvent) for _, receipt := range receipts { + // skip ignored user events + if _, ok := req.IgnoredUsers[receipt.UserID]; ok { + continue + } receiptsByRoom[receipt.RoomID] = append(receiptsByRoom[receipt.RoomID], receipt) } diff --git a/syncapi/streams/stream_sendtodevice.go b/syncapi/streams/stream_sendtodevice.go index a3aaf3d7d..1eddeb9af 100644 --- a/syncapi/streams/stream_sendtodevice.go +++ b/syncapi/streams/stream_sendtodevice.go @@ -48,6 +48,10 @@ func (p *SendToDeviceStreamProvider) IncrementalSync( // Add the updates into the sync response. for _, event := range events { + // skip ignored user events + if _, ok := req.IgnoredUsers[event.Sender]; ok { + continue + } req.Response.ToDevice.Events = append(req.Response.ToDevice.Events, event.SendToDeviceEvent) } } diff --git a/syncapi/streams/stream_typing.go b/syncapi/streams/stream_typing.go index e46cd447b..276aad9ba 100644 --- a/syncapi/streams/stream_typing.go +++ b/syncapi/streams/stream_typing.go @@ -40,11 +40,18 @@ func (p *TypingStreamProvider) IncrementalSync( if users, updated := p.EDUCache.GetTypingUsersIfUpdatedAfter( roomID, int64(from), ); updated { + typingUsers := make([]string, 0, len(users)) + for i := range users { + // skip ignored user events + if _, ok := req.IgnoredUsers[users[i]]; !ok { + typingUsers = append(typingUsers, users[i]) + } + } ev := gomatrixserverlib.ClientEvent{ Type: gomatrixserverlib.MTyping, } ev.Content, err = json.Marshal(map[string]interface{}{ - "user_ids": users, + "user_ids": typingUsers, }) if err != nil { req.Log.WithError(err).Error("json.Marshal failed") diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index b2273aadb..02edafb0a 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -30,6 +30,7 @@ func NewSyncStreamProviders( streams := &Streams{ PDUStreamProvider: &PDUStreamProvider{ StreamProvider: StreamProvider{DB: d}, + userAPI: userAPI, }, TypingStreamProvider: &TypingStreamProvider{ StreamProvider: StreamProvider{DB: d}, diff --git a/syncapi/types/provider.go b/syncapi/types/provider.go index f6185fcb5..ae35b791f 100644 --- a/syncapi/types/provider.go +++ b/syncapi/types/provider.go @@ -21,6 +21,8 @@ type SyncRequest struct { // Updated by the PDU stream. Rooms map[string]string + // Updated by the PDU stream. + IgnoredUsers map[string]interface{} } type StreamProvider interface {