mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-16 10:33:11 -06:00
Fix statekey usage in syncapi/notifier
This commit is contained in:
parent
dde8c38800
commit
a7113dc8a6
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
|
@ -36,7 +37,8 @@ import (
|
||||||
// the event, but the token has already advanced by the time they fetch it, resulting
|
// the event, but the token has already advanced by the time they fetch it, resulting
|
||||||
// in missed events.
|
// in missed events.
|
||||||
type Notifier struct {
|
type Notifier struct {
|
||||||
lock *sync.RWMutex
|
lock *sync.RWMutex
|
||||||
|
rsAPI api.SyncRoomserverAPI
|
||||||
// A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine
|
// A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine
|
||||||
roomIDToJoinedUsers map[string]*userIDSet
|
roomIDToJoinedUsers map[string]*userIDSet
|
||||||
// A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine
|
// A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine
|
||||||
|
|
@ -55,8 +57,9 @@ type Notifier struct {
|
||||||
// NewNotifier creates a new notifier set to the given sync position.
|
// NewNotifier creates a new notifier set to the given sync position.
|
||||||
// In order for this to be of any use, the Notifier needs to be told all rooms and
|
// In order for this to be of any use, the Notifier needs to be told all rooms and
|
||||||
// the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase).
|
// the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase).
|
||||||
func NewNotifier() *Notifier {
|
func NewNotifier(rsAPI api.SyncRoomserverAPI) *Notifier {
|
||||||
return &Notifier{
|
return &Notifier{
|
||||||
|
rsAPI: rsAPI,
|
||||||
roomIDToJoinedUsers: make(map[string]*userIDSet),
|
roomIDToJoinedUsers: make(map[string]*userIDSet),
|
||||||
roomIDToPeekingDevices: make(map[string]peekingDeviceSet),
|
roomIDToPeekingDevices: make(map[string]peekingDeviceSet),
|
||||||
userDeviceStreams: make(map[string]map[string]*UserDeviceStream),
|
userDeviceStreams: make(map[string]map[string]*UserDeviceStream),
|
||||||
|
|
@ -104,26 +107,32 @@ func (n *Notifier) OnNewEvent(
|
||||||
peekingDevicesToNotify := n._peekingDevices(ev.RoomID())
|
peekingDevicesToNotify := n._peekingDevices(ev.RoomID())
|
||||||
// If this is an invite, also add in the invitee to this list.
|
// If this is an invite, also add in the invitee to this list.
|
||||||
if ev.Type() == "m.room.member" && ev.StateKey() != nil {
|
if ev.Type() == "m.room.member" && ev.StateKey() != nil {
|
||||||
targetUserID := *ev.StateKey()
|
targetUserID, err := n.rsAPI.QueryUserIDForSender(context.Background(), ev.RoomID(), spec.SenderID(*ev.StateKey()))
|
||||||
membership, err := ev.Membership()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).WithField("event_id", ev.EventID()).Errorf(
|
log.WithError(err).WithField("event_id", ev.EventID()).Errorf(
|
||||||
"Notifier.OnNewEvent: Failed to unmarshal member event",
|
"Notifier.OnNewEvent: Failed to find the userID for this event",
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
// Keep the joined user map up-to-date
|
membership, err := ev.Membership()
|
||||||
switch membership {
|
if err != nil {
|
||||||
case spec.Invite:
|
log.WithError(err).WithField("event_id", ev.EventID()).Errorf(
|
||||||
usersToNotify = append(usersToNotify, targetUserID)
|
"Notifier.OnNewEvent: Failed to unmarshal member event",
|
||||||
case spec.Join:
|
)
|
||||||
// Manually append the new user's ID so they get notified
|
} else {
|
||||||
// along all members in the room
|
// Keep the joined user map up-to-date
|
||||||
usersToNotify = append(usersToNotify, targetUserID)
|
switch membership {
|
||||||
n._addJoinedUser(ev.RoomID(), targetUserID)
|
case spec.Invite:
|
||||||
case spec.Leave:
|
usersToNotify = append(usersToNotify, targetUserID.String())
|
||||||
fallthrough
|
case spec.Join:
|
||||||
case spec.Ban:
|
// Manually append the new user's ID so they get notified
|
||||||
n._removeJoinedUser(ev.RoomID(), targetUserID)
|
// along all members in the room
|
||||||
|
usersToNotify = append(usersToNotify, targetUserID.String())
|
||||||
|
n._addJoinedUser(ev.RoomID(), targetUserID.String())
|
||||||
|
case spec.Leave:
|
||||||
|
fallthrough
|
||||||
|
case spec.Ban:
|
||||||
|
n._removeJoinedUser(ev.RoomID(), targetUserID.String())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,9 +22,11 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -105,9 +107,15 @@ func mustEqualPositions(t *testing.T, got, want types.StreamingToken) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type TestRoomServer struct{ api.SyncRoomserverAPI }
|
||||||
|
|
||||||
|
func (t *TestRoomServer) QueryUserIDForSender(ctx context.Context, roomID string, senderID spec.SenderID) (*spec.UserID, error) {
|
||||||
|
return spec.NewUserID(string(senderID), true)
|
||||||
|
}
|
||||||
|
|
||||||
// Test that the current position is returned if a request is already behind.
|
// Test that the current position is returned if a request is already behind.
|
||||||
func TestImmediateNotification(t *testing.T) {
|
func TestImmediateNotification(t *testing.T) {
|
||||||
n := NewNotifier()
|
n := NewNotifier(&TestRoomServer{})
|
||||||
n.SetCurrentPosition(syncPositionBefore)
|
n.SetCurrentPosition(syncPositionBefore)
|
||||||
pos, err := waitForEvents(n, newTestSyncRequest(alice, aliceDev, syncPositionVeryOld))
|
pos, err := waitForEvents(n, newTestSyncRequest(alice, aliceDev, syncPositionVeryOld))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -118,7 +126,7 @@ func TestImmediateNotification(t *testing.T) {
|
||||||
|
|
||||||
// Test that new events to a joined room unblocks the request.
|
// Test that new events to a joined room unblocks the request.
|
||||||
func TestNewEventAndJoinedToRoom(t *testing.T) {
|
func TestNewEventAndJoinedToRoom(t *testing.T) {
|
||||||
n := NewNotifier()
|
n := NewNotifier(&TestRoomServer{})
|
||||||
n.SetCurrentPosition(syncPositionBefore)
|
n.SetCurrentPosition(syncPositionBefore)
|
||||||
n.setUsersJoinedToRooms(map[string][]string{
|
n.setUsersJoinedToRooms(map[string][]string{
|
||||||
roomID: {alice, bob},
|
roomID: {alice, bob},
|
||||||
|
|
@ -144,7 +152,7 @@ func TestNewEventAndJoinedToRoom(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCorrectStream(t *testing.T) {
|
func TestCorrectStream(t *testing.T) {
|
||||||
n := NewNotifier()
|
n := NewNotifier(&TestRoomServer{})
|
||||||
n.SetCurrentPosition(syncPositionBefore)
|
n.SetCurrentPosition(syncPositionBefore)
|
||||||
stream := lockedFetchUserStream(n, bob, bobDev)
|
stream := lockedFetchUserStream(n, bob, bobDev)
|
||||||
if stream.UserID != bob {
|
if stream.UserID != bob {
|
||||||
|
|
@ -156,7 +164,7 @@ func TestCorrectStream(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCorrectStreamWakeup(t *testing.T) {
|
func TestCorrectStreamWakeup(t *testing.T) {
|
||||||
n := NewNotifier()
|
n := NewNotifier(&TestRoomServer{})
|
||||||
n.SetCurrentPosition(syncPositionBefore)
|
n.SetCurrentPosition(syncPositionBefore)
|
||||||
awoken := make(chan string)
|
awoken := make(chan string)
|
||||||
|
|
||||||
|
|
@ -184,7 +192,7 @@ func TestCorrectStreamWakeup(t *testing.T) {
|
||||||
|
|
||||||
// Test that an invite unblocks the request
|
// Test that an invite unblocks the request
|
||||||
func TestNewInviteEventForUser(t *testing.T) {
|
func TestNewInviteEventForUser(t *testing.T) {
|
||||||
n := NewNotifier()
|
n := NewNotifier(&TestRoomServer{})
|
||||||
n.SetCurrentPosition(syncPositionBefore)
|
n.SetCurrentPosition(syncPositionBefore)
|
||||||
n.setUsersJoinedToRooms(map[string][]string{
|
n.setUsersJoinedToRooms(map[string][]string{
|
||||||
roomID: {alice, bob},
|
roomID: {alice, bob},
|
||||||
|
|
@ -241,7 +249,7 @@ func TestEDUWakeup(t *testing.T) {
|
||||||
|
|
||||||
// Test that all blocked requests get woken up on a new event.
|
// Test that all blocked requests get woken up on a new event.
|
||||||
func TestMultipleRequestWakeup(t *testing.T) {
|
func TestMultipleRequestWakeup(t *testing.T) {
|
||||||
n := NewNotifier()
|
n := NewNotifier(&TestRoomServer{})
|
||||||
n.SetCurrentPosition(syncPositionBefore)
|
n.SetCurrentPosition(syncPositionBefore)
|
||||||
n.setUsersJoinedToRooms(map[string][]string{
|
n.setUsersJoinedToRooms(map[string][]string{
|
||||||
roomID: {alice, bob},
|
roomID: {alice, bob},
|
||||||
|
|
@ -278,7 +286,7 @@ func TestMultipleRequestWakeup(t *testing.T) {
|
||||||
func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
|
func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
|
||||||
// listen as bob. Make bob leave room. Make alice send event to room.
|
// listen as bob. Make bob leave room. Make alice send event to room.
|
||||||
// Make sure alice gets woken up only and not bob as well.
|
// Make sure alice gets woken up only and not bob as well.
|
||||||
n := NewNotifier()
|
n := NewNotifier(&TestRoomServer{})
|
||||||
n.SetCurrentPosition(syncPositionBefore)
|
n.SetCurrentPosition(syncPositionBefore)
|
||||||
n.setUsersJoinedToRooms(map[string][]string{
|
n.setUsersJoinedToRooms(map[string][]string{
|
||||||
roomID: {alice, bob},
|
roomID: {alice, bob},
|
||||||
|
|
|
||||||
|
|
@ -60,7 +60,7 @@ func AddPublicRoutes(
|
||||||
}
|
}
|
||||||
|
|
||||||
eduCache := caching.NewTypingCache()
|
eduCache := caching.NewTypingCache()
|
||||||
notifier := notifier.NewNotifier()
|
notifier := notifier.NewNotifier(rsAPI)
|
||||||
streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, eduCache, caches, notifier)
|
streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, eduCache, caches, notifier)
|
||||||
notifier.SetCurrentPosition(streams.Latest(context.Background()))
|
notifier.SetCurrentPosition(streams.Latest(context.Background()))
|
||||||
if err = notifier.Load(context.Background(), syncDB); err != nil {
|
if err = notifier.Load(context.Background(), syncDB); err != nil {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue