diff --git a/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go index 5858891fb..61dab2fb4 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go @@ -70,7 +70,7 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { } // MaxID returns the ID of the last inserted event in this table. This should only ever be used at startup, as it will -// race with inserting events if it is done afterwards. +// race with inserting events if it is done afterwards. If there are no inserted events, 0 is returned. func (s *outputRoomEventsStatements) MaxID() (id int64, err error) { var nullableID sql.NullInt64 err = s.selectMaxIDStmt.QueryRow().Scan(&nullableID) @@ -80,6 +80,8 @@ func (s *outputRoomEventsStatements) MaxID() (id int64, err error) { return } +// InRange returns all the events in the range between oldPos exclusive and newPos inclusive. Returns an empty array if +// there are no events between the provided range. Returns an error if events are missing in the range. func (s *outputRoomEventsStatements) InRange(oldPos, newPos int64) ([]gomatrixserverlib.Event, error) { rows, err := s.selectEventsInRangeStmt.Query(oldPos, newPos) if err != nil { @@ -108,7 +110,8 @@ func (s *outputRoomEventsStatements) InRange(oldPos, newPos int64) ([]gomatrixse return result, nil } -// InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs. +// InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs. Returns the position +// of the inserted event. func (s *outputRoomEventsStatements) InsertEvent(txn *sql.Tx, event *gomatrixserverlib.Event, addState, removeState []string) (streamPos int64, err error) { err = txn.Stmt(s.insertEventStmt).QueryRow( event.RoomID(), event.EventID(), event.JSON(), pq.StringArray(addState), pq.StringArray(removeState), diff --git a/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go index 6bb6b69c6..e63680aed 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go @@ -85,7 +85,7 @@ func (d *SyncServerDatabase) SetPartitionOffset(topic string, partition int32, o return d.partitions.UpsertPartitionOffset(topic, partition, offset) } -// SyncStreamPosition returns the latest position in the sync stream +// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet. func (d *SyncServerDatabase) SyncStreamPosition() (int64, error) { return d.events.MaxID() } diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go index 9bac41ef4..83370cd66 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go @@ -112,7 +112,8 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons } // OnNewEvent is called when a new event is received from the room server. Must only be -// called from a single goroutine. +// called from a single goroutine, or else the current position in the stream may be +// set incorrectly as it is blindly clobbered. func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncStreamPosition) { // update the current position in a guard and then notify all /sync streams rp.cond.L.Lock()