Move syncStreamPosition to a syncapi package

This commit is contained in:
Kegan Dougal 2017-04-10 16:17:25 +01:00
parent d948f820fa
commit 405b0914af
4 changed files with 27 additions and 18 deletions

View file

@ -4,6 +4,7 @@ import (
"database/sql" "database/sql"
// Import the postgres database driver. // Import the postgres database driver.
_ "github.com/lib/pq" _ "github.com/lib/pq"
"github.com/matrix-org/dendrite/clientapi/sync/syncapi"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -41,13 +42,14 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) {
// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races // WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
// when generating the stream position for this event. Returns the sync stream position for the inserted event. // when generating the stream position for this event. Returns the sync stream position for the inserted event.
// Returns an error if there was a problem inserting this event. // Returns an error if there was a problem inserting this event.
func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string) (streamPos int64, returnErr error) { func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string) (streamPos syncapi.StreamPosition, returnErr error) {
returnErr = runTransaction(d.db, func(txn *sql.Tx) error { returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
var err error var err error
streamPos, err = d.events.InsertEvent(txn, ev, addStateEventIDs, removeStateEventIDs) pos, err := d.events.InsertEvent(txn, ev, addStateEventIDs, removeStateEventIDs)
if err != nil { if err != nil {
return err return err
} }
streamPos = syncapi.StreamPosition(pos)
if len(addStateEventIDs) == 0 && len(removeStateEventIDs) == 0 { if len(addStateEventIDs) == 0 && len(removeStateEventIDs) == 0 {
// Nothing to do, the event may have just been a message event. // Nothing to do, the event may have just been a message event.
@ -86,8 +88,12 @@ func (d *SyncServerDatabase) SetPartitionOffset(topic string, partition int32, o
} }
// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet. // SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet.
func (d *SyncServerDatabase) SyncStreamPosition() (int64, error) { func (d *SyncServerDatabase) SyncStreamPosition() (syncapi.StreamPosition, error) {
return d.events.MaxID() id, err := d.events.MaxID()
if err != nil {
return syncapi.StreamPosition(0), err
}
return syncapi.StreamPosition(id), nil
} }
// EventsInRange returns all events in the given range, exclusive of oldPos, inclusive of newPos. // EventsInRange returns all events in the given range, exclusive of oldPos, inclusive of newPos.

View file

@ -11,6 +11,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/storage" "github.com/matrix-org/dendrite/clientapi/storage"
"github.com/matrix-org/dendrite/clientapi/sync/syncapi"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
) )
@ -20,7 +21,7 @@ const defaultSyncTimeout = time.Duration(30) * time.Second
type syncRequest struct { type syncRequest struct {
userID string userID string
timeout time.Duration timeout time.Duration
since syncStreamPosition since syncapi.StreamPosition
wantFullState bool wantFullState bool
} }
@ -28,7 +29,7 @@ type syncRequest struct {
type RequestPool struct { type RequestPool struct {
db *storage.SyncServerDatabase db *storage.SyncServerDatabase
// The latest sync stream position: guarded by 'cond'. // The latest sync stream position: guarded by 'cond'.
currPos syncStreamPosition currPos syncapi.StreamPosition
// A condition variable to notify all waiting goroutines of a new sync stream position // A condition variable to notify all waiting goroutines of a new sync stream position
cond *sync.Cond cond *sync.Cond
} }
@ -39,7 +40,7 @@ func NewRequestPool(db *storage.SyncServerDatabase) (*RequestPool, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &RequestPool{db, syncStreamPosition(pos), sync.NewCond(&sync.Mutex{})}, nil return &RequestPool{db, syncapi.StreamPosition(pos), sync.NewCond(&sync.Mutex{})}, nil
} }
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be // OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
@ -114,7 +115,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons
// OnNewEvent is called when a new event is received from the room server. Must only be // OnNewEvent is called when a new event is received from the room server. Must only be
// called from a single goroutine, to avoid races between updates which could set the // called from a single goroutine, to avoid races between updates which could set the
// current position in the stream incorrectly. // current position in the stream incorrectly.
func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncStreamPosition) { func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncapi.StreamPosition) {
// update the current position in a guard and then notify all /sync streams // update the current position in a guard and then notify all /sync streams
rp.cond.L.Lock() rp.cond.L.Lock()
rp.currPos = pos rp.currPos = pos
@ -123,7 +124,7 @@ func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncStreamPos
rp.cond.Broadcast() // notify ALL waiting goroutines rp.cond.Broadcast() // notify ALL waiting goroutines
} }
func (rp *RequestPool) waitForEvents(req syncRequest) syncStreamPosition { func (rp *RequestPool) waitForEvents(req syncRequest) syncapi.StreamPosition {
// In a guard, check if the /sync request should block, and block it until we get a new position // In a guard, check if the /sync request should block, and block it until we get a new position
rp.cond.L.Lock() rp.cond.L.Lock()
currentPos := rp.currPos currentPos := rp.currPos
@ -149,7 +150,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest) ([]gomatrixserverlib.
// a) Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins). // a) Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins).
// If it is, then we need to send the full room state down (and 'limited' is always true). // If it is, then we need to send the full room state down (and 'limited' is always true).
// b) Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block. // b) Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
// c) Check if the user is CURRENTLY left/banned. If so, add room to 'archived' block. // This has a TODO: How do we handle ban -> leave in same batch? // c) Check if the user is CURRENTLY left/banned. If so, add room to 'archived' block. // Synapse has a TODO: How do we handle ban -> leave in same batch?
// 4) Add joined rooms (joined room list) // 4) Add joined rooms (joined room list)
return rp.db.EventsInRange(int64(req.since), int64(currentPos)) return rp.db.EventsInRange(int64(req.since), int64(currentPos))
@ -166,13 +167,13 @@ func getTimeout(timeoutMS string) time.Duration {
return time.Duration(i) * time.Millisecond return time.Duration(i) * time.Millisecond
} }
func getSyncStreamPosition(since string) (syncStreamPosition, error) { func getSyncStreamPosition(since string) (syncapi.StreamPosition, error) {
if since == "" { if since == "" {
return syncStreamPosition(0), nil return syncapi.StreamPosition(0), nil
} }
i, err := strconv.Atoi(since) i, err := strconv.Atoi(since)
if err != nil { if err != nil {
return syncStreamPosition(0), err return syncapi.StreamPosition(0), err
} }
return syncStreamPosition(i), nil return syncapi.StreamPosition(i), nil
} }

View file

@ -0,0 +1,4 @@
package syncapi
// StreamPosition represents the offset in the sync stream a client is at.
type StreamPosition int64

View file

@ -6,15 +6,13 @@ import (
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/config" "github.com/matrix-org/dendrite/clientapi/config"
"github.com/matrix-org/dendrite/clientapi/storage" "github.com/matrix-org/dendrite/clientapi/storage"
"github.com/matrix-org/dendrite/clientapi/sync/syncapi"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
sarama "gopkg.in/Shopify/sarama.v1" sarama "gopkg.in/Shopify/sarama.v1"
) )
// syncStreamPosition represents the offset in the sync stream a client is at.
type syncStreamPosition int64
// Server contains all the logic for running a sync server // Server contains all the logic for running a sync server
type Server struct { type Server struct {
roomServerConsumer *common.ContinualConsumer roomServerConsumer *common.ContinualConsumer
@ -83,7 +81,7 @@ func (s *Server) onMessage(msg *sarama.ConsumerMessage) error {
}).Panicf("roomserver output log: write event failure") }).Panicf("roomserver output log: write event failure")
return nil return nil
} }
s.rp.OnNewEvent(&ev, syncStreamPosition(syncStreamPos)) s.rp.OnNewEvent(&ev, syncapi.StreamPosition(syncStreamPos))
return nil return nil
} }