mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-01 03:03:10 -06:00
Add ignore users
This commit is contained in:
parent
16e2d243fc
commit
918576a96a
|
|
@ -54,6 +54,10 @@ func (p *InviteStreamProvider) IncrementalSync(
|
|||
}
|
||||
|
||||
for roomID, inviteEvent := range invites {
|
||||
// skip ignored user events
|
||||
if _, ok := req.IgnoredUsers[inviteEvent.Sender()]; ok {
|
||||
continue
|
||||
}
|
||||
ir := types.NewInviteResponse(inviteEvent)
|
||||
req.Response.Rooms.Invite[roomID] = *ir
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package streams
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
|
@ -25,6 +26,7 @@ type PDUStreamProvider struct {
|
|||
|
||||
tasks chan func()
|
||||
workers atomic.Int32
|
||||
userAPI userapi.UserInternalAPI
|
||||
}
|
||||
|
||||
func (p *PDUStreamProvider) worker() {
|
||||
|
|
@ -87,6 +89,10 @@ func (p *PDUStreamProvider) CompleteSync(
|
|||
stateFilter := req.Filter.Room.State
|
||||
eventFilter := req.Filter.Room.Timeline
|
||||
|
||||
if err = p.addIgnoredUsersToFilter(ctx, req, &eventFilter); err != nil {
|
||||
req.Log.WithError(err).Error("unable to update event filter with ignored users")
|
||||
}
|
||||
|
||||
// Build up a /sync response. Add joined rooms.
|
||||
var reqMutex sync.Mutex
|
||||
var reqWaitGroup sync.WaitGroup
|
||||
|
|
@ -175,6 +181,10 @@ func (p *PDUStreamProvider) IncrementalSync(
|
|||
return to
|
||||
}
|
||||
|
||||
if err = p.addIgnoredUsersToFilter(ctx, req, &eventFilter); err != nil {
|
||||
req.Log.WithError(err).Error("unable to update event filter with ignored users")
|
||||
}
|
||||
|
||||
newPos = from
|
||||
for _, delta := range stateDeltas {
|
||||
var pos types.StreamPosition
|
||||
|
|
@ -402,6 +412,30 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
|||
return jr, nil
|
||||
}
|
||||
|
||||
// addIgnoredUsersToFilter adds ignored users to the eventfilter and
|
||||
// the syncreq itself for further use in streams.
|
||||
func (p *PDUStreamProvider) addIgnoredUsersToFilter(ctx context.Context, req *types.SyncRequest, eventFilter *gomatrixserverlib.RoomEventFilter) error {
|
||||
accountData := userapi.QueryAccountDataResponse{}
|
||||
err := p.userAPI.QueryAccountData(ctx, &userapi.QueryAccountDataRequest{
|
||||
UserID: req.Device.UserID, RoomID: "", DataType: "m.ignored_user_list",
|
||||
}, &accountData)
|
||||
if err != nil {
|
||||
req.Log.WithError(err).Error("unable to query ignored users")
|
||||
return err
|
||||
}
|
||||
if data, ok := accountData.GlobalAccountData["m.ignored_user_list"]; ok {
|
||||
err = json.Unmarshal(data, &req)
|
||||
if err != nil {
|
||||
req.Log.WithError(err).Error("unable to parse json")
|
||||
return err
|
||||
}
|
||||
for userID := range req.IgnoredUsers {
|
||||
eventFilter.NotSenders = append(eventFilter.NotSenders, userID)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func removeDuplicates(stateEvents, recentEvents []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
|
||||
for _, recentEv := range recentEvents {
|
||||
if recentEv.StateKey() == nil {
|
||||
|
|
|
|||
|
|
@ -54,6 +54,10 @@ func (p *ReceiptStreamProvider) IncrementalSync(
|
|||
// Group receipts by room, so we can create one ClientEvent for every room
|
||||
receiptsByRoom := make(map[string][]types.OutputReceiptEvent)
|
||||
for _, receipt := range receipts {
|
||||
// skip ignored user events
|
||||
if _, ok := req.IgnoredUsers[receipt.UserID]; ok {
|
||||
continue
|
||||
}
|
||||
receiptsByRoom[receipt.RoomID] = append(receiptsByRoom[receipt.RoomID], receipt)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -48,6 +48,10 @@ func (p *SendToDeviceStreamProvider) IncrementalSync(
|
|||
|
||||
// Add the updates into the sync response.
|
||||
for _, event := range events {
|
||||
// skip ignored user events
|
||||
if _, ok := req.IgnoredUsers[event.Sender]; ok {
|
||||
continue
|
||||
}
|
||||
req.Response.ToDevice.Events = append(req.Response.ToDevice.Events, event.SendToDeviceEvent)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,11 +40,18 @@ func (p *TypingStreamProvider) IncrementalSync(
|
|||
if users, updated := p.EDUCache.GetTypingUsersIfUpdatedAfter(
|
||||
roomID, int64(from),
|
||||
); updated {
|
||||
typingUsers := make([]string, 0, len(users))
|
||||
for i := range users {
|
||||
// skip ignored user events
|
||||
if _, ok := req.IgnoredUsers[users[i]]; !ok {
|
||||
typingUsers = append(typingUsers, users[i])
|
||||
}
|
||||
}
|
||||
ev := gomatrixserverlib.ClientEvent{
|
||||
Type: gomatrixserverlib.MTyping,
|
||||
}
|
||||
ev.Content, err = json.Marshal(map[string]interface{}{
|
||||
"user_ids": users,
|
||||
"user_ids": typingUsers,
|
||||
})
|
||||
if err != nil {
|
||||
req.Log.WithError(err).Error("json.Marshal failed")
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ func NewSyncStreamProviders(
|
|||
streams := &Streams{
|
||||
PDUStreamProvider: &PDUStreamProvider{
|
||||
StreamProvider: StreamProvider{DB: d},
|
||||
userAPI: userAPI,
|
||||
},
|
||||
TypingStreamProvider: &TypingStreamProvider{
|
||||
StreamProvider: StreamProvider{DB: d},
|
||||
|
|
|
|||
|
|
@ -21,6 +21,8 @@ type SyncRequest struct {
|
|||
|
||||
// Updated by the PDU stream.
|
||||
Rooms map[string]string
|
||||
// Updated by the PDU stream.
|
||||
IgnoredUsers map[string]interface{}
|
||||
}
|
||||
|
||||
type StreamProvider interface {
|
||||
|
|
|
|||
Loading…
Reference in a new issue