diff --git a/src/github.com/matrix-org/dendrite/clientapi/jsonerror/jsonerror.go b/src/github.com/matrix-org/dendrite/clientapi/jsonerror/jsonerror.go index b50759a6c..0cc432a97 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/jsonerror/jsonerror.go +++ b/src/github.com/matrix-org/dendrite/clientapi/jsonerror/jsonerror.go @@ -2,6 +2,7 @@ package jsonerror import ( "fmt" + "github.com/matrix-org/util" ) @@ -64,12 +65,6 @@ func UnknownToken(msg string) *MatrixError { return &MatrixError{"M_UNKNOWN_TOKEN", msg} } -// InvalidSync is an error when the client tries to hit /sync with an invalid -// ?since= parameter. -func InvalidSync(msg string) *MatrixError { - return &MatrixError{"M_BAD_SYNC", msg} -} - // LimitExceededError is a rate-limiting error. type LimitExceededError struct { MatrixError diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go index 83370cd66..051e86636 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go @@ -56,7 +56,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons if err != nil { return util.JSONResponse{ Code: 400, - JSON: jsonerror.InvalidSync(err.Error()), + JSON: jsonerror.Unknown(err.Error()), } } timeout := getTimeout(req.URL.Query().Get("timeout")) @@ -112,8 +112,8 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons } // OnNewEvent is called when a new event is received from the room server. Must only be -// called from a single goroutine, or else the current position in the stream may be -// set incorrectly as it is blindly clobbered. +// called from a single goroutine, to avoid races between updates which could set the +// current position in the stream incorrectly. func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncStreamPosition) { // update the current position in a guard and then notify all /sync streams rp.cond.L.Lock() diff --git a/src/github.com/matrix-org/dendrite/common/consumers.go b/src/github.com/matrix-org/dendrite/common/consumers.go index bf7a64145..caeeabca4 100644 --- a/src/github.com/matrix-org/dendrite/common/consumers.go +++ b/src/github.com/matrix-org/dendrite/common/consumers.go @@ -67,8 +67,8 @@ func (c *ContinualConsumer) Start() error { } for _, offset := range storedOffsets { // We've already processed events from this partition so advance the offset to where we got to. - // Offsets are provided with each message, so if we use the same offset on startup then we'll - // get the same message a 2nd time, so increment 1 to indicate the next offset. + // ConsumePartition will start streaming from the message with the given offset (inclusive), + // so increment 1 to avoid getting the same message a second time. offsets[offset.Partition] = 1 + offset.Offset }