From a1c2a3e3e1b51be7fc2eae8eb470bd688be93618 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 6 Apr 2017 14:59:14 +0100 Subject: [PATCH] Query the max id at startup --- .../dendrite/clientapi/jsonerror/jsonerror.go | 6 +++ .../dendrite/clientapi/routing/routing.go | 2 +- .../storage/output_room_events_table.go | 14 +++++++ .../dendrite/clientapi/storage/syncserver.go | 5 +++ .../dendrite/clientapi/sync/requestpool.go | 42 +++++++++++++++---- .../dendrite/clientapi/sync/syncserver.go | 9 ++-- .../dendrite/cmd/dendrite-sync-server/main.go | 5 ++- 7 files changed, 69 insertions(+), 14 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/clientapi/jsonerror/jsonerror.go b/src/github.com/matrix-org/dendrite/clientapi/jsonerror/jsonerror.go index ea64896db..b50759a6c 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/jsonerror/jsonerror.go +++ b/src/github.com/matrix-org/dendrite/clientapi/jsonerror/jsonerror.go @@ -64,6 +64,12 @@ func UnknownToken(msg string) *MatrixError { return &MatrixError{"M_UNKNOWN_TOKEN", msg} } +// InvalidSync is an error when the client tries to hit /sync with an invalid +// ?since= parameter. +func InvalidSync(msg string) *MatrixError { + return &MatrixError{"M_BAD_SYNC", msg} +} + // LimitExceededError is a rate-limiting error. type LimitExceededError struct { MatrixError diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go index 9679fab63..afb1b6a83 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go @@ -49,7 +49,7 @@ func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI } // SetupSyncServerListeners configures the given mux with sync-server listeners -func SetupSyncServerListeners(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync, srp sync.RequestPool) { +func SetupSyncServerListeners(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync, srp *sync.RequestPool) { apiMux := mux.NewRouter() r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter() r0mux.Handle("/sync", make("sync", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { diff --git a/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go index 2837f6884..bf799f609 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go @@ -36,9 +36,13 @@ const insertEventSQL = "" + const selectEventsSQL = "" + "SELECT event_json FROM output_room_events WHERE event_id = ANY($1)" +const selectMaxIDSQL = "" + + "SELECT MAX(id) FROM output_room_events" + type outputRoomEventsStatements struct { insertEventStmt *sql.Stmt selectEventsStmt *sql.Stmt + selectMaxIDStmt *sql.Stmt } func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { @@ -52,6 +56,16 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil { return } + if s.selectMaxIDStmt, err = db.Prepare(selectMaxIDSQL); err != nil { + return + } + return +} + +// MaxID returns the ID of the last inserted event in this table. This should only ever be used at startup, as it will +// race with inserting events if it is done afterwards. +func (s *outputRoomEventsStatements) MaxID() (id int64, err error) { + err = s.selectMaxIDStmt.QueryRow().Scan(&id) return } diff --git a/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go index 9b74ae033..c9bff724b 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go @@ -85,6 +85,11 @@ func (d *SyncServerDatabase) SetPartitionOffset(topic string, partition int32, o return d.partitions.UpsertPartitionOffset(topic, partition, offset) } +// SyncStreamPosition returns the latest position in the sync stream +func (d *SyncServerDatabase) SyncStreamPosition() (int64, error) { + return d.events.MaxID() +} + func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) { txn, err := db.Begin() if err != nil { diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go index aefa179a3..155840af6 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go @@ -9,6 +9,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/matrix-org/dendrite/clientapi/auth" "github.com/matrix-org/dendrite/clientapi/httputil" + "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/storage" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -19,13 +20,14 @@ const defaultSyncTimeout = time.Duration(30) * time.Second type syncRequest struct { userID string timeout time.Duration - since string + since syncStreamPosition wantFullState bool } // RequestPool manages HTTP long-poll connections for /sync type RequestPool struct { - db *storage.SyncServerDatabase + db *storage.SyncServerDatabase + currPos syncStreamPosition } // OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be @@ -38,7 +40,13 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons if resErr != nil { return *resErr } - since := req.URL.Query().Get("since") + since, err := getSyncStreamPosition(req.URL.Query().Get("since")) + if err != nil { + return util.JSONResponse{ + Code: 400, + JSON: jsonerror.InvalidSync(err.Error()), + } + } timeout := getTimeout(req.URL.Query().Get("timeout")) fullState := req.URL.Query().Get("full_state") wantFullState := fullState != "" && fullState != "false" @@ -66,15 +74,16 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons } // OnNewEvent is called when a new event is received from the room server -func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, syncStreamPos int64) { - fmt.Println("OnNewEvent =>", ev.EventID(), syncStreamPos) +func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncStreamPosition) { + fmt.Println("OnNewEvent =>", ev.EventID(), " pos=", pos, " old_pos=", rp.currPos) + rp.currPos = pos } func (rp *RequestPool) currentSyncForUser(req syncRequest) ([]gomatrixserverlib.Event, error) { // https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L179 // Check if we are going to return immediately and if so, calculate the current // sync for this user and return. - if req.since == "" || req.timeout == time.Duration(0) || req.wantFullState { + if req.since == 0 || req.timeout == time.Duration(0) || req.wantFullState { return []gomatrixserverlib.Event{}, nil } @@ -96,7 +105,22 @@ func getTimeout(timeoutMS string) time.Duration { return time.Duration(i) * time.Millisecond } -// NewRequestPool makes a new RequestPool -func NewRequestPool(db *storage.SyncServerDatabase) RequestPool { - return RequestPool{db} +func getSyncStreamPosition(since string) (syncStreamPosition, error) { + if since == "" { + return syncStreamPosition(0), nil + } + i, err := strconv.Atoi(since) + if err != nil { + return syncStreamPosition(0), err + } + return syncStreamPosition(i), nil +} + +// NewRequestPool makes a new RequestPool +func NewRequestPool(db *storage.SyncServerDatabase) (*RequestPool, error) { + pos, err := db.SyncStreamPosition() + if err != nil { + return nil, err + } + return &RequestPool{db, syncStreamPosition(pos)}, nil } diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go index 359b0a50a..9095a9d67 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go @@ -12,15 +12,18 @@ import ( sarama "gopkg.in/Shopify/sarama.v1" ) +// syncStreamPosition represents the offset in the sync stream a client is at. +type syncStreamPosition int64 + // Server contains all the logic for running a sync server type Server struct { roomServerConsumer *common.ContinualConsumer db *storage.SyncServerDatabase - rp RequestPool + rp *RequestPool } // NewServer creates a new sync server. Call Start() to begin consuming from room servers. -func NewServer(cfg *config.Sync, rp RequestPool, store *storage.SyncServerDatabase) (*Server, error) { +func NewServer(cfg *config.Sync, rp *RequestPool, store *storage.SyncServerDatabase) (*Server, error) { kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil) if err != nil { return nil, err @@ -77,7 +80,7 @@ func (s *Server) onMessage(msg *sarama.ConsumerMessage) error { }).Panicf("roomserver output log: write event failure") return nil } - s.rp.OnNewEvent(&ev, syncStreamPos) + s.rp.OnNewEvent(&ev, syncStreamPosition(syncStreamPos)) return nil } diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go index e04797c28..106f8bf5a 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go @@ -74,7 +74,10 @@ func main() { log.Panicf("startup: failed to create sync server database with data source %s : %s", cfg.DataSource, err) } - rp := sync.NewRequestPool(db) + rp, err := sync.NewRequestPool(db) + if err != nil { + log.Panicf("startup: Failed to create request pool : %s", err) + } server, err := sync.NewServer(cfg, rp, db) if err != nil {