Docs and code cleanup

Signed-off-by: Alex Chen <minecnly@gmail.com>
This commit is contained in:
Cnly 2019-07-12 20:47:13 +08:00
parent 8ed82c0278
commit 4e84c2f2e9
10 changed files with 46 additions and 39 deletions

View file

@ -27,12 +27,15 @@ import (
sarama "gopkg.in/Shopify/sarama.v1" sarama "gopkg.in/Shopify/sarama.v1"
) )
// OutputTypingEventConsumer consumes events that originated in the typing server.
type OutputTypingEventConsumer struct { type OutputTypingEventConsumer struct {
typingConsumer *common.ContinualConsumer typingConsumer *common.ContinualConsumer
db *storage.SyncServerDatasource db *storage.SyncServerDatasource
notifier *sync.Notifier notifier *sync.Notifier
} }
// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer.
// Call Start() to begin consuming from the typing server.
func NewOutputTypingEventConsumer( func NewOutputTypingEventConsumer(
cfg *config.Dendrite, cfg *config.Dendrite,
kafkaConsumer sarama.Consumer, kafkaConsumer sarama.Consumer,
@ -78,7 +81,7 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
"room_id": output.Event.RoomID, "room_id": output.Event.RoomID,
"user_id": output.Event.UserID, "user_id": output.Event.UserID,
"typing": output.Event.Typing, "typing": output.Event.Typing,
}).Info("received data from typing server") }).Debug("received data from typing server")
var typingPos int64 var typingPos int64
typingEvent := output.Event typingEvent := output.Event

View file

@ -108,7 +108,7 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
return return
} }
// selectStateInRange returns the state events between the two given stream positions, exclusive of oldPos, inclusive of newPos. // selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos.
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the // Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
// two positions, only the most recent state is returned. // two positions, only the most recent state is returned.
func (s *outputRoomEventsStatements) selectStateInRange( func (s *outputRoomEventsStatements) selectStateInRange(

View file

@ -39,12 +39,12 @@ type stateDelta struct {
roomID string roomID string
stateEvents []gomatrixserverlib.Event stateEvents []gomatrixserverlib.Event
membership string membership string
// The stream position of the latest membership event for this user, if applicable. // The PDU stream position of the latest membership event for this user, if applicable.
// Can be 0 if there is no membership event in this delta. // Can be 0 if there is no membership event in this delta.
membershipPos int64 membershipPos int64
} }
// Same as gomatrixserverlib.Event but also has the stream position for this event. // Same as gomatrixserverlib.Event but also has the PDU stream position for this event.
type streamEvent struct { type streamEvent struct {
gomatrixserverlib.Event gomatrixserverlib.Event
streamPosition int64 streamPosition int64
@ -111,7 +111,7 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([
} }
// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races // WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
// when generating the stream position for this event. Returns the sync stream position for the inserted event. // when generating the sync stream position for this event. Returns the sync stream position for the inserted event.
// Returns an error if there was a problem inserting this event. // Returns an error if there was a problem inserting this event.
func (d *SyncServerDatasource) WriteEvent( func (d *SyncServerDatasource) WriteEvent(
ctx context.Context, ctx context.Context,
@ -228,8 +228,8 @@ func (d *SyncServerDatasource) syncPositionTx(
return return
} }
// addPDUDeltaToResponse adds all PDU delta to a sync response. // addPDUDeltaToResponse adds all PDU deltas to a sync response.
// IDs of all rooms the user joined are returned so EDU delta can be added for them. // IDs of all rooms the user joined are returned so EDU deltas can be added for them.
func (d *SyncServerDatasource) addPDUDeltaToResponse( func (d *SyncServerDatasource) addPDUDeltaToResponse(
ctx context.Context, ctx context.Context,
device authtypes.Device, device authtypes.Device,
@ -245,7 +245,7 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse(
defer common.EndTransaction(txn, &succeeded) defer common.EndTransaction(txn, &succeeded)
// Work out which rooms to return in the response. This is done by getting not only the currently // Work out which rooms to return in the response. This is done by getting not only the currently
// joined rooms, but also which rooms have membership transitions for this user between the 2 stream positions. // joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions.
// This works out what the 'state' key should be for each room as well as which membership block // This works out what the 'state' key should be for each room as well as which membership block
// to put the room into. // to put the room into.
deltas, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID) deltas, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID)
@ -271,6 +271,8 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse(
return joinedRoomIDs, nil return joinedRoomIDs, nil
} }
// addTypingDeltaToResponse adds all typing notifications to a sync response
// since the specified position.
func (d *SyncServerDatasource) addTypingDeltaToResponse( func (d *SyncServerDatasource) addTypingDeltaToResponse(
since int64, since int64,
joinedRoomIDs []string, joinedRoomIDs []string,
@ -372,7 +374,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
err error, err error,
) { ) {
// This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have // This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have
// a consistent view of the database throughout. This includes extracting the sync stream position. // a consistent view of the database throughout. This includes extracting the sync position.
// This does have the unfortunate side-effect that all the matrixy logic resides in this function, // This does have the unfortunate side-effect that all the matrixy logic resides in this function,
// but it's better to not hide the fact that this is being done in a transaction. // but it's better to not hide the fact that this is being done in a transaction.
txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot) txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot)
@ -440,7 +442,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
return res, toPos, joinedRoomIDs, err return res, toPos, joinedRoomIDs, err
} }
// CompleteSync a complete /sync API response for the given user. // CompleteSync returns a complete /sync API response for the given user.
func (d *SyncServerDatasource) CompleteSync( func (d *SyncServerDatasource) CompleteSync(
ctx context.Context, userID string, numRecentEventsPerRoom int, ctx context.Context, userID string, numRecentEventsPerRoom int,
) (*types.Response, error) { ) (*types.Response, error) {

View file

@ -26,7 +26,7 @@ import (
) )
// Notifier will wake up sleeping requests when there is some new data. // Notifier will wake up sleeping requests when there is some new data.
// It does not tell requests what that data is, only the stream position which // It does not tell requests what that data is, only the sync position which
// they can use to get at it. This is done to prevent races whereby we tell the caller // they can use to get at it. This is done to prevent races whereby we tell the caller
// the event, but the token has already advanced by the time they fetch it, resulting // the event, but the token has already advanced by the time they fetch it, resulting
// in missed events. // in missed events.
@ -43,7 +43,7 @@ type Notifier struct {
lastCleanUpTime time.Time lastCleanUpTime time.Time
} }
// NewNotifier creates a new notifier set to the given stream position. // NewNotifier creates a new notifier set to the given sync position.
// In order for this to be of any use, the Notifier needs to be told all rooms and // In order for this to be of any use, the Notifier needs to be told all rooms and
// the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase). // the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase).
func NewNotifier(pos types.SyncPosition) *Notifier { func NewNotifier(pos types.SyncPosition) *Notifier {
@ -58,13 +58,18 @@ func NewNotifier(pos types.SyncPosition) *Notifier {
// OnNewEvent is called when a new event is received from the room server. Must only be // OnNewEvent is called when a new event is received from the room server. Must only be
// called from a single goroutine, to avoid races between updates which could set the // called from a single goroutine, to avoid races between updates which could set the
// current position in the stream incorrectly. // current sync position incorrectly.
// Can be called with one among: a *gomatrixserverlib.Event, a room ID, or a list of user IDs. // Chooses which user sync streams to update by a provided *gomatrixserverlib.Event
// (based on the users in the event's room),
// a roomID directly, or a list of user IDs, whichever specified first in the parameters.
// posUpdate contains the latest position(s) for one or more types of events. // posUpdate contains the latest position(s) for one or more types of events.
// If a position in posUpdate is 0, it means no updates available of that type. // If a position in posUpdate is 0, it means no updates available of that type.
// Typically a consumer supplies a posUpdate with the sync position for the // Typically a consumer supplies a posUpdate with the sync position for the
// event type it handles set to the latest, leaving other fields as 0. // event type it handles set to the latest, leaving other fields as 0.
func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, roomID string, userIDs []string, posUpdate types.SyncPosition) { func (n *Notifier) OnNewEvent(
ev *gomatrixserverlib.Event, roomID string, userIDs []string,
posUpdate types.SyncPosition,
) {
// update the current position then notify relevant /sync streams. // update the current position then notify relevant /sync streams.
// This needs to be done PRIOR to waking up users as they will read this value. // This needs to be done PRIOR to waking up users as they will read this value.
n.streamLock.Lock() n.streamLock.Lock()
@ -109,7 +114,9 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, roomID string, userID
} else if len(userIDs) > 0 { } else if len(userIDs) > 0 {
n.wakeupUsers(userIDs, latestPos) n.wakeupUsers(userIDs, latestPos)
} else { } else {
log.Warn("WARNING: Notifier.OnNewEvent called but caller supplied no user to wake up") log.WithFields(log.Fields{
"posUpdate": posUpdate.String,
}).Warn("Notifier.OnNewEvent called but caller supplied no user to wake up")
} }
} }
@ -143,7 +150,7 @@ func (n *Notifier) Load(ctx context.Context, db *storage.SyncServerDatasource) e
return nil return nil
} }
// CurrentPosition returns the current stream position // CurrentPosition returns the current sync position
func (n *Notifier) CurrentPosition() types.SyncPosition { func (n *Notifier) CurrentPosition() types.SyncPosition {
n.streamLock.Lock() n.streamLock.Lock()
defer n.streamLock.Unlock() defer n.streamLock.Unlock()
@ -168,17 +175,12 @@ func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) {
func (n *Notifier) wakeupUsers(userIDs []string, newPos types.SyncPosition) { func (n *Notifier) wakeupUsers(userIDs []string, newPos types.SyncPosition) {
for _, userID := range userIDs { for _, userID := range userIDs {
n.wakeupUser(userID, newPos)
}
}
func (n *Notifier) wakeupUser(userID string, newPos types.SyncPosition) {
stream := n.fetchUserStream(userID, false) stream := n.fetchUserStream(userID, false)
if stream == nil { if stream != nil {
return
}
stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
} }
}
}
// fetchUserStream retrieves a stream unique to the given user. If makeIfNotExists is true, // fetchUserStream retrieves a stream unique to the given user. If makeIfNotExists is true,
// a stream will be made for this user if one doesn't exist and it will be returned. This // a stream will be made for this user if one doesn't exist and it will be returned. This

View file

@ -315,7 +315,7 @@ func waitForEvents(n *Notifier, req syncRequest) (types.SyncPosition, error) {
"waitForEvents timed out waiting for %s (pos=%d)", req.device.UserID, req.since, "waitForEvents timed out waiting for %s (pos=%d)", req.device.UserID, req.since,
) )
case <-listener.GetNotifyChannel(*req.since): case <-listener.GetNotifyChannel(*req.since):
p := listener.GetStreamPosition() p := listener.GetSyncPosition()
return p, nil return p, nil
} }
} }

View file

@ -75,7 +75,7 @@ func getTimeout(timeoutMS string) time.Duration {
} }
// getSyncStreamPosition tries to parse a 'since' token taken from the API to a // getSyncStreamPosition tries to parse a 'since' token taken from the API to a
// stream position. If the string is empty then (nil, nil) is returned. // types.SyncPosition. If the string is empty then (nil, nil) is returned.
// There are two forms of tokens: The full length form containing all PDU and EDU // There are two forms of tokens: The full length form containing all PDU and EDU
// positions separated by "_", and the short form containing only the PDU // positions separated by "_", and the short form containing only the PDU
// position. Short form can be used for, e.g., `prev_batch` tokens. // position. Short form can be used for, e.g., `prev_batch` tokens.
@ -86,7 +86,7 @@ func getSyncStreamPosition(since string) (*types.SyncPosition, error) {
posStrings := strings.Split(since, "_") posStrings := strings.Split(since, "_")
if len(posStrings) != 2 && len(posStrings) != 1 { if len(posStrings) != 2 && len(posStrings) != 1 {
// A token can either be a full length one or a short (PDU-only) one // A token can either be full length or short (PDU-only).
return nil, errors.New("malformed batch token") return nil, errors.New("malformed batch token")
} }

View file

@ -97,7 +97,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
select { select {
// Wait for notifier to wake us up // Wait for notifier to wake us up
case <-userStreamListener.GetNotifyChannel(sincePos): case <-userStreamListener.GetNotifyChannel(sincePos):
currPos = userStreamListener.GetStreamPosition() currPos = userStreamListener.GetSyncPosition()
sincePos = currPos sincePos = currPos
// Or for timeout to expire // Or for timeout to expire
case <-timer.C: case <-timer.C:

View file

@ -84,7 +84,7 @@ func (s *UserStream) GetListener(ctx context.Context) UserStreamListener {
return listener return listener
} }
// Broadcast a new stream position for this user. // Broadcast a new sync position for this user.
func (s *UserStream) Broadcast(pos types.SyncPosition) { func (s *UserStream) Broadcast(pos types.SyncPosition) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
@ -118,9 +118,9 @@ func (s *UserStream) TimeOfLastNonEmpty() time.Time {
return s.timeOfLastChannel return s.timeOfLastChannel
} }
// GetStreamPosition returns last stream position which the UserStream was // GetStreamPosition returns last sync position which the UserStream was
// notified about // notified about
func (s *UserStreamListener) GetStreamPosition() types.SyncPosition { func (s *UserStreamListener) GetSyncPosition() types.SyncPosition {
s.userStream.lock.Lock() s.userStream.lock.Lock()
defer s.userStream.lock.Unlock() defer s.userStream.lock.Unlock()

View file

@ -45,7 +45,7 @@ func SetupSyncAPIComponent(
pos, err := syncDB.SyncPosition(context.Background()) pos, err := syncDB.SyncPosition(context.Background())
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to get stream position") logrus.WithError(err).Panicf("failed to get sync position")
} }
notifier := sync.NewNotifier(pos) notifier := sync.NewNotifier(pos)

View file

@ -25,7 +25,7 @@ import (
type SyncPosition struct { type SyncPosition struct {
// PDUPosition is the stream position for PDUs the client is at. // PDUPosition is the stream position for PDUs the client is at.
PDUPosition int64 PDUPosition int64
// TypingPosition is the position for typing notifications the client is at. // TypingPosition is the client's position for typing notifications.
TypingPosition int64 TypingPosition int64
} }
@ -35,15 +35,15 @@ func (sp SyncPosition) String() string {
strconv.FormatInt(sp.TypingPosition, 10) strconv.FormatInt(sp.TypingPosition, 10)
} }
// IsAfter returns whether sp refers to states newer than other // IsAfter returns whether one SyncPosition refers to states newer than another SyncPosition.
func (sp SyncPosition) IsAfter(other SyncPosition) bool { func (sp SyncPosition) IsAfter(other SyncPosition) bool {
return sp.PDUPosition > other.PDUPosition || return sp.PDUPosition > other.PDUPosition ||
sp.TypingPosition > other.TypingPosition sp.TypingPosition > other.TypingPosition
} }
// WithUpdates returns a copy of `sp` with updates represented by `other` applied. // WithUpdates returns a copy of the SyncPosition with updates applied from another SyncPosition.
// If a field is not 0 in `other`, it is considered an update and its value // If the latter SyncPosition contains a field that is not 0, it is considered an update,
// will replace the corresponding value in sp. // and its value will replace the corresponding value in the SyncPosition on which WithUpdates is called.
func (sp SyncPosition) WithUpdates(other SyncPosition) SyncPosition { func (sp SyncPosition) WithUpdates(other SyncPosition) SyncPosition {
ret := sp ret := sp
if other.PDUPosition != 0 { if other.PDUPosition != 0 {