From aa0126f6072aa26e85ae80d40d04efbbf21c6db0 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 6 Jan 2021 13:43:28 +0000 Subject: [PATCH] Dedupe a bit, add a template for the invite stream --- syncapi/storage/interface.go | 1 + syncapi/storage/shared/stream.go | 31 ++++++++ syncapi/storage/shared/stream_invite.go | 94 ++++++++++++++++++++++++ syncapi/storage/shared/stream_pdu.go | 35 +++------ syncapi/storage/shared/stream_receipt.go | 35 +++------ syncapi/storage/shared/stream_typing.go | 35 +++------ syncapi/storage/shared/syncserver.go | 13 +++- syncapi/sync/requestpool.go | 16 ++-- 8 files changed, 173 insertions(+), 87 deletions(-) create mode 100644 syncapi/storage/shared/stream.go create mode 100644 syncapi/storage/shared/stream_invite.go diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index adf534cf4..11567a9d0 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -35,6 +35,7 @@ type Database interface { PDUTopology() types.TopologyProvider TypingStream() types.StreamProvider ReceiptStream() types.StreamProvider + InviteStream() types.StreamProvider // AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) diff --git a/syncapi/storage/shared/stream.go b/syncapi/storage/shared/stream.go new file mode 100644 index 000000000..92d8fcb2a --- /dev/null +++ b/syncapi/storage/shared/stream.go @@ -0,0 +1,31 @@ +package shared + +import ( + "sync" + + "github.com/matrix-org/dendrite/syncapi/types" +) + +type StreamProvider struct { + DB *Database + latest types.StreamPosition + latestMutex sync.RWMutex + update *sync.Cond +} + +func (p *StreamProvider) StreamSetup() { + locker := &sync.Mutex{} + p.update = sync.NewCond(locker) +} + +func (p *StreamProvider) StreamAdvance( + latest types.StreamPosition, +) { + p.latestMutex.Lock() + defer p.latestMutex.Unlock() + + if latest > p.latest { + p.latest = latest + p.update.Broadcast() + } +} diff --git a/syncapi/storage/shared/stream_invite.go b/syncapi/storage/shared/stream_invite.go new file mode 100644 index 000000000..f4261bf7f --- /dev/null +++ b/syncapi/storage/shared/stream_invite.go @@ -0,0 +1,94 @@ +package shared + +import ( + "context" + + "github.com/matrix-org/dendrite/syncapi/types" +) + +type InviteStreamProvider struct { + StreamProvider +} + +func (p *InviteStreamProvider) StreamSetup() { + p.StreamProvider.StreamSetup() + + p.latestMutex.Lock() + defer p.latestMutex.Unlock() + + p.latest = 0 +} + +func (p *InviteStreamProvider) StreamLatestPosition( + ctx context.Context, +) types.StreamingToken { + p.latestMutex.RLock() + defer p.latestMutex.RUnlock() + + return types.StreamingToken{ + InvitePosition: p.latest, + } +} + +// nolint:gocyclo +func (p *InviteStreamProvider) StreamRange( + ctx context.Context, + req *types.StreamRangeRequest, + from, to types.StreamingToken, +) (newPos types.StreamingToken) { + + return types.StreamingToken{ + InvitePosition: 0, + } +} + +func (p *InviteStreamProvider) StreamNotifyAfter( + ctx context.Context, + from types.StreamingToken, +) chan struct{} { + ch := make(chan struct{}) + + check := func() bool { + p.latestMutex.RLock() + defer p.latestMutex.RUnlock() + if p.latest > from.InvitePosition { + close(ch) + return true + } + return false + } + + // If we've already advanced past the specified position + // then return straight away. + if check() { + return ch + } + + // If we haven't, then we'll subscribe to updates. The + // sync.Cond will fire every time the latest position + // updates, so we can check and see if we've advanced + // past it. + go func(p *InviteStreamProvider) { + p.update.L.Lock() + defer p.update.L.Unlock() + + for { + select { + case <-ctx.Done(): + // The context has expired, so there's no point + // in continuing to wait for the update. + return + default: + // The latest position has been advanced. Let's + // see if it's advanced to the position we care + // about. If it has then we'll return. + p.update.Wait() + if check() { + return + } + } + } + }(p) + + return ch +} diff --git a/syncapi/storage/shared/stream_pdu.go b/syncapi/storage/shared/stream_pdu.go index 08d7bbe59..19e95399d 100644 --- a/syncapi/storage/shared/stream_pdu.go +++ b/syncapi/storage/shared/stream_pdu.go @@ -2,22 +2,17 @@ package shared import ( "context" - "sync" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" ) type PDUStreamProvider struct { - DB *Database - latest types.StreamPosition - latestMutex sync.RWMutex - update *sync.Cond + StreamProvider } func (p *PDUStreamProvider) StreamSetup() { - locker := &sync.Mutex{} - p.update = sync.NewCond(locker) + p.StreamProvider.StreamSetup() p.latestMutex.Lock() defer p.latestMutex.Unlock() @@ -29,15 +24,14 @@ func (p *PDUStreamProvider) StreamSetup() { p.latest = types.StreamPosition(id) } -func (p *PDUStreamProvider) StreamAdvance( - latest types.StreamPosition, -) { - p.latestMutex.Lock() - defer p.latestMutex.Unlock() +func (p *PDUStreamProvider) StreamLatestPosition( + ctx context.Context, +) types.StreamingToken { + p.latestMutex.RLock() + defer p.latestMutex.RUnlock() - if latest > p.latest { - p.latest = latest - p.update.Broadcast() + return types.StreamingToken{ + PDUPosition: p.latest, } } @@ -179,14 +173,3 @@ func (p *PDUStreamProvider) StreamNotifyAfter( return ch } - -func (p *PDUStreamProvider) StreamLatestPosition( - ctx context.Context, -) types.StreamingToken { - p.latestMutex.RLock() - defer p.latestMutex.RUnlock() - - return types.StreamingToken{ - PDUPosition: p.latest, - } -} diff --git a/syncapi/storage/shared/stream_receipt.go b/syncapi/storage/shared/stream_receipt.go index 6c5cb9502..be3b59416 100644 --- a/syncapi/storage/shared/stream_receipt.go +++ b/syncapi/storage/shared/stream_receipt.go @@ -3,7 +3,6 @@ package shared import ( "context" "encoding/json" - "sync" eduAPI "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/syncapi/types" @@ -11,15 +10,11 @@ import ( ) type ReceiptStreamProvider struct { - DB *Database - latest types.StreamPosition - latestMutex sync.RWMutex - update *sync.Cond + StreamProvider } func (p *ReceiptStreamProvider) StreamSetup() { - locker := &sync.Mutex{} - p.update = sync.NewCond(locker) + p.StreamProvider.StreamSetup() latest, err := p.DB.Receipts.SelectMaxReceiptID(context.Background(), nil) if err != nil { @@ -29,15 +24,14 @@ func (p *ReceiptStreamProvider) StreamSetup() { p.latest = types.StreamPosition(latest) } -func (p *ReceiptStreamProvider) StreamAdvance( - latest types.StreamPosition, -) { - p.latestMutex.Lock() - defer p.latestMutex.Unlock() +func (p *ReceiptStreamProvider) StreamLatestPosition( + ctx context.Context, +) types.StreamingToken { + p.latestMutex.RLock() + defer p.latestMutex.RUnlock() - if latest > p.latest { - p.latest = latest - p.update.Broadcast() + return types.StreamingToken{ + ReceiptPosition: p.latest, } } @@ -151,14 +145,3 @@ func (p *ReceiptStreamProvider) StreamNotifyAfter( return ch } - -func (p *ReceiptStreamProvider) StreamLatestPosition( - ctx context.Context, -) types.StreamingToken { - p.latestMutex.RLock() - defer p.latestMutex.RUnlock() - - return types.StreamingToken{ - ReceiptPosition: p.latest, - } -} diff --git a/syncapi/storage/shared/stream_typing.go b/syncapi/storage/shared/stream_typing.go index c1686b165..476cb01d6 100644 --- a/syncapi/storage/shared/stream_typing.go +++ b/syncapi/storage/shared/stream_typing.go @@ -3,33 +3,27 @@ package shared import ( "context" "encoding/json" - "sync" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" ) type TypingStreamProvider struct { - DB *Database - latest types.StreamPosition - latestMutex sync.RWMutex - update *sync.Cond + StreamProvider } func (p *TypingStreamProvider) StreamSetup() { - locker := &sync.Mutex{} - p.update = sync.NewCond(locker) + p.StreamProvider.StreamSetup() } -func (p *TypingStreamProvider) StreamAdvance( - latest types.StreamPosition, -) { - p.latestMutex.Lock() - defer p.latestMutex.Unlock() +func (p *TypingStreamProvider) StreamLatestPosition( + ctx context.Context, +) types.StreamingToken { + p.latestMutex.RLock() + defer p.latestMutex.RUnlock() - if latest > p.latest { - p.latest = latest - p.update.Broadcast() + return types.StreamingToken{ + TypingPosition: p.latest, } } @@ -119,14 +113,3 @@ func (p *TypingStreamProvider) StreamNotifyAfter( return ch } - -func (p *TypingStreamProvider) StreamLatestPosition( - ctx context.Context, -) types.StreamingToken { - p.latestMutex.RLock() - defer p.latestMutex.RUnlock() - - return types.StreamingToken{ - TypingPosition: p.latest, - } -} diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 07e753544..abbbb5c21 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -55,19 +55,22 @@ type Database struct { PDUTopologyProvider types.TopologyProvider TypingStreamProvider types.StreamProvider ReceiptStreamProvider types.StreamProvider + InviteStreamProvider types.StreamProvider } // ConfigureProviders creates instances of the various // stream and topology providers provided by the storage // packages. func (d *Database) ConfigureProviders() { - d.PDUStreamProvider = &PDUStreamProvider{DB: d} - d.TypingStreamProvider = &TypingStreamProvider{DB: d} - d.ReceiptStreamProvider = &ReceiptStreamProvider{DB: d} + d.PDUStreamProvider = &PDUStreamProvider{StreamProvider{DB: d}} + d.TypingStreamProvider = &TypingStreamProvider{StreamProvider{DB: d}} + d.ReceiptStreamProvider = &ReceiptStreamProvider{StreamProvider{DB: d}} + d.InviteStreamProvider = &InviteStreamProvider{StreamProvider{DB: d}} d.PDUStreamProvider.StreamSetup() d.TypingStreamProvider.StreamSetup() d.ReceiptStreamProvider.StreamSetup() + d.InviteStreamProvider.StreamSetup() d.PDUTopologyProvider = &PDUTopologyProvider{DB: d} } @@ -88,6 +91,10 @@ func (d *Database) ReceiptStream() types.StreamProvider { return d.ReceiptStreamProvider } +func (d *Database) InviteStream() types.StreamProvider { + return d.InviteStreamProvider +} + // Events lookups a list of event by their event ID. // Returns a list of events matching the requested IDs found in the database. // If an event is not found in the database then it will be omitted from the list. diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 3b4b9f772..61cbd8968 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -71,7 +71,7 @@ func NewRequestPool( typingStream: db.TypingStream(), receiptStream: db.ReceiptStream(), sendToDeviceStream: nil, // TODO - inviteStream: nil, // TODO + inviteStream: db.InviteStream(), deviceListStream: nil, // TODO } go rp.cleanLastSeen() @@ -190,15 +190,19 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. return util.JSONResponse{Code: http.StatusOK, JSON: syncData} case <-rp.pduStream.StreamNotifyAfter(waitctx, syncReq.Since): + logger.Println("Responding to sync after PDU") case <-rp.typingStream.StreamNotifyAfter(waitctx, syncReq.Since): + logger.Println("Responding to sync after typing event") case <-rp.receiptStream.StreamNotifyAfter(waitctx, syncReq.Since): + logger.Println("Responding to sync after read receipt") + case <-rp.inviteStream.StreamNotifyAfter(waitctx, syncReq.Since): + logger.Println("Responding to sync after invite") + // case <-rp.sendToDeviceStream.StreamNotifyAfter(waitctx, syncReq.Since): - // case <-rp.inviteStream.StreamNotifyAfter(waitctx, syncReq.Since): // case <-rp.deviceListStream.StreamNotifyAfter(waitctx, syncReq.Since): } waitcancel() - logger.Println("Responding to sync after notify") } else { logger.Println("Responding to sync immediately") } @@ -208,15 +212,15 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. latest.ApplyUpdates(rp.typingStream.StreamLatestPosition(syncReq.Context)) latest.ApplyUpdates(rp.receiptStream.StreamLatestPosition(syncReq.Context)) // latest.ApplyUpdates(rp.sendToDeviceStream.StreamLatestPosition(syncReq.Context)) - // latest.ApplyUpdates(rp.inviteStream.StreamLatestPosition(syncReq.Context)) + latest.ApplyUpdates(rp.inviteStream.StreamLatestPosition(syncReq.Context)) // latest.ApplyUpdates(rp.deviceListStream.StreamLatestPosition(syncReq.Context)) syncReq.Response.NextBatch.ApplyUpdates(rp.pduStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest)) syncReq.Response.NextBatch.ApplyUpdates(rp.typingStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest)) syncReq.Response.NextBatch.ApplyUpdates(rp.receiptStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest)) // syncReq.Response.NextBatch.ApplyUpdates(rp.sendToDeviceStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest)) - // syncReq.Response.NextBatch.ApplyUpdates(rp.inviteStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest)) - // syncReq.Response.NextBatch.ApplyUpdates(rp.inviteStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest)) + syncReq.Response.NextBatch.ApplyUpdates(rp.inviteStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest)) + // syncReq.Response.NextBatch.ApplyUpdates(rp.deviceListStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest)) return util.JSONResponse{ Code: http.StatusOK,