Comments from review comments

This commit is contained in:
Kegan Dougal 2017-04-10 14:48:36 +01:00
parent 6a544ce34b
commit ee7099f67f
3 changed files with 8 additions and 4 deletions

View file

@ -70,7 +70,7 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
}
// MaxID returns the ID of the last inserted event in this table. This should only ever be used at startup, as it will
// race with inserting events if it is done afterwards.
// race with inserting events if it is done afterwards. If there are no inserted events, 0 is returned.
func (s *outputRoomEventsStatements) MaxID() (id int64, err error) {
var nullableID sql.NullInt64
err = s.selectMaxIDStmt.QueryRow().Scan(&nullableID)
@ -80,6 +80,8 @@ func (s *outputRoomEventsStatements) MaxID() (id int64, err error) {
return
}
// InRange returns all the events in the range between oldPos exclusive and newPos inclusive. Returns an empty array if
// there are no events between the provided range. Returns an error if events are missing in the range.
func (s *outputRoomEventsStatements) InRange(oldPos, newPos int64) ([]gomatrixserverlib.Event, error) {
rows, err := s.selectEventsInRangeStmt.Query(oldPos, newPos)
if err != nil {
@ -108,7 +110,8 @@ func (s *outputRoomEventsStatements) InRange(oldPos, newPos int64) ([]gomatrixse
return result, nil
}
// InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs.
// InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs. Returns the position
// of the inserted event.
func (s *outputRoomEventsStatements) InsertEvent(txn *sql.Tx, event *gomatrixserverlib.Event, addState, removeState []string) (streamPos int64, err error) {
err = txn.Stmt(s.insertEventStmt).QueryRow(
event.RoomID(), event.EventID(), event.JSON(), pq.StringArray(addState), pq.StringArray(removeState),

View file

@ -85,7 +85,7 @@ func (d *SyncServerDatabase) SetPartitionOffset(topic string, partition int32, o
return d.partitions.UpsertPartitionOffset(topic, partition, offset)
}
// SyncStreamPosition returns the latest position in the sync stream
// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet.
func (d *SyncServerDatabase) SyncStreamPosition() (int64, error) {
return d.events.MaxID()
}

View file

@ -112,7 +112,8 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons
}
// OnNewEvent is called when a new event is received from the room server. Must only be
// called from a single goroutine.
// called from a single goroutine, or else the current position in the stream may be
// set incorrectly as it is blindly clobbered.
func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncStreamPosition) {
// update the current position in a guard and then notify all /sync streams
rp.cond.L.Lock()