mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-07 15:03:09 -06:00
Review comments
This commit is contained in:
parent
ee7099f67f
commit
13f8237c54
|
|
@ -2,6 +2,7 @@ package jsonerror
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -64,12 +65,6 @@ func UnknownToken(msg string) *MatrixError {
|
||||||
return &MatrixError{"M_UNKNOWN_TOKEN", msg}
|
return &MatrixError{"M_UNKNOWN_TOKEN", msg}
|
||||||
}
|
}
|
||||||
|
|
||||||
// InvalidSync is an error when the client tries to hit /sync with an invalid
|
|
||||||
// ?since= parameter.
|
|
||||||
func InvalidSync(msg string) *MatrixError {
|
|
||||||
return &MatrixError{"M_BAD_SYNC", msg}
|
|
||||||
}
|
|
||||||
|
|
||||||
// LimitExceededError is a rate-limiting error.
|
// LimitExceededError is a rate-limiting error.
|
||||||
type LimitExceededError struct {
|
type LimitExceededError struct {
|
||||||
MatrixError
|
MatrixError
|
||||||
|
|
|
||||||
|
|
@ -56,7 +56,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: 400,
|
Code: 400,
|
||||||
JSON: jsonerror.InvalidSync(err.Error()),
|
JSON: jsonerror.Unknown(err.Error()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
timeout := getTimeout(req.URL.Query().Get("timeout"))
|
timeout := getTimeout(req.URL.Query().Get("timeout"))
|
||||||
|
|
@ -112,8 +112,8 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnNewEvent is called when a new event is received from the room server. Must only be
|
// OnNewEvent is called when a new event is received from the room server. Must only be
|
||||||
// called from a single goroutine, or else the current position in the stream may be
|
// called from a single goroutine, to avoid races between updates which could set the
|
||||||
// set incorrectly as it is blindly clobbered.
|
// current position in the stream incorrectly.
|
||||||
func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncStreamPosition) {
|
func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncStreamPosition) {
|
||||||
// update the current position in a guard and then notify all /sync streams
|
// update the current position in a guard and then notify all /sync streams
|
||||||
rp.cond.L.Lock()
|
rp.cond.L.Lock()
|
||||||
|
|
|
||||||
|
|
@ -67,8 +67,8 @@ func (c *ContinualConsumer) Start() error {
|
||||||
}
|
}
|
||||||
for _, offset := range storedOffsets {
|
for _, offset := range storedOffsets {
|
||||||
// We've already processed events from this partition so advance the offset to where we got to.
|
// We've already processed events from this partition so advance the offset to where we got to.
|
||||||
// Offsets are provided with each message, so if we use the same offset on startup then we'll
|
// ConsumePartition will start streaming from the message with the given offset (inclusive),
|
||||||
// get the same message a 2nd time, so increment 1 to indicate the next offset.
|
// so increment 1 to avoid getting the same message a second time.
|
||||||
offsets[offset.Partition] = 1 + offset.Offset
|
offsets[offset.Partition] = 1 + offset.Offset
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue