From 4e84c2f2e9a9b51dbd6aac2389b42a89e205c536 Mon Sep 17 00:00:00 2001 From: Cnly Date: Fri, 12 Jul 2019 20:47:13 +0800 Subject: [PATCH] Docs and code cleanup Signed-off-by: Alex Chen --- syncapi/consumers/typingserver.go | 5 ++- syncapi/storage/output_room_events_table.go | 2 +- syncapi/storage/syncserver.go | 18 ++++++----- syncapi/sync/notifier.go | 34 +++++++++++---------- syncapi/sync/notifier_test.go | 2 +- syncapi/sync/request.go | 4 +-- syncapi/sync/requestpool.go | 2 +- syncapi/sync/userstream.go | 6 ++-- syncapi/syncapi.go | 2 +- syncapi/types/types.go | 10 +++--- 10 files changed, 46 insertions(+), 39 deletions(-) diff --git a/syncapi/consumers/typingserver.go b/syncapi/consumers/typingserver.go index 601ebaabd..5d998a18a 100644 --- a/syncapi/consumers/typingserver.go +++ b/syncapi/consumers/typingserver.go @@ -27,12 +27,15 @@ import ( sarama "gopkg.in/Shopify/sarama.v1" ) +// OutputTypingEventConsumer consumes events that originated in the typing server. type OutputTypingEventConsumer struct { typingConsumer *common.ContinualConsumer db *storage.SyncServerDatasource notifier *sync.Notifier } +// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. +// Call Start() to begin consuming from the typing server. func NewOutputTypingEventConsumer( cfg *config.Dendrite, kafkaConsumer sarama.Consumer, @@ -78,7 +81,7 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error "room_id": output.Event.RoomID, "user_id": output.Event.UserID, "typing": output.Event.Typing, - }).Info("received data from typing server") + }).Debug("received data from typing server") var typingPos int64 typingEvent := output.Event diff --git a/syncapi/storage/output_room_events_table.go b/syncapi/storage/output_room_events_table.go index f89c6c91e..06df017cb 100644 --- a/syncapi/storage/output_room_events_table.go +++ b/syncapi/storage/output_room_events_table.go @@ -108,7 +108,7 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { return } -// selectStateInRange returns the state events between the two given stream positions, exclusive of oldPos, inclusive of newPos. +// selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos. // Results are bucketed based on the room ID. If the same state is overwritten multiple times between the // two positions, only the most recent state is returned. func (s *outputRoomEventsStatements) selectStateInRange( diff --git a/syncapi/storage/syncserver.go b/syncapi/storage/syncserver.go index e76a38e0d..5f2aa7413 100644 --- a/syncapi/storage/syncserver.go +++ b/syncapi/storage/syncserver.go @@ -39,12 +39,12 @@ type stateDelta struct { roomID string stateEvents []gomatrixserverlib.Event membership string - // The stream position of the latest membership event for this user, if applicable. + // The PDU stream position of the latest membership event for this user, if applicable. // Can be 0 if there is no membership event in this delta. membershipPos int64 } -// Same as gomatrixserverlib.Event but also has the stream position for this event. +// Same as gomatrixserverlib.Event but also has the PDU stream position for this event. type streamEvent struct { gomatrixserverlib.Event streamPosition int64 @@ -111,7 +111,7 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([ } // WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races -// when generating the stream position for this event. Returns the sync stream position for the inserted event. +// when generating the sync stream position for this event. Returns the sync stream position for the inserted event. // Returns an error if there was a problem inserting this event. func (d *SyncServerDatasource) WriteEvent( ctx context.Context, @@ -228,8 +228,8 @@ func (d *SyncServerDatasource) syncPositionTx( return } -// addPDUDeltaToResponse adds all PDU delta to a sync response. -// IDs of all rooms the user joined are returned so EDU delta can be added for them. +// addPDUDeltaToResponse adds all PDU deltas to a sync response. +// IDs of all rooms the user joined are returned so EDU deltas can be added for them. func (d *SyncServerDatasource) addPDUDeltaToResponse( ctx context.Context, device authtypes.Device, @@ -245,7 +245,7 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse( defer common.EndTransaction(txn, &succeeded) // Work out which rooms to return in the response. This is done by getting not only the currently - // joined rooms, but also which rooms have membership transitions for this user between the 2 stream positions. + // joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions. // This works out what the 'state' key should be for each room as well as which membership block // to put the room into. deltas, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID) @@ -271,6 +271,8 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse( return joinedRoomIDs, nil } +// addTypingDeltaToResponse adds all typing notifications to a sync response +// since the specified position. func (d *SyncServerDatasource) addTypingDeltaToResponse( since int64, joinedRoomIDs []string, @@ -372,7 +374,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( err error, ) { // This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have - // a consistent view of the database throughout. This includes extracting the sync stream position. + // a consistent view of the database throughout. This includes extracting the sync position. // This does have the unfortunate side-effect that all the matrixy logic resides in this function, // but it's better to not hide the fact that this is being done in a transaction. txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot) @@ -440,7 +442,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( return res, toPos, joinedRoomIDs, err } -// CompleteSync a complete /sync API response for the given user. +// CompleteSync returns a complete /sync API response for the given user. func (d *SyncServerDatasource) CompleteSync( ctx context.Context, userID string, numRecentEventsPerRoom int, ) (*types.Response, error) { diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go index 13df3b2ca..9f6371e03 100644 --- a/syncapi/sync/notifier.go +++ b/syncapi/sync/notifier.go @@ -26,7 +26,7 @@ import ( ) // Notifier will wake up sleeping requests when there is some new data. -// It does not tell requests what that data is, only the stream position which +// It does not tell requests what that data is, only the sync position which // they can use to get at it. This is done to prevent races whereby we tell the caller // the event, but the token has already advanced by the time they fetch it, resulting // in missed events. @@ -43,7 +43,7 @@ type Notifier struct { lastCleanUpTime time.Time } -// NewNotifier creates a new notifier set to the given stream position. +// NewNotifier creates a new notifier set to the given sync position. // In order for this to be of any use, the Notifier needs to be told all rooms and // the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase). func NewNotifier(pos types.SyncPosition) *Notifier { @@ -58,13 +58,18 @@ func NewNotifier(pos types.SyncPosition) *Notifier { // OnNewEvent is called when a new event is received from the room server. Must only be // called from a single goroutine, to avoid races between updates which could set the -// current position in the stream incorrectly. -// Can be called with one among: a *gomatrixserverlib.Event, a room ID, or a list of user IDs. +// current sync position incorrectly. +// Chooses which user sync streams to update by a provided *gomatrixserverlib.Event +// (based on the users in the event's room), +// a roomID directly, or a list of user IDs, whichever specified first in the parameters. // posUpdate contains the latest position(s) for one or more types of events. // If a position in posUpdate is 0, it means no updates available of that type. // Typically a consumer supplies a posUpdate with the sync position for the // event type it handles set to the latest, leaving other fields as 0. -func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, roomID string, userIDs []string, posUpdate types.SyncPosition) { +func (n *Notifier) OnNewEvent( + ev *gomatrixserverlib.Event, roomID string, userIDs []string, + posUpdate types.SyncPosition, +) { // update the current position then notify relevant /sync streams. // This needs to be done PRIOR to waking up users as they will read this value. n.streamLock.Lock() @@ -109,7 +114,9 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, roomID string, userID } else if len(userIDs) > 0 { n.wakeupUsers(userIDs, latestPos) } else { - log.Warn("WARNING: Notifier.OnNewEvent called but caller supplied no user to wake up") + log.WithFields(log.Fields{ + "posUpdate": posUpdate.String, + }).Warn("Notifier.OnNewEvent called but caller supplied no user to wake up") } } @@ -143,7 +150,7 @@ func (n *Notifier) Load(ctx context.Context, db *storage.SyncServerDatasource) e return nil } -// CurrentPosition returns the current stream position +// CurrentPosition returns the current sync position func (n *Notifier) CurrentPosition() types.SyncPosition { n.streamLock.Lock() defer n.streamLock.Unlock() @@ -168,18 +175,13 @@ func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) { func (n *Notifier) wakeupUsers(userIDs []string, newPos types.SyncPosition) { for _, userID := range userIDs { - n.wakeupUser(userID, newPos) + stream := n.fetchUserStream(userID, false) + if stream != nil { + stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream + } } } -func (n *Notifier) wakeupUser(userID string, newPos types.SyncPosition) { - stream := n.fetchUserStream(userID, false) - if stream == nil { - return - } - stream.Broadcast(newPos) // wakeup all goroutines Wait()ing on this stream -} - // fetchUserStream retrieves a stream unique to the given user. If makeIfNotExists is true, // a stream will be made for this user if one doesn't exist and it will be returned. This // function does not wait for data to be available on the stream. diff --git a/syncapi/sync/notifier_test.go b/syncapi/sync/notifier_test.go index 0578734d0..904315e9f 100644 --- a/syncapi/sync/notifier_test.go +++ b/syncapi/sync/notifier_test.go @@ -315,7 +315,7 @@ func waitForEvents(n *Notifier, req syncRequest) (types.SyncPosition, error) { "waitForEvents timed out waiting for %s (pos=%d)", req.device.UserID, req.since, ) case <-listener.GetNotifyChannel(*req.since): - p := listener.GetStreamPosition() + p := listener.GetSyncPosition() return p, nil } } diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go index 2f73dd787..a5d2f60f4 100644 --- a/syncapi/sync/request.go +++ b/syncapi/sync/request.go @@ -75,7 +75,7 @@ func getTimeout(timeoutMS string) time.Duration { } // getSyncStreamPosition tries to parse a 'since' token taken from the API to a -// stream position. If the string is empty then (nil, nil) is returned. +// types.SyncPosition. If the string is empty then (nil, nil) is returned. // There are two forms of tokens: The full length form containing all PDU and EDU // positions separated by "_", and the short form containing only the PDU // position. Short form can be used for, e.g., `prev_batch` tokens. @@ -86,7 +86,7 @@ func getSyncStreamPosition(since string) (*types.SyncPosition, error) { posStrings := strings.Split(since, "_") if len(posStrings) != 2 && len(posStrings) != 1 { - // A token can either be a full length one or a short (PDU-only) one + // A token can either be full length or short (PDU-only). return nil, errors.New("malformed batch token") } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 1bfdb206e..a6ec6bd92 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -97,7 +97,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype select { // Wait for notifier to wake us up case <-userStreamListener.GetNotifyChannel(sincePos): - currPos = userStreamListener.GetStreamPosition() + currPos = userStreamListener.GetSyncPosition() sincePos = currPos // Or for timeout to expire case <-timer.C: diff --git a/syncapi/sync/userstream.go b/syncapi/sync/userstream.go index 05458fabe..beb10e487 100644 --- a/syncapi/sync/userstream.go +++ b/syncapi/sync/userstream.go @@ -84,7 +84,7 @@ func (s *UserStream) GetListener(ctx context.Context) UserStreamListener { return listener } -// Broadcast a new stream position for this user. +// Broadcast a new sync position for this user. func (s *UserStream) Broadcast(pos types.SyncPosition) { s.lock.Lock() defer s.lock.Unlock() @@ -118,9 +118,9 @@ func (s *UserStream) TimeOfLastNonEmpty() time.Time { return s.timeOfLastChannel } -// GetStreamPosition returns last stream position which the UserStream was +// GetStreamPosition returns last sync position which the UserStream was // notified about -func (s *UserStreamListener) GetStreamPosition() types.SyncPosition { +func (s *UserStreamListener) GetSyncPosition() types.SyncPosition { s.userStream.lock.Lock() defer s.userStream.lock.Unlock() diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 632a57b15..4738feea2 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -45,7 +45,7 @@ func SetupSyncAPIComponent( pos, err := syncDB.SyncPosition(context.Background()) if err != nil { - logrus.WithError(err).Panicf("failed to get stream position") + logrus.WithError(err).Panicf("failed to get sync position") } notifier := sync.NewNotifier(pos) diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 565c00501..af7ec865f 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -25,7 +25,7 @@ import ( type SyncPosition struct { // PDUPosition is the stream position for PDUs the client is at. PDUPosition int64 - // TypingPosition is the position for typing notifications the client is at. + // TypingPosition is the client's position for typing notifications. TypingPosition int64 } @@ -35,15 +35,15 @@ func (sp SyncPosition) String() string { strconv.FormatInt(sp.TypingPosition, 10) } -// IsAfter returns whether sp refers to states newer than other +// IsAfter returns whether one SyncPosition refers to states newer than another SyncPosition. func (sp SyncPosition) IsAfter(other SyncPosition) bool { return sp.PDUPosition > other.PDUPosition || sp.TypingPosition > other.TypingPosition } -// WithUpdates returns a copy of `sp` with updates represented by `other` applied. -// If a field is not 0 in `other`, it is considered an update and its value -// will replace the corresponding value in sp. +// WithUpdates returns a copy of the SyncPosition with updates applied from another SyncPosition. +// If the latter SyncPosition contains a field that is not 0, it is considered an update, +// and its value will replace the corresponding value in the SyncPosition on which WithUpdates is called. func (sp SyncPosition) WithUpdates(other SyncPosition) SyncPosition { ret := sp if other.PDUPosition != 0 {