From 954b36a74c7a909b90e29ced517ff4d7f5f53ba1 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 6 Jan 2021 13:05:36 +0000 Subject: [PATCH] Delete notifier, other tweaks --- syncapi/consumers/clientapi.go | 9 +- syncapi/consumers/eduserver_receipts.go | 4 - syncapi/consumers/eduserver_sendtodevice.go | 17 +- syncapi/consumers/eduserver_typing.go | 12 +- syncapi/consumers/keychange.go | 13 +- syncapi/consumers/roomserver.go | 24 +- syncapi/storage/postgres/receipt_table.go | 2 +- syncapi/storage/shared/stream_pdu.go | 11 +- syncapi/storage/shared/stream_receipt.go | 20 +- syncapi/storage/shared/stream_typing.go | 4 +- syncapi/storage/sqlite3/receipt_table.go | 2 +- syncapi/sync/notifier.go | 467 -------------------- syncapi/sync/notifier_test.go | 374 ---------------- syncapi/sync/request.go | 34 +- syncapi/sync/requestpool.go | 77 ++-- syncapi/syncapi.go | 27 +- syncapi/types/provider.go | 17 +- 17 files changed, 125 insertions(+), 989 deletions(-) delete mode 100644 syncapi/sync/notifier.go delete mode 100644 syncapi/sync/notifier_test.go diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index 9883c6b03..3765fe23f 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -23,8 +23,6 @@ import ( "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage" - "github.com/matrix-org/dendrite/syncapi/sync" - "github.com/matrix-org/dendrite/syncapi/types" log "github.com/sirupsen/logrus" ) @@ -32,14 +30,12 @@ import ( type OutputClientDataConsumer struct { clientAPIConsumer *internal.ContinualConsumer db storage.Database - notifier *sync.Notifier } // NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers. func NewOutputClientDataConsumer( cfg *config.SyncAPI, kafkaConsumer sarama.Consumer, - n *sync.Notifier, store storage.Database, ) *OutputClientDataConsumer { @@ -52,7 +48,6 @@ func NewOutputClientDataConsumer( s := &OutputClientDataConsumer{ clientAPIConsumer: &consumer, db: store, - notifier: n, } consumer.ProcessMessage = s.onMessage @@ -92,7 +87,9 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error }).Panicf("could not save account data") } - s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.StreamingToken{PDUPosition: pduPos}) + _ = pduPos + + //s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.StreamingToken{PDUPosition: pduPos}) return nil } diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go index 01e89b8a9..71d039803 100644 --- a/syncapi/consumers/eduserver_receipts.go +++ b/syncapi/consumers/eduserver_receipts.go @@ -23,7 +23,6 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage" - "github.com/matrix-org/dendrite/syncapi/sync" log "github.com/sirupsen/logrus" ) @@ -31,7 +30,6 @@ import ( type OutputReceiptEventConsumer struct { receiptConsumer *internal.ContinualConsumer db storage.Database - notifier *sync.Notifier } // NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer. @@ -39,7 +37,6 @@ type OutputReceiptEventConsumer struct { func NewOutputReceiptEventConsumer( cfg *config.SyncAPI, kafkaConsumer sarama.Consumer, - n *sync.Notifier, store storage.Database, ) *OutputReceiptEventConsumer { @@ -53,7 +50,6 @@ func NewOutputReceiptEventConsumer( s := &OutputReceiptEventConsumer{ receiptConsumer: &consumer, db: store, - notifier: n, } consumer.ProcessMessage = s.onMessage diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index a375baf83..c9e9732e8 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -23,8 +23,6 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage" - "github.com/matrix-org/dendrite/syncapi/sync" - "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" log "github.com/sirupsen/logrus" @@ -35,7 +33,6 @@ type OutputSendToDeviceEventConsumer struct { sendToDeviceConsumer *internal.ContinualConsumer db storage.Database serverName gomatrixserverlib.ServerName // our server name - notifier *sync.Notifier } // NewOutputSendToDeviceEventConsumer creates a new OutputSendToDeviceEventConsumer. @@ -43,7 +40,6 @@ type OutputSendToDeviceEventConsumer struct { func NewOutputSendToDeviceEventConsumer( cfg *config.SyncAPI, kafkaConsumer sarama.Consumer, - n *sync.Notifier, store storage.Database, ) *OutputSendToDeviceEventConsumer { @@ -58,7 +54,6 @@ func NewOutputSendToDeviceEventConsumer( sendToDeviceConsumer: &consumer, db: store, serverName: cfg.Matrix.ServerName, - notifier: n, } consumer.ProcessMessage = s.onMessage @@ -102,11 +97,13 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage) return err } - s.notifier.OnNewSendToDevice( - output.UserID, - []string{output.DeviceID}, - types.StreamingToken{SendToDevicePosition: streamPos}, - ) + //s.notifier.OnNewSendToDevice( + // output.UserID, + // []string{output.DeviceID}, + // types.StreamingToken{SendToDevicePosition: streamPos}, + //) + + _ = streamPos return nil } diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index 74b2a1905..a27b6090e 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -22,7 +22,6 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage" - "github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/types" log "github.com/sirupsen/logrus" ) @@ -31,7 +30,6 @@ import ( type OutputTypingEventConsumer struct { typingConsumer *internal.ContinualConsumer db storage.Database - notifier *sync.Notifier } // NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. @@ -39,7 +37,6 @@ type OutputTypingEventConsumer struct { func NewOutputTypingEventConsumer( cfg *config.SyncAPI, kafkaConsumer sarama.Consumer, - n *sync.Notifier, store storage.Database, ) *OutputTypingEventConsumer { @@ -53,7 +50,6 @@ func NewOutputTypingEventConsumer( s := &OutputTypingEventConsumer{ typingConsumer: &consumer, db: store, - notifier: n, } consumer.ProcessMessage = s.onMessage @@ -63,9 +59,11 @@ func NewOutputTypingEventConsumer( // Start consuming from EDU api func (s *OutputTypingEventConsumer) Start() error { - s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) { - s.notifier.OnNewTyping(roomID, types.StreamingToken{TypingPosition: types.StreamPosition(latestSyncPosition)}) - }) + /* + s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) { + s.notifier.OnNewTyping(roomID, types.StreamingToken{TypingPosition: types.StreamPosition(latestSyncPosition)}) + }) + */ return s.typingConsumer.Start() } diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index 59cd583d1..cec89eedb 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -24,7 +24,6 @@ import ( "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/storage" - syncapi "github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" @@ -39,7 +38,6 @@ type OutputKeyChangeEventConsumer struct { keyAPI api.KeyInternalAPI partitionToOffset map[int32]int64 partitionToOffsetMu sync.Mutex - notifier *syncapi.Notifier } // NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer. @@ -48,7 +46,6 @@ func NewOutputKeyChangeEventConsumer( serverName gomatrixserverlib.ServerName, topic string, kafkaConsumer sarama.Consumer, - n *syncapi.Notifier, keyAPI api.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, store storage.Database, @@ -69,7 +66,6 @@ func NewOutputKeyChangeEventConsumer( rsAPI: rsAPI, partitionToOffset: make(map[int32]int64), partitionToOffsetMu: sync.Mutex{}, - notifier: n, } consumer.ProcessMessage = s.onMessage @@ -120,8 +116,11 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er Partition: msg.Partition, }, } - for userID := range queryRes.UserIDsToCount { - s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID) - } + + _ = posUpdate + + //for userID := range queryRes.UserIDsToCount { + // s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID) + //} return nil } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index b3b82b1bc..f181d8da1 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -24,7 +24,6 @@ import ( "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage" - "github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" @@ -36,14 +35,12 @@ type OutputRoomEventConsumer struct { rsAPI api.RoomserverInternalAPI rsConsumer *internal.ContinualConsumer db storage.Database - notifier *sync.Notifier } // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. func NewOutputRoomEventConsumer( cfg *config.SyncAPI, kafkaConsumer sarama.Consumer, - n *sync.Notifier, store storage.Database, rsAPI api.RoomserverInternalAPI, ) *OutputRoomEventConsumer { @@ -58,7 +55,6 @@ func NewOutputRoomEventConsumer( cfg: cfg, rsConsumer: &consumer, db: store, - notifier: n, rsAPI: rsAPI, } consumer.ProcessMessage = s.onMessage @@ -274,7 +270,9 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent( }).Panicf("roomserver output log: write invite failure") return nil } - s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, *msg.Event.StateKey()) + + _ = pduPos + //s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, *msg.Event.StateKey()) return nil } @@ -292,7 +290,9 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent( } // Notify any active sync requests that the invite has been retired. // Invites share the same stream counter as PDUs - s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID) + + _ = pduPos + //s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID) return nil } @@ -308,11 +308,11 @@ func (s *OutputRoomEventConsumer) onNewPeek( return nil } // tell the notifier about the new peek so it knows to wake up new devices - s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID) - + //s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID) + _ = sp // we need to wake up the users who might need to now be peeking into this room, // so we send in a dummy event to trigger a wakeup - s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp}) + //s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp}) return nil } @@ -328,11 +328,11 @@ func (s *OutputRoomEventConsumer) onRetirePeek( return nil } // tell the notifier about the new peek so it knows to wake up new devices - s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID) - + //s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID) + _ = sp // we need to wake up the users who might need to now be peeking into this room, // so we send in a dummy event to trigger a wakeup - s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp}) + //s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp}) return nil } diff --git a/syncapi/storage/postgres/receipt_table.go b/syncapi/storage/postgres/receipt_table.go index 73bf4179e..f93081e1a 100644 --- a/syncapi/storage/postgres/receipt_table.go +++ b/syncapi/storage/postgres/receipt_table.go @@ -96,7 +96,7 @@ func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, room } func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []api.OutputReceiptEvent, error) { - lastPos := types.StreamPosition(0) + lastPos := streamPos rows, err := r.selectRoomReceipts.QueryContext(ctx, pq.Array(roomIDs), streamPos) if err != nil { return 0, nil, fmt.Errorf("unable to query room receipts: %w", err) diff --git a/syncapi/storage/shared/stream_pdu.go b/syncapi/storage/shared/stream_pdu.go index e2796376e..08d7bbe59 100644 --- a/syncapi/storage/shared/stream_pdu.go +++ b/syncapi/storage/shared/stream_pdu.go @@ -41,6 +41,7 @@ func (p *PDUStreamProvider) StreamAdvance( } } +// nolint:gocyclo func (p *PDUStreamProvider) StreamRange( ctx context.Context, req *types.StreamRangeRequest, @@ -63,12 +64,12 @@ func (p *PDUStreamProvider) StreamRange( // TODO: use filter provided in request stateFilter := gomatrixserverlib.DefaultStateFilter() - if from.IsEmpty() { - if stateDeltas, joinedRooms, err = p.DB.getStateDeltas(ctx, req.Device, nil, r, req.Device.UserID, &stateFilter); err != nil { + if req.WantFullState { + if stateDeltas, joinedRooms, err = p.DB.getStateDeltasForFullStateSync(ctx, req.Device, nil, r, req.Device.UserID, &stateFilter); err != nil { return } } else { - if stateDeltas, joinedRooms, err = p.DB.getStateDeltasForFullStateSync(ctx, req.Device, nil, r, req.Device.UserID, &stateFilter); err != nil { + if stateDeltas, joinedRooms, err = p.DB.getStateDeltas(ctx, req.Device, nil, r, req.Device.UserID, &stateFilter); err != nil { return } } @@ -93,7 +94,7 @@ func (p *PDUStreamProvider) StreamRange( } } - for _, event := range events { + for _, event := range p.DB.StreamEventsToEvents(req.Device, events) { room.Timeline.Events = append( room.Timeline.Events, gomatrixserverlib.ToClientEvent( @@ -101,7 +102,9 @@ func (p *PDUStreamProvider) StreamRange( gomatrixserverlib.FormatSync, ), ) + } + for _, event := range events { if event.StreamPosition > newPos.PDUPosition { newPos.PDUPosition = event.StreamPosition } diff --git a/syncapi/storage/shared/stream_receipt.go b/syncapi/storage/shared/stream_receipt.go index edaafd173..6c5cb9502 100644 --- a/syncapi/storage/shared/stream_receipt.go +++ b/syncapi/storage/shared/stream_receipt.go @@ -53,9 +53,13 @@ func (p *ReceiptStreamProvider) StreamRange( } } - lastPos, receipts, err := p.DB.Receipts.SelectRoomReceiptsAfter(context.TODO(), joinedRooms, from.ReceiptPosition) + lastPos, receipts, err := p.DB.Receipts.SelectRoomReceiptsAfter(ctx, joinedRooms, from.ReceiptPosition) if err != nil { - return types.StreamingToken{} //fmt.Errorf("unable to select receipts for rooms: %w", err) + return to //fmt.Errorf("unable to select receipts for rooms: %w", err) + } + + if len(receipts) == 0 || lastPos == 0 { + return to } // Group receipts by room, so we can create one ClientEvent for every room @@ -85,21 +89,15 @@ func (p *ReceiptStreamProvider) StreamRange( } ev.Content, err = json.Marshal(content) if err != nil { - return types.StreamingToken{} // err + return to // err } jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev) req.Response.Rooms.Join[roomID] = jr } - if lastPos > 0 { - return types.StreamingToken{ - ReceiptPosition: lastPos, - } - } else { - return types.StreamingToken{ - ReceiptPosition: to.ReceiptPosition, - } + return types.StreamingToken{ + ReceiptPosition: lastPos, } } diff --git a/syncapi/storage/shared/stream_typing.go b/syncapi/storage/shared/stream_typing.go index 2f304176a..c1686b165 100644 --- a/syncapi/storage/shared/stream_typing.go +++ b/syncapi/storage/shared/stream_typing.go @@ -54,7 +54,9 @@ func (p *TypingStreamProvider) StreamRange( "user_ids": users, }) if err != nil { - return types.StreamingToken{} + return types.StreamingToken{ + TypingPosition: from.TypingPosition, + } } jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev) diff --git a/syncapi/storage/sqlite3/receipt_table.go b/syncapi/storage/sqlite3/receipt_table.go index 69fc4e9d0..6b39ee879 100644 --- a/syncapi/storage/sqlite3/receipt_table.go +++ b/syncapi/storage/sqlite3/receipt_table.go @@ -101,7 +101,7 @@ func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, room // SelectRoomReceiptsAfter select all receipts for a given room after a specific timestamp func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []api.OutputReceiptEvent, error) { selectSQL := strings.Replace(selectRoomReceipts, "($2)", sqlutil.QueryVariadicOffset(len(roomIDs), 1), 1) - lastPos := types.StreamPosition(0) + lastPos := streamPos params := make([]interface{}, len(roomIDs)+1) params[0] = streamPos for k, v := range roomIDs { diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go deleted file mode 100644 index 66460a8db..000000000 --- a/syncapi/sync/notifier.go +++ /dev/null @@ -1,467 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sync - -import ( - "context" - "sync" - "time" - - "github.com/matrix-org/dendrite/syncapi/storage" - "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/gomatrixserverlib" - log "github.com/sirupsen/logrus" -) - -// Notifier will wake up sleeping requests when there is some new data. -// It does not tell requests what that data is, only the sync position which -// they can use to get at it. This is done to prevent races whereby we tell the caller -// the event, but the token has already advanced by the time they fetch it, resulting -// in missed events. -type Notifier struct { - // A map of RoomID => Set : Must only be accessed by the OnNewEvent goroutine - roomIDToJoinedUsers map[string]userIDSet - // A map of RoomID => Set : Must only be accessed by the OnNewEvent goroutine - roomIDToPeekingDevices map[string]peekingDeviceSet - // Protects currPos and userStreams. - streamLock *sync.Mutex - // The latest sync position - currPos types.StreamingToken - // A map of user_id => device_id => UserStream which can be used to wake a given user's /sync request. - userDeviceStreams map[string]map[string]*UserDeviceStream - // The last time we cleaned out stale entries from the userStreams map - lastCleanUpTime time.Time -} - -// NewNotifier creates a new notifier set to the given sync position. -// In order for this to be of any use, the Notifier needs to be told all rooms and -// the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase). -func NewNotifier(pos types.StreamingToken) *Notifier { - return &Notifier{ - currPos: pos, - roomIDToJoinedUsers: make(map[string]userIDSet), - roomIDToPeekingDevices: make(map[string]peekingDeviceSet), - userDeviceStreams: make(map[string]map[string]*UserDeviceStream), - streamLock: &sync.Mutex{}, - lastCleanUpTime: time.Now(), - } -} - -// OnNewEvent is called when a new event is received from the room server. Must only be -// called from a single goroutine, to avoid races between updates which could set the -// current sync position incorrectly. -// Chooses which user sync streams to update by a provided *gomatrixserverlib.Event -// (based on the users in the event's room), -// a roomID directly, or a list of user IDs, prioritised by parameter ordering. -// posUpdate contains the latest position(s) for one or more types of events. -// If a position in posUpdate is 0, it means no updates are available of that type. -// Typically a consumer supplies a posUpdate with the latest sync position for the -// event type it handles, leaving other fields as 0. -func (n *Notifier) OnNewEvent( - ev *gomatrixserverlib.HeaderedEvent, roomID string, userIDs []string, - posUpdate types.StreamingToken, -) { - // update the current position then notify relevant /sync streams. - // This needs to be done PRIOR to waking up users as they will read this value. - n.streamLock.Lock() - defer n.streamLock.Unlock() - - n.currPos.ApplyUpdates(posUpdate) - n.removeEmptyUserStreams() - - if ev != nil { - // Map this event's room_id to a list of joined users, and wake them up. - usersToNotify := n.joinedUsers(ev.RoomID()) - // Map this event's room_id to a list of peeking devices, and wake them up. - peekingDevicesToNotify := n.PeekingDevices(ev.RoomID()) - // If this is an invite, also add in the invitee to this list. - if ev.Type() == "m.room.member" && ev.StateKey() != nil { - targetUserID := *ev.StateKey() - membership, err := ev.Membership() - if err != nil { - log.WithError(err).WithField("event_id", ev.EventID()).Errorf( - "Notifier.OnNewEvent: Failed to unmarshal member event", - ) - } else { - // Keep the joined user map up-to-date - switch membership { - case gomatrixserverlib.Invite: - usersToNotify = append(usersToNotify, targetUserID) - case gomatrixserverlib.Join: - // Manually append the new user's ID so they get notified - // along all members in the room - usersToNotify = append(usersToNotify, targetUserID) - n.addJoinedUser(ev.RoomID(), targetUserID) - case gomatrixserverlib.Leave: - fallthrough - case gomatrixserverlib.Ban: - n.removeJoinedUser(ev.RoomID(), targetUserID) - } - } - } - - n.wakeupUsers(usersToNotify, peekingDevicesToNotify, n.currPos) - } else if roomID != "" { - n.wakeupUsers(n.joinedUsers(roomID), n.PeekingDevices(roomID), n.currPos) - } else if len(userIDs) > 0 { - n.wakeupUsers(userIDs, nil, n.currPos) - } else { - log.WithFields(log.Fields{ - "posUpdate": posUpdate.String, - }).Warn("Notifier.OnNewEvent called but caller supplied no user to wake up") - } -} - -func (n *Notifier) OnNewPeek( - roomID, userID, deviceID string, -) { - n.streamLock.Lock() - defer n.streamLock.Unlock() - - n.addPeekingDevice(roomID, userID, deviceID) - - // we don't wake up devices here given the roomserver consumer will do this shortly afterwards - // by calling OnNewEvent. -} - -func (n *Notifier) OnRetirePeek( - roomID, userID, deviceID string, -) { - n.streamLock.Lock() - defer n.streamLock.Unlock() - - n.removePeekingDevice(roomID, userID, deviceID) - - // we don't wake up devices here given the roomserver consumer will do this shortly afterwards - // by calling OnRetireEvent. -} - -func (n *Notifier) OnNewSendToDevice( - userID string, deviceIDs []string, - posUpdate types.StreamingToken, -) { - n.streamLock.Lock() - defer n.streamLock.Unlock() - - n.currPos.ApplyUpdates(posUpdate) - n.wakeupUserDevice(userID, deviceIDs, n.currPos) -} - -// OnNewReceipt updates the current position -func (n *Notifier) OnNewTyping( - roomID string, - posUpdate types.StreamingToken, -) { - n.streamLock.Lock() - defer n.streamLock.Unlock() - - n.currPos.ApplyUpdates(posUpdate) - n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos) -} - -// OnNewReceipt updates the current position -func (n *Notifier) OnNewReceipt( - roomID string, - posUpdate types.StreamingToken, -) { - n.streamLock.Lock() - defer n.streamLock.Unlock() - - n.currPos.ApplyUpdates(posUpdate) - n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos) -} - -func (n *Notifier) OnNewKeyChange( - posUpdate types.StreamingToken, wakeUserID, keyChangeUserID string, -) { - n.streamLock.Lock() - defer n.streamLock.Unlock() - - n.currPos.ApplyUpdates(posUpdate) - n.wakeupUsers([]string{wakeUserID}, nil, n.currPos) -} - -func (n *Notifier) OnNewInvite( - posUpdate types.StreamingToken, wakeUserID string, -) { - n.streamLock.Lock() - defer n.streamLock.Unlock() - - n.currPos.ApplyUpdates(posUpdate) - n.wakeupUsers([]string{wakeUserID}, nil, n.currPos) -} - -// GetListener returns a UserStreamListener that can be used to wait for -// updates for a user. Must be closed. -// notify for anything before sincePos -func (n *Notifier) GetListener(req syncRequest) UserDeviceStreamListener { - // Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298 - // - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID - // - Incoming events wake requests for a matching room ID - // - Incoming events wake requests for a matching user ID (needed for invites) - - // TODO: v1 /events 'peeking' has an 'explicit room ID' which is also tracked, - // but given we don't do /events, let's pretend it doesn't exist. - - n.streamLock.Lock() - defer n.streamLock.Unlock() - - n.removeEmptyUserStreams() - - return n.fetchUserDeviceStream(req.device.UserID, req.device.ID, true).GetListener(req.ctx) -} - -// Load the membership states required to notify users correctly. -func (n *Notifier) Load(ctx context.Context, db storage.Database) error { - roomToUsers, err := db.AllJoinedUsersInRooms(ctx) - if err != nil { - return err - } - n.setUsersJoinedToRooms(roomToUsers) - - roomToPeekingDevices, err := db.AllPeekingDevicesInRooms(ctx) - if err != nil { - return err - } - n.setPeekingDevices(roomToPeekingDevices) - - return nil -} - -// CurrentPosition returns the current sync position -func (n *Notifier) CurrentPosition() types.StreamingToken { - n.streamLock.Lock() - defer n.streamLock.Unlock() - - return n.currPos -} - -// setUsersJoinedToRooms marks the given users as 'joined' to the given rooms, such that new events from -// these rooms will wake the given users /sync requests. This should be called prior to ANY calls to -// OnNewEvent (eg on startup) to prevent racing. -func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) { - // This is just the bulk form of addJoinedUser - for roomID, userIDs := range roomIDToUserIDs { - if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { - n.roomIDToJoinedUsers[roomID] = make(userIDSet) - } - for _, userID := range userIDs { - n.roomIDToJoinedUsers[roomID].add(userID) - } - } -} - -// setPeekingDevices marks the given devices as peeking in the given rooms, such that new events from -// these rooms will wake the given devices' /sync requests. This should be called prior to ANY calls to -// OnNewEvent (eg on startup) to prevent racing. -func (n *Notifier) setPeekingDevices(roomIDToPeekingDevices map[string][]types.PeekingDevice) { - // This is just the bulk form of addPeekingDevice - for roomID, peekingDevices := range roomIDToPeekingDevices { - if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { - n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet) - } - for _, peekingDevice := range peekingDevices { - n.roomIDToPeekingDevices[roomID].add(peekingDevice) - } - } -} - -// wakeupUsers will wake up the sync strems for all of the devices for all of the -// specified user IDs, and also the specified peekingDevices -func (n *Notifier) wakeupUsers(userIDs []string, peekingDevices []types.PeekingDevice, newPos types.StreamingToken) { - for _, userID := range userIDs { - for _, stream := range n.fetchUserStreams(userID) { - if stream == nil { - continue - } - stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream - } - } - - for _, peekingDevice := range peekingDevices { - // TODO: don't bother waking up for devices whose users we already woke up - if stream := n.fetchUserDeviceStream(peekingDevice.UserID, peekingDevice.DeviceID, false); stream != nil { - stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream - } - } -} - -// wakeupUserDevice will wake up the sync stream for a specific user device. Other -// device streams will be left alone. -// nolint:unused -func (n *Notifier) wakeupUserDevice(userID string, deviceIDs []string, newPos types.StreamingToken) { - for _, deviceID := range deviceIDs { - if stream := n.fetchUserDeviceStream(userID, deviceID, false); stream != nil { - stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream - } - } -} - -// fetchUserDeviceStream retrieves a stream unique to the given device. If makeIfNotExists is true, -// a stream will be made for this device if one doesn't exist and it will be returned. This -// function does not wait for data to be available on the stream. -// NB: Callers should have locked the mutex before calling this function. -func (n *Notifier) fetchUserDeviceStream(userID, deviceID string, makeIfNotExists bool) *UserDeviceStream { - _, ok := n.userDeviceStreams[userID] - if !ok { - if !makeIfNotExists { - return nil - } - n.userDeviceStreams[userID] = map[string]*UserDeviceStream{} - } - stream, ok := n.userDeviceStreams[userID][deviceID] - if !ok { - if !makeIfNotExists { - return nil - } - // TODO: Unbounded growth of streams (1 per user) - if stream = NewUserDeviceStream(userID, deviceID, n.currPos); stream != nil { - n.userDeviceStreams[userID][deviceID] = stream - } - } - return stream -} - -// fetchUserStreams retrieves all streams for the given user. If makeIfNotExists is true, -// a stream will be made for this user if one doesn't exist and it will be returned. This -// function does not wait for data to be available on the stream. -// NB: Callers should have locked the mutex before calling this function. -func (n *Notifier) fetchUserStreams(userID string) []*UserDeviceStream { - user, ok := n.userDeviceStreams[userID] - if !ok { - return []*UserDeviceStream{} - } - streams := []*UserDeviceStream{} - for _, stream := range user { - streams = append(streams, stream) - } - return streams -} - -// Not thread-safe: must be called on the OnNewEvent goroutine only -func (n *Notifier) addJoinedUser(roomID, userID string) { - if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { - n.roomIDToJoinedUsers[roomID] = make(userIDSet) - } - n.roomIDToJoinedUsers[roomID].add(userID) -} - -// Not thread-safe: must be called on the OnNewEvent goroutine only -func (n *Notifier) removeJoinedUser(roomID, userID string) { - if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { - n.roomIDToJoinedUsers[roomID] = make(userIDSet) - } - n.roomIDToJoinedUsers[roomID].remove(userID) -} - -// Not thread-safe: must be called on the OnNewEvent goroutine only -func (n *Notifier) joinedUsers(roomID string) (userIDs []string) { - if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { - return - } - return n.roomIDToJoinedUsers[roomID].values() -} - -// Not thread-safe: must be called on the OnNewEvent goroutine only -func (n *Notifier) addPeekingDevice(roomID, userID, deviceID string) { - if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { - n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet) - } - n.roomIDToPeekingDevices[roomID].add(types.PeekingDevice{UserID: userID, DeviceID: deviceID}) -} - -// Not thread-safe: must be called on the OnNewEvent goroutine only -// nolint:unused -func (n *Notifier) removePeekingDevice(roomID, userID, deviceID string) { - if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { - n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet) - } - // XXX: is this going to work as a key? - n.roomIDToPeekingDevices[roomID].remove(types.PeekingDevice{UserID: userID, DeviceID: deviceID}) -} - -// Not thread-safe: must be called on the OnNewEvent goroutine only -func (n *Notifier) PeekingDevices(roomID string) (peekingDevices []types.PeekingDevice) { - if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { - return - } - return n.roomIDToPeekingDevices[roomID].values() -} - -// removeEmptyUserStreams iterates through the user stream map and removes any -// that have been empty for a certain amount of time. This is a crude way of -// ensuring that the userStreams map doesn't grow forver. -// This should be called when the notifier gets called for whatever reason, -// the function itself is responsible for ensuring it doesn't iterate too -// often. -// NB: Callers should have locked the mutex before calling this function. -func (n *Notifier) removeEmptyUserStreams() { - // Only clean up now and again - now := time.Now() - if n.lastCleanUpTime.Add(time.Minute).After(now) { - return - } - n.lastCleanUpTime = now - - deleteBefore := now.Add(-5 * time.Minute) - for user, byUser := range n.userDeviceStreams { - for device, stream := range byUser { - if stream.TimeOfLastNonEmpty().Before(deleteBefore) { - delete(n.userDeviceStreams[user], device) - } - if len(n.userDeviceStreams[user]) == 0 { - delete(n.userDeviceStreams, user) - } - } - } -} - -// A string set, mainly existing for improving clarity of structs in this file. -type userIDSet map[string]bool - -func (s userIDSet) add(str string) { - s[str] = true -} - -func (s userIDSet) remove(str string) { - delete(s, str) -} - -func (s userIDSet) values() (vals []string) { - for str := range s { - vals = append(vals, str) - } - return -} - -// A set of PeekingDevices, similar to userIDSet - -type peekingDeviceSet map[types.PeekingDevice]bool - -func (s peekingDeviceSet) add(d types.PeekingDevice) { - s[d] = true -} - -// nolint:unused -func (s peekingDeviceSet) remove(d types.PeekingDevice) { - delete(s, d) -} - -func (s peekingDeviceSet) values() (vals []types.PeekingDevice) { - for d := range s { - vals = append(vals, d) - } - return -} diff --git a/syncapi/sync/notifier_test.go b/syncapi/sync/notifier_test.go deleted file mode 100644 index d24da4636..000000000 --- a/syncapi/sync/notifier_test.go +++ /dev/null @@ -1,374 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sync - -import ( - "context" - "encoding/json" - "fmt" - "sync" - "testing" - "time" - - "github.com/matrix-org/dendrite/syncapi/types" - userapi "github.com/matrix-org/dendrite/userapi/api" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" -) - -var ( - randomMessageEvent gomatrixserverlib.HeaderedEvent - aliceInviteBobEvent gomatrixserverlib.HeaderedEvent - bobLeaveEvent gomatrixserverlib.HeaderedEvent - syncPositionVeryOld = types.StreamingToken{PDUPosition: 5} - syncPositionBefore = types.StreamingToken{PDUPosition: 11} - syncPositionAfter = types.StreamingToken{PDUPosition: 12} - //syncPositionNewEDU = types.NewStreamToken(syncPositionAfter.PDUPosition, 1, 0, 0, nil) - syncPositionAfter2 = types.StreamingToken{PDUPosition: 13} -) - -var ( - roomID = "!test:localhost" - alice = "@alice:localhost" - aliceDev = "alicedevice" - bob = "@bob:localhost" - bobDev = "bobdev" -) - -func init() { - var err error - err = json.Unmarshal([]byte(`{ - "_room_version": "1", - "type": "m.room.message", - "content": { - "body": "Hello World", - "msgtype": "m.text" - }, - "sender": "@noone:localhost", - "room_id": "`+roomID+`", - "origin": "localhost", - "origin_server_ts": 12345, - "event_id": "$randomMessageEvent:localhost" - }`), &randomMessageEvent) - if err != nil { - panic(err) - } - err = json.Unmarshal([]byte(`{ - "_room_version": "1", - "type": "m.room.member", - "state_key": "`+bob+`", - "content": { - "membership": "invite" - }, - "sender": "`+alice+`", - "room_id": "`+roomID+`", - "origin": "localhost", - "origin_server_ts": 12345, - "event_id": "$aliceInviteBobEvent:localhost" - }`), &aliceInviteBobEvent) - if err != nil { - panic(err) - } - err = json.Unmarshal([]byte(`{ - "_room_version": "1", - "type": "m.room.member", - "state_key": "`+bob+`", - "content": { - "membership": "leave" - }, - "sender": "`+bob+`", - "room_id": "`+roomID+`", - "origin": "localhost", - "origin_server_ts": 12345, - "event_id": "$bobLeaveEvent:localhost" - }`), &bobLeaveEvent) - if err != nil { - panic(err) - } -} - -func mustEqualPositions(t *testing.T, got, want types.StreamingToken) { - if got.String() != want.String() { - t.Fatalf("mustEqualPositions got %s want %s", got.String(), want.String()) - } -} - -// Test that the current position is returned if a request is already behind. -func TestImmediateNotification(t *testing.T) { - n := NewNotifier(syncPositionBefore) - pos, err := waitForEvents(n, newTestSyncRequest(alice, aliceDev, syncPositionVeryOld)) - if err != nil { - t.Fatalf("TestImmediateNotification error: %s", err) - } - mustEqualPositions(t, pos, syncPositionBefore) -} - -// Test that new events to a joined room unblocks the request. -func TestNewEventAndJoinedToRoom(t *testing.T) { - n := NewNotifier(syncPositionBefore) - n.setUsersJoinedToRooms(map[string][]string{ - roomID: {alice, bob}, - }) - - var wg sync.WaitGroup - wg.Add(1) - go func() { - pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore)) - if err != nil { - t.Errorf("TestNewEventAndJoinedToRoom error: %w", err) - } - mustEqualPositions(t, pos, syncPositionAfter) - wg.Done() - }() - - stream := lockedFetchUserStream(n, bob, bobDev) - waitForBlocking(stream, 1) - - n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter) - - wg.Wait() -} - -func TestCorrectStream(t *testing.T) { - n := NewNotifier(syncPositionBefore) - stream := lockedFetchUserStream(n, bob, bobDev) - if stream.UserID != bob { - t.Fatalf("expected user %q, got %q", bob, stream.UserID) - } - if stream.DeviceID != bobDev { - t.Fatalf("expected device %q, got %q", bobDev, stream.DeviceID) - } -} - -func TestCorrectStreamWakeup(t *testing.T) { - n := NewNotifier(syncPositionBefore) - awoken := make(chan string) - - streamone := lockedFetchUserStream(n, alice, "one") - streamtwo := lockedFetchUserStream(n, alice, "two") - - go func() { - select { - case <-streamone.signalChannel: - awoken <- "one" - case <-streamtwo.signalChannel: - awoken <- "two" - } - }() - - time.Sleep(1 * time.Second) - - wake := "two" - n.wakeupUserDevice(alice, []string{wake}, syncPositionAfter) - - if result := <-awoken; result != wake { - t.Fatalf("expected to wake %q, got %q", wake, result) - } -} - -// Test that an invite unblocks the request -func TestNewInviteEventForUser(t *testing.T) { - n := NewNotifier(syncPositionBefore) - n.setUsersJoinedToRooms(map[string][]string{ - roomID: {alice, bob}, - }) - - var wg sync.WaitGroup - wg.Add(1) - go func() { - pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore)) - if err != nil { - t.Errorf("TestNewInviteEventForUser error: %w", err) - } - mustEqualPositions(t, pos, syncPositionAfter) - wg.Done() - }() - - stream := lockedFetchUserStream(n, bob, bobDev) - waitForBlocking(stream, 1) - - n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionAfter) - - wg.Wait() -} - -// Test an EDU-only update wakes up the request. -// TODO: Fix this test, invites wake up with an incremented -// PDU position, not EDU position -/* -func TestEDUWakeup(t *testing.T) { - n := NewNotifier(syncPositionAfter) - n.setUsersJoinedToRooms(map[string][]string{ - roomID: {alice, bob}, - }) - - var wg sync.WaitGroup - wg.Add(1) - go func() { - pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionAfter)) - if err != nil { - t.Errorf("TestNewInviteEventForUser error: %w", err) - } - mustEqualPositions(t, pos, syncPositionNewEDU) - wg.Done() - }() - - stream := lockedFetchUserStream(n, bob, bobDev) - waitForBlocking(stream, 1) - - n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionNewEDU) - - wg.Wait() -} -*/ - -// Test that all blocked requests get woken up on a new event. -func TestMultipleRequestWakeup(t *testing.T) { - n := NewNotifier(syncPositionBefore) - n.setUsersJoinedToRooms(map[string][]string{ - roomID: {alice, bob}, - }) - - var wg sync.WaitGroup - wg.Add(3) - poll := func() { - pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore)) - if err != nil { - t.Errorf("TestMultipleRequestWakeup error: %w", err) - } - mustEqualPositions(t, pos, syncPositionAfter) - wg.Done() - } - go poll() - go poll() - go poll() - - stream := lockedFetchUserStream(n, bob, bobDev) - waitForBlocking(stream, 3) - - n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter) - - wg.Wait() - - numWaiting := stream.NumWaiting() - if numWaiting != 0 { - t.Errorf("TestMultipleRequestWakeup NumWaiting() want 0, got %d", numWaiting) - } -} - -// Test that you stop getting woken up when you leave a room. -func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) { - // listen as bob. Make bob leave room. Make alice send event to room. - // Make sure alice gets woken up only and not bob as well. - n := NewNotifier(syncPositionBefore) - n.setUsersJoinedToRooms(map[string][]string{ - roomID: {alice, bob}, - }) - - var leaveWG sync.WaitGroup - - // Make bob leave the room - leaveWG.Add(1) - go func() { - pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore)) - if err != nil { - t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %w", err) - } - mustEqualPositions(t, pos, syncPositionAfter) - leaveWG.Done() - }() - bobStream := lockedFetchUserStream(n, bob, bobDev) - waitForBlocking(bobStream, 1) - n.OnNewEvent(&bobLeaveEvent, "", nil, syncPositionAfter) - leaveWG.Wait() - - // send an event into the room. Make sure alice gets it. Bob should not. - var aliceWG sync.WaitGroup - aliceStream := lockedFetchUserStream(n, alice, aliceDev) - aliceWG.Add(1) - go func() { - pos, err := waitForEvents(n, newTestSyncRequest(alice, aliceDev, syncPositionAfter)) - if err != nil { - t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %w", err) - } - mustEqualPositions(t, pos, syncPositionAfter2) - aliceWG.Done() - }() - - go func() { - // this should timeout with an error (but the main goroutine won't wait for the timeout explicitly) - _, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionAfter)) - if err == nil { - t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom expect error but got nil") - } - }() - - waitForBlocking(aliceStream, 1) - waitForBlocking(bobStream, 1) - - n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter2) - aliceWG.Wait() - - // it's possible that at this point alice has been informed and bob is about to be informed, so wait - // for a fraction of a second to account for this race - time.Sleep(1 * time.Millisecond) -} - -func waitForEvents(n *Notifier, req syncRequest) (types.StreamingToken, error) { - listener := n.GetListener(req) - defer listener.Close() - - select { - case <-time.After(5 * time.Second): - return types.StreamingToken{}, fmt.Errorf( - "waitForEvents timed out waiting for %s (pos=%v)", req.device.UserID, req.since, - ) - case <-listener.GetNotifyChannel(req.since): - p := listener.GetSyncPosition() - return p, nil - } -} - -// Wait until something is Wait()ing on the user stream. -func waitForBlocking(s *UserDeviceStream, numBlocking uint) { - for numBlocking != s.NumWaiting() { - // This is horrible but I don't want to add a signalling mechanism JUST for testing. - time.Sleep(1 * time.Microsecond) - } -} - -// lockedFetchUserStream invokes Notifier.fetchUserStream, respecting Notifier.streamLock. -// A new stream is made if it doesn't exist already. -func lockedFetchUserStream(n *Notifier, userID, deviceID string) *UserDeviceStream { - n.streamLock.Lock() - defer n.streamLock.Unlock() - - return n.fetchUserDeviceStream(userID, deviceID, true) -} - -func newTestSyncRequest(userID, deviceID string, since types.StreamingToken) syncRequest { - return syncRequest{ - device: userapi.Device{ - UserID: userID, - ID: deviceID, - }, - timeout: 1 * time.Minute, - since: since, - wantFullState: false, - limit: DefaultTimelineLimit, - log: util.GetLogger(context.TODO()), - ctx: context.TODO(), - } -} diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go index f2f2894be..58ac97b6e 100644 --- a/syncapi/sync/request.go +++ b/syncapi/sync/request.go @@ -15,7 +15,6 @@ package sync import ( - "context" "encoding/json" "net/http" "strconv" @@ -26,7 +25,6 @@ import ( userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" - log "github.com/sirupsen/logrus" ) const defaultSyncTimeout = time.Duration(0) @@ -40,18 +38,7 @@ type filter struct { } `json:"room"` } -// syncRequest represents a /sync request, with sensible defaults/sanity checks applied. -type syncRequest struct { - ctx context.Context - device userapi.Device - limit int - timeout time.Duration - since types.StreamingToken // nil means that no since token was supplied - wantFullState bool - log *log.Entry -} - -func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Database) (*syncRequest, error) { +func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Database) (*types.StreamRangeRequest, error) { timeout := getTimeout(req.URL.Query().Get("timeout")) fullState := req.URL.Query().Get("full_state") wantFullState := fullState != "" && fullState != "false" @@ -88,14 +75,17 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat } } // TODO: Additional query params: set_presence, filter - return &syncRequest{ - ctx: req.Context(), - device: device, - timeout: timeout, - since: since, - wantFullState: wantFullState, - limit: timelineLimit, - log: util.GetLogger(req.Context()), + + return &types.StreamRangeRequest{ + Context: req.Context(), // + Device: &device, // + Response: types.NewResponse(), // Populated by all streams + Filter: gomatrixserverlib.DefaultEventFilter(), // + Since: since, // + Timeout: timeout, // + Limit: timelineLimit, // + Rooms: make(map[string]string), // Populated by the PDU stream + WantFullState: wantFullState, // }, nil } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 79aa8b56f..3b4b9f772 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -33,7 +33,6 @@ import ( "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" - "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" @@ -57,7 +56,7 @@ type RequestPool struct { // NewRequestPool makes a new RequestPool func NewRequestPool( - db storage.Database, cfg *config.SyncAPI, n *Notifier, + db storage.Database, cfg *config.SyncAPI, userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, ) *RequestPool { @@ -160,13 +159,11 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. logger := util.GetLogger(req.Context()).WithFields(log.Fields{ "user_id": device.UserID, "device_id": device.ID, - "since": syncReq.since, - "timeout": syncReq.timeout, - "limit": syncReq.limit, + "since": syncReq.Since, + "timeout": syncReq.Timeout, + "limit": syncReq.Limit, }) - _ = logger - activeSyncRequests.Inc() defer activeSyncRequests.Dec() @@ -176,50 +173,54 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. defer waitingSyncRequests.Dec() if !rp.shouldReturnImmediately(syncReq) { - timer := time.NewTimer(syncReq.timeout) // case of timeout=0 is handled above + timer := time.NewTimer(syncReq.Timeout) // case of timeout=0 is handled above defer timer.Stop() + // Use a subcontext so that we don't keep the StreamNotifyAfter + // goroutines alive any longer than they really need to be. + waitctx, waitcancel := context.WithCancel(syncReq.Context) + select { - case <-syncReq.ctx.Done(): // Caller gave up + case <-waitctx.Done(): // Caller gave up + waitcancel() return util.JSONResponse{Code: http.StatusOK, JSON: syncData} case <-timer.C: // Timeout reached + waitcancel() return util.JSONResponse{Code: http.StatusOK, JSON: syncData} - case <-rp.pduStream.StreamNotifyAfter(syncReq.ctx, syncReq.since): - case <-rp.typingStream.StreamNotifyAfter(syncReq.ctx, syncReq.since): - case <-rp.receiptStream.StreamNotifyAfter(syncReq.ctx, syncReq.since): - // case <-rp.sendToDeviceStream.StreamNotifyAfter(syncReq.ctx, syncReq.since): - // case <-rp.inviteStream.StreamNotifyAfter(syncReq.ctx, syncReq.since): - // case <-rp.deviceListStream.StreamNotifyAfter(syncReq.ctx, syncReq.since): + case <-rp.pduStream.StreamNotifyAfter(waitctx, syncReq.Since): + case <-rp.typingStream.StreamNotifyAfter(waitctx, syncReq.Since): + case <-rp.receiptStream.StreamNotifyAfter(waitctx, syncReq.Since): + // case <-rp.sendToDeviceStream.StreamNotifyAfter(waitctx, syncReq.Since): + // case <-rp.inviteStream.StreamNotifyAfter(waitctx, syncReq.Since): + // case <-rp.deviceListStream.StreamNotifyAfter(waitctx, syncReq.Since): } + + waitcancel() + logger.Println("Responding to sync after notify") + } else { + logger.Println("Responding to sync immediately") } var latest types.StreamingToken - latest.ApplyUpdates(rp.pduStream.StreamLatestPosition(syncReq.ctx)) - latest.ApplyUpdates(rp.typingStream.StreamLatestPosition(syncReq.ctx)) - latest.ApplyUpdates(rp.receiptStream.StreamLatestPosition(syncReq.ctx)) - // latest.ApplyUpdates(rp.sendToDeviceStream.StreamLatestPosition(syncReq.ctx)) - // latest.ApplyUpdates(rp.inviteStream.StreamLatestPosition(syncReq.ctx)) - // latest.ApplyUpdates(rp.deviceListStream.StreamLatestPosition(syncReq.ctx)) + latest.ApplyUpdates(rp.pduStream.StreamLatestPosition(syncReq.Context)) + latest.ApplyUpdates(rp.typingStream.StreamLatestPosition(syncReq.Context)) + latest.ApplyUpdates(rp.receiptStream.StreamLatestPosition(syncReq.Context)) + // latest.ApplyUpdates(rp.sendToDeviceStream.StreamLatestPosition(syncReq.Context)) + // latest.ApplyUpdates(rp.inviteStream.StreamLatestPosition(syncReq.Context)) + // latest.ApplyUpdates(rp.deviceListStream.StreamLatestPosition(syncReq.Context)) - sr := &types.StreamRangeRequest{ - Device: device, // - Response: types.NewResponse(), // Populated by all streams - Filter: gomatrixserverlib.DefaultEventFilter(), // - Rooms: make(map[string]string), // Populated by the PDU stream - } - - sr.Response.NextBatch.ApplyUpdates(rp.pduStream.StreamRange(syncReq.ctx, sr, syncReq.since, latest)) - sr.Response.NextBatch.ApplyUpdates(rp.typingStream.StreamRange(syncReq.ctx, sr, syncReq.since, latest)) - sr.Response.NextBatch.ApplyUpdates(rp.receiptStream.StreamRange(syncReq.ctx, sr, syncReq.since, latest)) - // sr.Response.NextBatch.ApplyUpdates(rp.sendToDeviceStream.StreamRange(syncReq.ctx, sr, syncReq.since, latest)) - // sr.Response.NextBatch.ApplyUpdates(rp.inviteStream.StreamRange(syncReq.ctx, sr, syncReq.since, latest)) - // sr.Response.NextBatch.ApplyUpdates(rp.inviteStream.StreamRange(syncReq.ctx, sr, syncReq.since, latest)) + syncReq.Response.NextBatch.ApplyUpdates(rp.pduStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest)) + syncReq.Response.NextBatch.ApplyUpdates(rp.typingStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest)) + syncReq.Response.NextBatch.ApplyUpdates(rp.receiptStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest)) + // syncReq.Response.NextBatch.ApplyUpdates(rp.sendToDeviceStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest)) + // syncReq.Response.NextBatch.ApplyUpdates(rp.inviteStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest)) + // syncReq.Response.NextBatch.ApplyUpdates(rp.inviteStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest)) return util.JSONResponse{ Code: http.StatusOK, - JSON: sr.Response, + JSON: syncReq.Response, } } @@ -458,10 +459,10 @@ func (rp *RequestPool) appendAccountData( // shouldReturnImmediately returns whether the /sync request is an initial sync, // or timeout=0, or full_state=true, in any of the cases the request should // return immediately. -func (rp *RequestPool) shouldReturnImmediately(syncReq *syncRequest) bool { - if syncReq.since.IsEmpty() || syncReq.timeout == 0 || syncReq.wantFullState { +func (rp *RequestPool) shouldReturnImmediately(syncReq *types.StreamRangeRequest) bool { + if syncReq.Since.IsEmpty() || syncReq.Timeout == 0 || syncReq.WantFullState { return true } - waiting, werr := rp.db.SendToDeviceUpdatesWaiting(context.TODO(), syncReq.device.UserID, syncReq.device.ID) + waiting, werr := rp.db.SendToDeviceUpdatesWaiting(context.TODO(), syncReq.Device.UserID, syncReq.Device.ID) return werr == nil && waiting } diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 0610add53..3a45ffc6f 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -15,8 +15,6 @@ package syncapi import ( - "context" - "github.com/gorilla/mux" "github.com/sirupsen/logrus" @@ -50,57 +48,46 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to connect to sync db") } - pos, err := syncDB.SyncPosition(context.Background()) - if err != nil { - logrus.WithError(err).Panicf("failed to get sync position") - } - - notifier := sync.NewNotifier(pos) - err = notifier.Load(context.Background(), syncDB) - if err != nil { - logrus.WithError(err).Panicf("failed to start notifier") - } - - requestPool := sync.NewRequestPool(syncDB, cfg, notifier, userAPI, keyAPI, rsAPI) + requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI) keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)), - consumer, notifier, keyAPI, rsAPI, syncDB, + consumer, keyAPI, rsAPI, syncDB, ) if err = keyChangeConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start key change consumer") } roomConsumer := consumers.NewOutputRoomEventConsumer( - cfg, consumer, notifier, syncDB, rsAPI, + cfg, consumer, syncDB, rsAPI, ) if err = roomConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start room server consumer") } clientConsumer := consumers.NewOutputClientDataConsumer( - cfg, consumer, notifier, syncDB, + cfg, consumer, syncDB, ) if err = clientConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start client data consumer") } typingConsumer := consumers.NewOutputTypingEventConsumer( - cfg, consumer, notifier, syncDB, + cfg, consumer, syncDB, ) if err = typingConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start typing consumer") } sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer( - cfg, consumer, notifier, syncDB, + cfg, consumer, syncDB, ) if err = sendToDeviceConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start send-to-device consumer") } receiptConsumer := consumers.NewOutputReceiptEventConsumer( - cfg, consumer, notifier, syncDB, + cfg, consumer, syncDB, ) if err = receiptConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start receipts consumer") diff --git a/syncapi/types/provider.go b/syncapi/types/provider.go index 721efc5db..a79504b3c 100644 --- a/syncapi/types/provider.go +++ b/syncapi/types/provider.go @@ -2,16 +2,25 @@ package types import ( "context" + "time" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" ) type StreamRangeRequest struct { - Device *userapi.Device - Response *Response - Filter gomatrixserverlib.EventFilter - Rooms map[string]string + Context context.Context + Device *userapi.Device + Response *Response + Filter gomatrixserverlib.EventFilter + Since StreamingToken + Limit int + Timeout time.Duration + WantFullState bool + + // Below this line are items updated by the + // stream providers. Not thread-safe. + Rooms map[string]string } type StreamProvider interface {