From 405b0914af7ad6e5ebafab1242a0e472fa6162fa Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 10 Apr 2017 16:17:25 +0100 Subject: [PATCH] Move syncStreamPosition to a syncapi package --- .../dendrite/clientapi/storage/syncserver.go | 14 +++++++++---- .../dendrite/clientapi/sync/requestpool.go | 21 ++++++++++--------- .../clientapi/sync/syncapi/syncapi.go | 4 ++++ .../dendrite/clientapi/sync/syncserver.go | 6 ++---- 4 files changed, 27 insertions(+), 18 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/clientapi/sync/syncapi/syncapi.go diff --git a/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go index e63680aed..b00b508fe 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go @@ -4,6 +4,7 @@ import ( "database/sql" // Import the postgres database driver. _ "github.com/lib/pq" + "github.com/matrix-org/dendrite/clientapi/sync/syncapi" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/gomatrixserverlib" ) @@ -41,13 +42,14 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) { // WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races // when generating the stream position for this event. Returns the sync stream position for the inserted event. // Returns an error if there was a problem inserting this event. -func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string) (streamPos int64, returnErr error) { +func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string) (streamPos syncapi.StreamPosition, returnErr error) { returnErr = runTransaction(d.db, func(txn *sql.Tx) error { var err error - streamPos, err = d.events.InsertEvent(txn, ev, addStateEventIDs, removeStateEventIDs) + pos, err := d.events.InsertEvent(txn, ev, addStateEventIDs, removeStateEventIDs) if err != nil { return err } + streamPos = syncapi.StreamPosition(pos) if len(addStateEventIDs) == 0 && len(removeStateEventIDs) == 0 { // Nothing to do, the event may have just been a message event. @@ -86,8 +88,12 @@ func (d *SyncServerDatabase) SetPartitionOffset(topic string, partition int32, o } // SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet. -func (d *SyncServerDatabase) SyncStreamPosition() (int64, error) { - return d.events.MaxID() +func (d *SyncServerDatabase) SyncStreamPosition() (syncapi.StreamPosition, error) { + id, err := d.events.MaxID() + if err != nil { + return syncapi.StreamPosition(0), err + } + return syncapi.StreamPosition(id), nil } // EventsInRange returns all events in the given range, exclusive of oldPos, inclusive of newPos. diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go index 4b4adc517..90b142b22 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go @@ -11,6 +11,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/storage" + "github.com/matrix-org/dendrite/clientapi/sync/syncapi" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) @@ -20,7 +21,7 @@ const defaultSyncTimeout = time.Duration(30) * time.Second type syncRequest struct { userID string timeout time.Duration - since syncStreamPosition + since syncapi.StreamPosition wantFullState bool } @@ -28,7 +29,7 @@ type syncRequest struct { type RequestPool struct { db *storage.SyncServerDatabase // The latest sync stream position: guarded by 'cond'. - currPos syncStreamPosition + currPos syncapi.StreamPosition // A condition variable to notify all waiting goroutines of a new sync stream position cond *sync.Cond } @@ -39,7 +40,7 @@ func NewRequestPool(db *storage.SyncServerDatabase) (*RequestPool, error) { if err != nil { return nil, err } - return &RequestPool{db, syncStreamPosition(pos), sync.NewCond(&sync.Mutex{})}, nil + return &RequestPool{db, syncapi.StreamPosition(pos), sync.NewCond(&sync.Mutex{})}, nil } // OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be @@ -114,7 +115,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons // OnNewEvent is called when a new event is received from the room server. Must only be // called from a single goroutine, to avoid races between updates which could set the // current position in the stream incorrectly. -func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncStreamPosition) { +func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncapi.StreamPosition) { // update the current position in a guard and then notify all /sync streams rp.cond.L.Lock() rp.currPos = pos @@ -123,7 +124,7 @@ func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncStreamPos rp.cond.Broadcast() // notify ALL waiting goroutines } -func (rp *RequestPool) waitForEvents(req syncRequest) syncStreamPosition { +func (rp *RequestPool) waitForEvents(req syncRequest) syncapi.StreamPosition { // In a guard, check if the /sync request should block, and block it until we get a new position rp.cond.L.Lock() currentPos := rp.currPos @@ -149,7 +150,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest) ([]gomatrixserverlib. // a) Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins). // If it is, then we need to send the full room state down (and 'limited' is always true). // b) Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block. - // c) Check if the user is CURRENTLY left/banned. If so, add room to 'archived' block. // This has a TODO: How do we handle ban -> leave in same batch? + // c) Check if the user is CURRENTLY left/banned. If so, add room to 'archived' block. // Synapse has a TODO: How do we handle ban -> leave in same batch? // 4) Add joined rooms (joined room list) return rp.db.EventsInRange(int64(req.since), int64(currentPos)) @@ -166,13 +167,13 @@ func getTimeout(timeoutMS string) time.Duration { return time.Duration(i) * time.Millisecond } -func getSyncStreamPosition(since string) (syncStreamPosition, error) { +func getSyncStreamPosition(since string) (syncapi.StreamPosition, error) { if since == "" { - return syncStreamPosition(0), nil + return syncapi.StreamPosition(0), nil } i, err := strconv.Atoi(since) if err != nil { - return syncStreamPosition(0), err + return syncapi.StreamPosition(0), err } - return syncStreamPosition(i), nil + return syncapi.StreamPosition(i), nil } diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/syncapi/syncapi.go b/src/github.com/matrix-org/dendrite/clientapi/sync/syncapi/syncapi.go new file mode 100644 index 000000000..6dd0a9176 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/syncapi/syncapi.go @@ -0,0 +1,4 @@ +package syncapi + +// StreamPosition represents the offset in the sync stream a client is at. +type StreamPosition int64 diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go index 892c163dd..142b6f117 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go @@ -6,15 +6,13 @@ import ( log "github.com/Sirupsen/logrus" "github.com/matrix-org/dendrite/clientapi/config" "github.com/matrix-org/dendrite/clientapi/storage" + "github.com/matrix-org/dendrite/clientapi/sync/syncapi" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" sarama "gopkg.in/Shopify/sarama.v1" ) -// syncStreamPosition represents the offset in the sync stream a client is at. -type syncStreamPosition int64 - // Server contains all the logic for running a sync server type Server struct { roomServerConsumer *common.ContinualConsumer @@ -83,7 +81,7 @@ func (s *Server) onMessage(msg *sarama.ConsumerMessage) error { }).Panicf("roomserver output log: write event failure") return nil } - s.rp.OnNewEvent(&ev, syncStreamPosition(syncStreamPos)) + s.rp.OnNewEvent(&ev, syncapi.StreamPosition(syncStreamPos)) return nil }