mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-07 06:53:09 -06:00
Query the max id at startup
This commit is contained in:
parent
2b5c0714d7
commit
a1c2a3e3e1
|
|
@ -64,6 +64,12 @@ func UnknownToken(msg string) *MatrixError {
|
||||||
return &MatrixError{"M_UNKNOWN_TOKEN", msg}
|
return &MatrixError{"M_UNKNOWN_TOKEN", msg}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// InvalidSync is an error when the client tries to hit /sync with an invalid
|
||||||
|
// ?since= parameter.
|
||||||
|
func InvalidSync(msg string) *MatrixError {
|
||||||
|
return &MatrixError{"M_BAD_SYNC", msg}
|
||||||
|
}
|
||||||
|
|
||||||
// LimitExceededError is a rate-limiting error.
|
// LimitExceededError is a rate-limiting error.
|
||||||
type LimitExceededError struct {
|
type LimitExceededError struct {
|
||||||
MatrixError
|
MatrixError
|
||||||
|
|
|
||||||
|
|
@ -49,7 +49,7 @@ func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetupSyncServerListeners configures the given mux with sync-server listeners
|
// SetupSyncServerListeners configures the given mux with sync-server listeners
|
||||||
func SetupSyncServerListeners(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync, srp sync.RequestPool) {
|
func SetupSyncServerListeners(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync, srp *sync.RequestPool) {
|
||||||
apiMux := mux.NewRouter()
|
apiMux := mux.NewRouter()
|
||||||
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
|
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
|
||||||
r0mux.Handle("/sync", make("sync", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse {
|
r0mux.Handle("/sync", make("sync", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse {
|
||||||
|
|
|
||||||
|
|
@ -36,9 +36,13 @@ const insertEventSQL = "" +
|
||||||
const selectEventsSQL = "" +
|
const selectEventsSQL = "" +
|
||||||
"SELECT event_json FROM output_room_events WHERE event_id = ANY($1)"
|
"SELECT event_json FROM output_room_events WHERE event_id = ANY($1)"
|
||||||
|
|
||||||
|
const selectMaxIDSQL = "" +
|
||||||
|
"SELECT MAX(id) FROM output_room_events"
|
||||||
|
|
||||||
type outputRoomEventsStatements struct {
|
type outputRoomEventsStatements struct {
|
||||||
insertEventStmt *sql.Stmt
|
insertEventStmt *sql.Stmt
|
||||||
selectEventsStmt *sql.Stmt
|
selectEventsStmt *sql.Stmt
|
||||||
|
selectMaxIDStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
|
func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
|
||||||
|
|
@ -52,6 +56,16 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
|
||||||
if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil {
|
if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if s.selectMaxIDStmt, err = db.Prepare(selectMaxIDSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// MaxID returns the ID of the last inserted event in this table. This should only ever be used at startup, as it will
|
||||||
|
// race with inserting events if it is done afterwards.
|
||||||
|
func (s *outputRoomEventsStatements) MaxID() (id int64, err error) {
|
||||||
|
err = s.selectMaxIDStmt.QueryRow().Scan(&id)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -85,6 +85,11 @@ func (d *SyncServerDatabase) SetPartitionOffset(topic string, partition int32, o
|
||||||
return d.partitions.UpsertPartitionOffset(topic, partition, offset)
|
return d.partitions.UpsertPartitionOffset(topic, partition, offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SyncStreamPosition returns the latest position in the sync stream
|
||||||
|
func (d *SyncServerDatabase) SyncStreamPosition() (int64, error) {
|
||||||
|
return d.events.MaxID()
|
||||||
|
}
|
||||||
|
|
||||||
func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) {
|
func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) {
|
||||||
txn, err := db.Begin()
|
txn, err := db.Begin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@ import (
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth"
|
"github.com/matrix-org/dendrite/clientapi/auth"
|
||||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
"github.com/matrix-org/dendrite/clientapi/storage"
|
"github.com/matrix-org/dendrite/clientapi/storage"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
|
|
@ -19,13 +20,14 @@ const defaultSyncTimeout = time.Duration(30) * time.Second
|
||||||
type syncRequest struct {
|
type syncRequest struct {
|
||||||
userID string
|
userID string
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
since string
|
since syncStreamPosition
|
||||||
wantFullState bool
|
wantFullState bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// RequestPool manages HTTP long-poll connections for /sync
|
// RequestPool manages HTTP long-poll connections for /sync
|
||||||
type RequestPool struct {
|
type RequestPool struct {
|
||||||
db *storage.SyncServerDatabase
|
db *storage.SyncServerDatabase
|
||||||
|
currPos syncStreamPosition
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
|
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
|
||||||
|
|
@ -38,7 +40,13 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons
|
||||||
if resErr != nil {
|
if resErr != nil {
|
||||||
return *resErr
|
return *resErr
|
||||||
}
|
}
|
||||||
since := req.URL.Query().Get("since")
|
since, err := getSyncStreamPosition(req.URL.Query().Get("since"))
|
||||||
|
if err != nil {
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: 400,
|
||||||
|
JSON: jsonerror.InvalidSync(err.Error()),
|
||||||
|
}
|
||||||
|
}
|
||||||
timeout := getTimeout(req.URL.Query().Get("timeout"))
|
timeout := getTimeout(req.URL.Query().Get("timeout"))
|
||||||
fullState := req.URL.Query().Get("full_state")
|
fullState := req.URL.Query().Get("full_state")
|
||||||
wantFullState := fullState != "" && fullState != "false"
|
wantFullState := fullState != "" && fullState != "false"
|
||||||
|
|
@ -66,15 +74,16 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnNewEvent is called when a new event is received from the room server
|
// OnNewEvent is called when a new event is received from the room server
|
||||||
func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, syncStreamPos int64) {
|
func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncStreamPosition) {
|
||||||
fmt.Println("OnNewEvent =>", ev.EventID(), syncStreamPos)
|
fmt.Println("OnNewEvent =>", ev.EventID(), " pos=", pos, " old_pos=", rp.currPos)
|
||||||
|
rp.currPos = pos
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RequestPool) currentSyncForUser(req syncRequest) ([]gomatrixserverlib.Event, error) {
|
func (rp *RequestPool) currentSyncForUser(req syncRequest) ([]gomatrixserverlib.Event, error) {
|
||||||
// https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L179
|
// https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L179
|
||||||
// Check if we are going to return immediately and if so, calculate the current
|
// Check if we are going to return immediately and if so, calculate the current
|
||||||
// sync for this user and return.
|
// sync for this user and return.
|
||||||
if req.since == "" || req.timeout == time.Duration(0) || req.wantFullState {
|
if req.since == 0 || req.timeout == time.Duration(0) || req.wantFullState {
|
||||||
return []gomatrixserverlib.Event{}, nil
|
return []gomatrixserverlib.Event{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -96,7 +105,22 @@ func getTimeout(timeoutMS string) time.Duration {
|
||||||
return time.Duration(i) * time.Millisecond
|
return time.Duration(i) * time.Millisecond
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRequestPool makes a new RequestPool
|
func getSyncStreamPosition(since string) (syncStreamPosition, error) {
|
||||||
func NewRequestPool(db *storage.SyncServerDatabase) RequestPool {
|
if since == "" {
|
||||||
return RequestPool{db}
|
return syncStreamPosition(0), nil
|
||||||
|
}
|
||||||
|
i, err := strconv.Atoi(since)
|
||||||
|
if err != nil {
|
||||||
|
return syncStreamPosition(0), err
|
||||||
|
}
|
||||||
|
return syncStreamPosition(i), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRequestPool makes a new RequestPool
|
||||||
|
func NewRequestPool(db *storage.SyncServerDatabase) (*RequestPool, error) {
|
||||||
|
pos, err := db.SyncStreamPosition()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &RequestPool{db, syncStreamPosition(pos)}, nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -12,15 +12,18 @@ import (
|
||||||
sarama "gopkg.in/Shopify/sarama.v1"
|
sarama "gopkg.in/Shopify/sarama.v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// syncStreamPosition represents the offset in the sync stream a client is at.
|
||||||
|
type syncStreamPosition int64
|
||||||
|
|
||||||
// Server contains all the logic for running a sync server
|
// Server contains all the logic for running a sync server
|
||||||
type Server struct {
|
type Server struct {
|
||||||
roomServerConsumer *common.ContinualConsumer
|
roomServerConsumer *common.ContinualConsumer
|
||||||
db *storage.SyncServerDatabase
|
db *storage.SyncServerDatabase
|
||||||
rp RequestPool
|
rp *RequestPool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServer creates a new sync server. Call Start() to begin consuming from room servers.
|
// NewServer creates a new sync server. Call Start() to begin consuming from room servers.
|
||||||
func NewServer(cfg *config.Sync, rp RequestPool, store *storage.SyncServerDatabase) (*Server, error) {
|
func NewServer(cfg *config.Sync, rp *RequestPool, store *storage.SyncServerDatabase) (*Server, error) {
|
||||||
kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil)
|
kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -77,7 +80,7 @@ func (s *Server) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
}).Panicf("roomserver output log: write event failure")
|
}).Panicf("roomserver output log: write event failure")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
s.rp.OnNewEvent(&ev, syncStreamPos)
|
s.rp.OnNewEvent(&ev, syncStreamPosition(syncStreamPos))
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -74,7 +74,10 @@ func main() {
|
||||||
log.Panicf("startup: failed to create sync server database with data source %s : %s", cfg.DataSource, err)
|
log.Panicf("startup: failed to create sync server database with data source %s : %s", cfg.DataSource, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
rp := sync.NewRequestPool(db)
|
rp, err := sync.NewRequestPool(db)
|
||||||
|
if err != nil {
|
||||||
|
log.Panicf("startup: Failed to create request pool : %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
server, err := sync.NewServer(cfg, rp, db)
|
server, err := sync.NewServer(cfg, rp, db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue