From cbaeff609647232d0d8cc4c77887f179eeead44a Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 6 Jan 2021 14:37:09 +0000 Subject: [PATCH] Clean up, add templates for other streams --- syncapi/consumers/eduserver_receipts.go | 2 +- syncapi/consumers/eduserver_typing.go | 10 +- syncapi/consumers/roomserver.go | 4 +- syncapi/storage/interface.go | 2 + syncapi/storage/shared/stream.go | 65 ++++++++++++- syncapi/storage/shared/stream_invite.go | 97 ++++++------------- syncapi/storage/shared/stream_pdu.go | 88 +++-------------- syncapi/storage/shared/stream_receipt.go | 80 ++------------- syncapi/storage/shared/stream_sendtodevice.go | 20 ++++ syncapi/storage/shared/stream_typing.go | 90 +++-------------- syncapi/storage/shared/streamlog.go | 92 ++++++++++++++++++ .../storage/shared/streamlog_devicelist.go | 20 ++++ syncapi/storage/shared/syncserver.go | 32 ++++-- syncapi/sync/request.go | 4 +- syncapi/sync/requestpool.go | 55 ++++++----- syncapi/types/provider.go | 28 ++++-- 16 files changed, 336 insertions(+), 353 deletions(-) create mode 100644 syncapi/storage/shared/stream_sendtodevice.go create mode 100644 syncapi/storage/shared/streamlog.go create mode 100644 syncapi/storage/shared/streamlog_devicelist.go diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go index 71d039803..fd1f806ce 100644 --- a/syncapi/consumers/eduserver_receipts.go +++ b/syncapi/consumers/eduserver_receipts.go @@ -82,7 +82,7 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro return err } - s.db.TypingStream().StreamAdvance(streamPos) + s.db.TypingStream().Advance(streamPos) return nil } diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index a27b6090e..585ddf6c9 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -59,11 +59,9 @@ func NewOutputTypingEventConsumer( // Start consuming from EDU api func (s *OutputTypingEventConsumer) Start() error { - /* - s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) { - s.notifier.OnNewTyping(roomID, types.StreamingToken{TypingPosition: types.StreamPosition(latestSyncPosition)}) - }) - */ + s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) { + s.db.TypingStream().Advance(types.StreamPosition(latestSyncPosition)) + }) return s.typingConsumer.Start() } @@ -90,7 +88,7 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID) } - s.db.TypingStream().StreamAdvance(typingPos) + s.db.TypingStream().Advance(typingPos) return nil } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index f181d8da1..b720f24f8 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -176,7 +176,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( return err } - s.db.PDUStream().StreamAdvance(pduPos) + s.db.PDUStream().Advance(pduPos) return nil } @@ -215,7 +215,7 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent( return err } - s.db.PDUStream().StreamAdvance(pduPos) + s.db.PDUStream().Advance(pduPos) return nil } diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 11567a9d0..24011254a 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -36,6 +36,8 @@ type Database interface { TypingStream() types.StreamProvider ReceiptStream() types.StreamProvider InviteStream() types.StreamProvider + SendToDeviceStream() types.StreamProvider + DeviceListStream() types.StreamLogProvider // AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) diff --git a/syncapi/storage/shared/stream.go b/syncapi/storage/shared/stream.go index 92d8fcb2a..8ba68dbe8 100644 --- a/syncapi/storage/shared/stream.go +++ b/syncapi/storage/shared/stream.go @@ -1,6 +1,7 @@ package shared import ( + "context" "sync" "github.com/matrix-org/dendrite/syncapi/types" @@ -13,12 +14,12 @@ type StreamProvider struct { update *sync.Cond } -func (p *StreamProvider) StreamSetup() { +func (p *StreamProvider) Setup() { locker := &sync.Mutex{} p.update = sync.NewCond(locker) } -func (p *StreamProvider) StreamAdvance( +func (p *StreamProvider) Advance( latest types.StreamPosition, ) { p.latestMutex.Lock() @@ -29,3 +30,63 @@ func (p *StreamProvider) StreamAdvance( p.update.Broadcast() } } + +func (p *StreamProvider) LatestPosition( + ctx context.Context, +) types.StreamPosition { + p.latestMutex.RLock() + defer p.latestMutex.RUnlock() + + return p.latest +} + +func (p *StreamProvider) NotifyAfter( + ctx context.Context, + from types.StreamPosition, +) chan struct{} { + ch := make(chan struct{}) + + check := func() bool { + p.latestMutex.RLock() + defer p.latestMutex.RUnlock() + if p.latest > from { + close(ch) + return true + } + return false + } + + // If we've already advanced past the specified position + // then return straight away. + if check() { + return ch + } + + // If we haven't, then we'll subscribe to updates. The + // sync.Cond will fire every time the latest position + // updates, so we can check and see if we've advanced + // past it. + go func(p *StreamProvider) { + p.update.L.Lock() + defer p.update.L.Unlock() + + for { + select { + case <-ctx.Done(): + // The context has expired, so there's no point + // in continuing to wait for the update. + return + default: + // The latest position has been advanced. Let's + // see if it's advanced to the position we care + // about. If it has then we'll return. + p.update.Wait() + if check() { + return + } + } + } + }(p) + + return ch +} diff --git a/syncapi/storage/shared/stream_invite.go b/syncapi/storage/shared/stream_invite.go index f4261bf7f..6f220f3bd 100644 --- a/syncapi/storage/shared/stream_invite.go +++ b/syncapi/storage/shared/stream_invite.go @@ -10,85 +10,48 @@ type InviteStreamProvider struct { StreamProvider } -func (p *InviteStreamProvider) StreamSetup() { - p.StreamProvider.StreamSetup() +func (p *InviteStreamProvider) Setup() { + p.StreamProvider.Setup() p.latestMutex.Lock() defer p.latestMutex.Unlock() - p.latest = 0 -} - -func (p *InviteStreamProvider) StreamLatestPosition( - ctx context.Context, -) types.StreamingToken { - p.latestMutex.RLock() - defer p.latestMutex.RUnlock() - - return types.StreamingToken{ - InvitePosition: p.latest, + latest, err := p.DB.Invites.SelectMaxInviteID(context.Background(), nil) + if err != nil { + return } + + p.latest = types.StreamPosition(latest) } -// nolint:gocyclo -func (p *InviteStreamProvider) StreamRange( +func (p *InviteStreamProvider) Range( ctx context.Context, - req *types.StreamRangeRequest, - from, to types.StreamingToken, -) (newPos types.StreamingToken) { - - return types.StreamingToken{ - InvitePosition: 0, + req *types.SyncRequest, + from, to types.StreamPosition, +) (newPos types.StreamPosition) { + r := types.Range{ + From: from, + To: to, } -} -func (p *InviteStreamProvider) StreamNotifyAfter( - ctx context.Context, - from types.StreamingToken, -) chan struct{} { - ch := make(chan struct{}) + invites, retiredInvites, err := p.DB.Invites.SelectInviteEventsInRange( + ctx, nil, req.Device.UserID, r, + ) + if err != nil { + return // fmt.Errorf("d.Invites.SelectInviteEventsInRange: %w", err) + } - check := func() bool { - p.latestMutex.RLock() - defer p.latestMutex.RUnlock() - if p.latest > from.InvitePosition { - close(ch) - return true + for roomID, inviteEvent := range invites { + ir := types.NewInviteResponse(inviteEvent) + req.Response.Rooms.Invite[roomID] = *ir + } + + for roomID := range retiredInvites { + if _, ok := req.Response.Rooms.Join[roomID]; !ok { + lr := types.NewLeaveResponse() + req.Response.Rooms.Leave[roomID] = *lr } - return false } - // If we've already advanced past the specified position - // then return straight away. - if check() { - return ch - } - - // If we haven't, then we'll subscribe to updates. The - // sync.Cond will fire every time the latest position - // updates, so we can check and see if we've advanced - // past it. - go func(p *InviteStreamProvider) { - p.update.L.Lock() - defer p.update.L.Unlock() - - for { - select { - case <-ctx.Done(): - // The context has expired, so there's no point - // in continuing to wait for the update. - return - default: - // The latest position has been advanced. Let's - // see if it's advanced to the position we care - // about. If it has then we'll return. - p.update.Wait() - if check() { - return - } - } - } - }(p) - - return ch + return to } diff --git a/syncapi/storage/shared/stream_pdu.go b/syncapi/storage/shared/stream_pdu.go index 19e95399d..7f58b7ba2 100644 --- a/syncapi/storage/shared/stream_pdu.go +++ b/syncapi/storage/shared/stream_pdu.go @@ -11,8 +11,8 @@ type PDUStreamProvider struct { StreamProvider } -func (p *PDUStreamProvider) StreamSetup() { - p.StreamProvider.StreamSetup() +func (p *PDUStreamProvider) Setup() { + p.StreamProvider.Setup() p.latestMutex.Lock() defer p.latestMutex.Unlock() @@ -24,31 +24,18 @@ func (p *PDUStreamProvider) StreamSetup() { p.latest = types.StreamPosition(id) } -func (p *PDUStreamProvider) StreamLatestPosition( - ctx context.Context, -) types.StreamingToken { - p.latestMutex.RLock() - defer p.latestMutex.RUnlock() - - return types.StreamingToken{ - PDUPosition: p.latest, - } -} - // nolint:gocyclo -func (p *PDUStreamProvider) StreamRange( +func (p *PDUStreamProvider) Range( ctx context.Context, - req *types.StreamRangeRequest, - from, to types.StreamingToken, -) (newPos types.StreamingToken) { + req *types.SyncRequest, + from, to types.StreamPosition, +) (newPos types.StreamPosition) { r := types.Range{ - From: from.PDUPosition, - To: to.PDUPosition, - Backwards: from.IsAfter(to), - } - newPos = types.StreamingToken{ - PDUPosition: to.PDUPosition, + From: from, + To: to, + Backwards: from > to, } + newPos = to var err error var events []types.StreamEvent @@ -99,8 +86,8 @@ func (p *PDUStreamProvider) StreamRange( } for _, event := range events { - if event.StreamPosition > newPos.PDUPosition { - newPos.PDUPosition = event.StreamPosition + if event.StreamPosition > newPos { + newPos = event.StreamPosition } } @@ -122,54 +109,3 @@ func (p *PDUStreamProvider) StreamRange( return newPos } - -func (p *PDUStreamProvider) StreamNotifyAfter( - ctx context.Context, - from types.StreamingToken, -) chan struct{} { - ch := make(chan struct{}) - - check := func() bool { - p.latestMutex.RLock() - defer p.latestMutex.RUnlock() - if p.latest > from.PDUPosition { - close(ch) - return true - } - return false - } - - // If we've already advanced past the specified position - // then return straight away. - if check() { - return ch - } - - // If we haven't, then we'll subscribe to updates. The - // sync.Cond will fire every time the latest position - // updates, so we can check and see if we've advanced - // past it. - go func(p *PDUStreamProvider) { - p.update.L.Lock() - defer p.update.L.Unlock() - - for { - select { - case <-ctx.Done(): - // The context has expired, so there's no point - // in continuing to wait for the update. - return - default: - // The latest position has been advanced. Let's - // see if it's advanced to the position we care - // about. If it has then we'll return. - p.update.Wait() - if check() { - return - } - } - } - }(p) - - return ch -} diff --git a/syncapi/storage/shared/stream_receipt.go b/syncapi/storage/shared/stream_receipt.go index be3b59416..a44f7de70 100644 --- a/syncapi/storage/shared/stream_receipt.go +++ b/syncapi/storage/shared/stream_receipt.go @@ -13,8 +13,8 @@ type ReceiptStreamProvider struct { StreamProvider } -func (p *ReceiptStreamProvider) StreamSetup() { - p.StreamProvider.StreamSetup() +func (p *ReceiptStreamProvider) Setup() { + p.StreamProvider.Setup() latest, err := p.DB.Receipts.SelectMaxReceiptID(context.Background(), nil) if err != nil { @@ -24,22 +24,11 @@ func (p *ReceiptStreamProvider) StreamSetup() { p.latest = types.StreamPosition(latest) } -func (p *ReceiptStreamProvider) StreamLatestPosition( +func (p *ReceiptStreamProvider) Range( ctx context.Context, -) types.StreamingToken { - p.latestMutex.RLock() - defer p.latestMutex.RUnlock() - - return types.StreamingToken{ - ReceiptPosition: p.latest, - } -} - -func (p *ReceiptStreamProvider) StreamRange( - ctx context.Context, - req *types.StreamRangeRequest, - from, to types.StreamingToken, -) types.StreamingToken { + req *types.SyncRequest, + from, to types.StreamPosition, +) types.StreamPosition { var joinedRooms []string for roomID, membership := range req.Rooms { if membership == gomatrixserverlib.Join { @@ -47,7 +36,7 @@ func (p *ReceiptStreamProvider) StreamRange( } } - lastPos, receipts, err := p.DB.Receipts.SelectRoomReceiptsAfter(ctx, joinedRooms, from.ReceiptPosition) + lastPos, receipts, err := p.DB.Receipts.SelectRoomReceiptsAfter(ctx, joinedRooms, from) if err != nil { return to //fmt.Errorf("unable to select receipts for rooms: %w", err) } @@ -90,58 +79,5 @@ func (p *ReceiptStreamProvider) StreamRange( req.Response.Rooms.Join[roomID] = jr } - return types.StreamingToken{ - ReceiptPosition: lastPos, - } -} - -func (p *ReceiptStreamProvider) StreamNotifyAfter( - ctx context.Context, - from types.StreamingToken, -) chan struct{} { - ch := make(chan struct{}) - - check := func() bool { - p.latestMutex.RLock() - defer p.latestMutex.RUnlock() - if p.latest > from.ReceiptPosition { - close(ch) - return true - } - return false - } - - // If we've already advanced past the specified position - // then return straight away. - if check() { - return ch - } - - // If we haven't, then we'll subscribe to updates. The - // sync.Cond will fire every time the latest position - // updates, so we can check and see if we've advanced - // past it. - go func(p *ReceiptStreamProvider) { - p.update.L.Lock() - defer p.update.L.Unlock() - - for { - select { - case <-ctx.Done(): - // The context has expired, so there's no point - // in continuing to wait for the update. - return - default: - // The latest position has been advanced. Let's - // see if it's advanced to the position we care - // about. If it has then we'll return. - p.update.Wait() - if check() { - return - } - } - } - }(p) - - return ch + return lastPos } diff --git a/syncapi/storage/shared/stream_sendtodevice.go b/syncapi/storage/shared/stream_sendtodevice.go new file mode 100644 index 000000000..e4815453b --- /dev/null +++ b/syncapi/storage/shared/stream_sendtodevice.go @@ -0,0 +1,20 @@ +package shared + +import ( + "context" + + "github.com/matrix-org/dendrite/syncapi/types" +) + +type SendToDeviceStreamProvider struct { + StreamProvider +} + +func (p *SendToDeviceStreamProvider) Range( + ctx context.Context, + req *types.SyncRequest, + from, to types.StreamPosition, +) types.StreamPosition { + + return to +} diff --git a/syncapi/storage/shared/stream_typing.go b/syncapi/storage/shared/stream_typing.go index 476cb01d6..093d8cec4 100644 --- a/syncapi/storage/shared/stream_typing.go +++ b/syncapi/storage/shared/stream_typing.go @@ -12,34 +12,23 @@ type TypingStreamProvider struct { StreamProvider } -func (p *TypingStreamProvider) StreamSetup() { - p.StreamProvider.StreamSetup() -} - -func (p *TypingStreamProvider) StreamLatestPosition( +func (p *TypingStreamProvider) Range( ctx context.Context, -) types.StreamingToken { - p.latestMutex.RLock() - defer p.latestMutex.RUnlock() - - return types.StreamingToken{ - TypingPosition: p.latest, - } -} - -func (p *TypingStreamProvider) StreamRange( - ctx context.Context, - req *types.StreamRangeRequest, - from, to types.StreamingToken, -) types.StreamingToken { + req *types.SyncRequest, + from, to types.StreamPosition, +) types.StreamPosition { var err error - for roomID := range req.Rooms { + for roomID, membership := range req.Rooms { + if membership != gomatrixserverlib.Join { + continue + } + // This may have already been set by a previous stream, so // reuse it if it exists. jr := req.Response.Rooms.Join[roomID] if users, updated := p.DB.EDUCache.GetTypingUsersIfUpdatedAfter( - roomID, int64(from.TypingPosition), + roomID, int64(from), ); updated { ev := gomatrixserverlib.ClientEvent{ Type: gomatrixserverlib.MTyping, @@ -48,9 +37,7 @@ func (p *TypingStreamProvider) StreamRange( "user_ids": users, }) if err != nil { - return types.StreamingToken{ - TypingPosition: from.TypingPosition, - } + return to } jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev) @@ -58,58 +45,5 @@ func (p *TypingStreamProvider) StreamRange( } } - return types.StreamingToken{ - TypingPosition: types.StreamPosition(p.DB.EDUCache.GetLatestSyncPosition()), - } -} - -func (p *TypingStreamProvider) StreamNotifyAfter( - ctx context.Context, - from types.StreamingToken, -) chan struct{} { - ch := make(chan struct{}) - - check := func() bool { - p.latestMutex.RLock() - defer p.latestMutex.RUnlock() - if p.latest > from.TypingPosition { - close(ch) - return true - } - return false - } - - // If we've already advanced past the specified position - // then return straight away. - if check() { - return ch - } - - // If we haven't, then we'll subscribe to updates. The - // sync.Cond will fire every time the latest position - // updates, so we can check and see if we've advanced - // past it. - go func(p *TypingStreamProvider) { - p.update.L.Lock() - defer p.update.L.Unlock() - - for { - select { - case <-ctx.Done(): - // The context has expired, so there's no point - // in continuing to wait for the update. - return - default: - // The latest position has been advanced. Let's - // see if it's advanced to the position we care - // about. If it has then we'll return. - p.update.Wait() - if check() { - return - } - } - } - }(p) - - return ch + return to } diff --git a/syncapi/storage/shared/streamlog.go b/syncapi/storage/shared/streamlog.go new file mode 100644 index 000000000..f167108db --- /dev/null +++ b/syncapi/storage/shared/streamlog.go @@ -0,0 +1,92 @@ +package shared + +import ( + "context" + "sync" + + "github.com/matrix-org/dendrite/syncapi/types" +) + +type StreamLogProvider struct { + DB *Database + latest types.LogPosition + latestMutex sync.RWMutex + update *sync.Cond +} + +func (p *StreamLogProvider) Setup() { + locker := &sync.Mutex{} + p.update = sync.NewCond(locker) +} + +func (p *StreamLogProvider) Advance( + latest types.LogPosition, +) { + p.latestMutex.Lock() + defer p.latestMutex.Unlock() + + if latest.IsAfter(&p.latest) { + p.latest = latest + p.update.Broadcast() + } +} + +func (p *StreamLogProvider) LatestPosition( + ctx context.Context, +) types.LogPosition { + p.latestMutex.RLock() + defer p.latestMutex.RUnlock() + + return p.latest +} + +func (p *StreamLogProvider) NotifyAfter( + ctx context.Context, + from types.LogPosition, +) chan struct{} { + ch := make(chan struct{}) + + check := func() bool { + p.latestMutex.RLock() + defer p.latestMutex.RUnlock() + if p.latest.IsAfter(&from) { + close(ch) + return true + } + return false + } + + // If we've already advanced past the specified position + // then return straight away. + if check() { + return ch + } + + // If we haven't, then we'll subscribe to updates. The + // sync.Cond will fire every time the latest position + // updates, so we can check and see if we've advanced + // past it. + go func(p *StreamLogProvider) { + p.update.L.Lock() + defer p.update.L.Unlock() + + for { + select { + case <-ctx.Done(): + // The context has expired, so there's no point + // in continuing to wait for the update. + return + default: + // The latest position has been advanced. Let's + // see if it's advanced to the position we care + // about. If it has then we'll return. + p.update.Wait() + if check() { + return + } + } + } + }(p) + + return ch +} diff --git a/syncapi/storage/shared/streamlog_devicelist.go b/syncapi/storage/shared/streamlog_devicelist.go new file mode 100644 index 000000000..3c2320739 --- /dev/null +++ b/syncapi/storage/shared/streamlog_devicelist.go @@ -0,0 +1,20 @@ +package shared + +import ( + "context" + + "github.com/matrix-org/dendrite/syncapi/types" +) + +type DeviceListStreamProvider struct { + StreamLogProvider +} + +func (p *DeviceListStreamProvider) Range( + ctx context.Context, + req *types.SyncRequest, + from, to types.LogPosition, +) types.LogPosition { + + return to +} diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index abbbb5c21..672a3d278 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -51,11 +51,13 @@ type Database struct { Receipts tables.Receipts EDUCache *cache.EDUCache - PDUStreamProvider types.StreamProvider - PDUTopologyProvider types.TopologyProvider - TypingStreamProvider types.StreamProvider - ReceiptStreamProvider types.StreamProvider - InviteStreamProvider types.StreamProvider + PDUStreamProvider types.StreamProvider + PDUTopologyProvider types.TopologyProvider + TypingStreamProvider types.StreamProvider + ReceiptStreamProvider types.StreamProvider + InviteStreamProvider types.StreamProvider + SendToDeviceStreamProvider types.StreamProvider + DeviceListStreamProvider types.StreamLogProvider } // ConfigureProviders creates instances of the various @@ -66,11 +68,15 @@ func (d *Database) ConfigureProviders() { d.TypingStreamProvider = &TypingStreamProvider{StreamProvider{DB: d}} d.ReceiptStreamProvider = &ReceiptStreamProvider{StreamProvider{DB: d}} d.InviteStreamProvider = &InviteStreamProvider{StreamProvider{DB: d}} + d.SendToDeviceStreamProvider = &SendToDeviceStreamProvider{StreamProvider{DB: d}} + d.DeviceListStreamProvider = &DeviceListStreamProvider{StreamLogProvider{DB: d}} - d.PDUStreamProvider.StreamSetup() - d.TypingStreamProvider.StreamSetup() - d.ReceiptStreamProvider.StreamSetup() - d.InviteStreamProvider.StreamSetup() + d.PDUStreamProvider.Setup() + d.TypingStreamProvider.Setup() + d.ReceiptStreamProvider.Setup() + d.InviteStreamProvider.Setup() + d.SendToDeviceStreamProvider.Setup() + d.DeviceListStreamProvider.Setup() d.PDUTopologyProvider = &PDUTopologyProvider{DB: d} } @@ -95,6 +101,14 @@ func (d *Database) InviteStream() types.StreamProvider { return d.InviteStreamProvider } +func (d *Database) SendToDeviceStream() types.StreamProvider { + return d.SendToDeviceStreamProvider +} + +func (d *Database) DeviceListStream() types.StreamLogProvider { + return d.DeviceListStreamProvider +} + // Events lookups a list of event by their event ID. // Returns a list of events matching the requested IDs found in the database. // If an event is not found in the database then it will be omitted from the list. diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go index 58ac97b6e..1c50bb061 100644 --- a/syncapi/sync/request.go +++ b/syncapi/sync/request.go @@ -38,7 +38,7 @@ type filter struct { } `json:"room"` } -func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Database) (*types.StreamRangeRequest, error) { +func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Database) (*types.SyncRequest, error) { timeout := getTimeout(req.URL.Query().Get("timeout")) fullState := req.URL.Query().Get("full_state") wantFullState := fullState != "" && fullState != "false" @@ -76,7 +76,7 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat } // TODO: Additional query params: set_presence, filter - return &types.StreamRangeRequest{ + return &types.SyncRequest{ Context: req.Context(), // Device: &device, // Response: types.NewResponse(), // Populated by all streams diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 61cbd8968..b10168c8f 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -51,7 +51,7 @@ type RequestPool struct { receiptStream types.StreamProvider sendToDeviceStream types.StreamProvider inviteStream types.StreamProvider - deviceListStream types.StreamProvider + deviceListStream types.StreamLogProvider } // NewRequestPool makes a new RequestPool @@ -70,9 +70,9 @@ func NewRequestPool( pduStream: db.PDUStream(), typingStream: db.TypingStream(), receiptStream: db.ReceiptStream(), - sendToDeviceStream: nil, // TODO + sendToDeviceStream: db.SendToDeviceStream(), inviteStream: db.InviteStream(), - deviceListStream: nil, // TODO + deviceListStream: db.DeviceListStream(), } go rp.cleanLastSeen() return rp @@ -189,38 +189,37 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. waitcancel() return util.JSONResponse{Code: http.StatusOK, JSON: syncData} - case <-rp.pduStream.StreamNotifyAfter(waitctx, syncReq.Since): - logger.Println("Responding to sync after PDU") - case <-rp.typingStream.StreamNotifyAfter(waitctx, syncReq.Since): - logger.Println("Responding to sync after typing event") - case <-rp.receiptStream.StreamNotifyAfter(waitctx, syncReq.Since): - logger.Println("Responding to sync after read receipt") - case <-rp.inviteStream.StreamNotifyAfter(waitctx, syncReq.Since): - logger.Println("Responding to sync after invite") - - // case <-rp.sendToDeviceStream.StreamNotifyAfter(waitctx, syncReq.Since): - // case <-rp.deviceListStream.StreamNotifyAfter(waitctx, syncReq.Since): + case <-rp.pduStream.NotifyAfter(waitctx, syncReq.Since.PDUPosition): + case <-rp.typingStream.NotifyAfter(waitctx, syncReq.Since.TypingPosition): + case <-rp.receiptStream.NotifyAfter(waitctx, syncReq.Since.ReceiptPosition): + case <-rp.inviteStream.NotifyAfter(waitctx, syncReq.Since.InvitePosition): + case <-rp.sendToDeviceStream.NotifyAfter(waitctx, syncReq.Since.SendToDevicePosition): + case <-rp.deviceListStream.NotifyAfter(waitctx, syncReq.Since.DeviceListPosition): } + logger.Println("Responding to sync after wakeup") waitcancel() } else { logger.Println("Responding to sync immediately") } - var latest types.StreamingToken - latest.ApplyUpdates(rp.pduStream.StreamLatestPosition(syncReq.Context)) - latest.ApplyUpdates(rp.typingStream.StreamLatestPosition(syncReq.Context)) - latest.ApplyUpdates(rp.receiptStream.StreamLatestPosition(syncReq.Context)) - // latest.ApplyUpdates(rp.sendToDeviceStream.StreamLatestPosition(syncReq.Context)) - latest.ApplyUpdates(rp.inviteStream.StreamLatestPosition(syncReq.Context)) - // latest.ApplyUpdates(rp.deviceListStream.StreamLatestPosition(syncReq.Context)) + latest := types.StreamingToken{ + PDUPosition: rp.pduStream.LatestPosition(syncReq.Context), + TypingPosition: rp.typingStream.LatestPosition(syncReq.Context), + ReceiptPosition: rp.receiptStream.LatestPosition(syncReq.Context), + InvitePosition: rp.inviteStream.LatestPosition(syncReq.Context), + SendToDevicePosition: rp.sendToDeviceStream.LatestPosition(syncReq.Context), + DeviceListPosition: rp.db.DeviceListStream().LatestPosition(syncReq.Context), + } - syncReq.Response.NextBatch.ApplyUpdates(rp.pduStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest)) - syncReq.Response.NextBatch.ApplyUpdates(rp.typingStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest)) - syncReq.Response.NextBatch.ApplyUpdates(rp.receiptStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest)) - // syncReq.Response.NextBatch.ApplyUpdates(rp.sendToDeviceStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest)) - syncReq.Response.NextBatch.ApplyUpdates(rp.inviteStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest)) - // syncReq.Response.NextBatch.ApplyUpdates(rp.deviceListStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest)) + syncReq.Response.NextBatch = types.StreamingToken{ + PDUPosition: rp.pduStream.Range(syncReq.Context, syncReq, syncReq.Since.PDUPosition, latest.PDUPosition), + TypingPosition: rp.typingStream.Range(syncReq.Context, syncReq, syncReq.Since.TypingPosition, latest.TypingPosition), + ReceiptPosition: rp.receiptStream.Range(syncReq.Context, syncReq, syncReq.Since.ReceiptPosition, latest.ReceiptPosition), + InvitePosition: rp.inviteStream.Range(syncReq.Context, syncReq, syncReq.Since.InvitePosition, latest.InvitePosition), + SendToDevicePosition: rp.sendToDeviceStream.Range(syncReq.Context, syncReq, syncReq.Since.SendToDevicePosition, latest.SendToDevicePosition), + DeviceListPosition: rp.deviceListStream.Range(syncReq.Context, syncReq, syncReq.Since.DeviceListPosition, latest.DeviceListPosition), + } return util.JSONResponse{ Code: http.StatusOK, @@ -463,7 +462,7 @@ func (rp *RequestPool) appendAccountData( // shouldReturnImmediately returns whether the /sync request is an initial sync, // or timeout=0, or full_state=true, in any of the cases the request should // return immediately. -func (rp *RequestPool) shouldReturnImmediately(syncReq *types.StreamRangeRequest) bool { +func (rp *RequestPool) shouldReturnImmediately(syncReq *types.SyncRequest) bool { if syncReq.Since.IsEmpty() || syncReq.Timeout == 0 || syncReq.WantFullState { return true } diff --git a/syncapi/types/provider.go b/syncapi/types/provider.go index a79504b3c..563d56773 100644 --- a/syncapi/types/provider.go +++ b/syncapi/types/provider.go @@ -8,7 +8,7 @@ import ( "github.com/matrix-org/gomatrixserverlib" ) -type StreamRangeRequest struct { +type SyncRequest struct { Context context.Context Device *userapi.Device Response *Response @@ -24,23 +24,31 @@ type StreamRangeRequest struct { } type StreamProvider interface { - StreamSetup() + Setup() - // StreamAdvance will update the latest position of the stream based on + // Advance will update the latest position of the stream based on // an update and will wake callers waiting on StreamNotifyAfter. - StreamAdvance(latest StreamPosition) + Advance(latest StreamPosition) - // StreamRange will update the response to include all updates between + // Range will update the response to include all updates between // the from and to sync positions. It will always return immediately, // making no changes if the range contains no updates. - StreamRange(ctx context.Context, req *StreamRangeRequest, from, to StreamingToken) StreamingToken + Range(ctx context.Context, req *SyncRequest, from, to StreamPosition) StreamPosition - // StreamNotifyAfter returns a channel which will be closed once the + // NotifyAfter returns a channel which will be closed once the // stream advances past the "from" position. - StreamNotifyAfter(ctx context.Context, from StreamingToken) chan struct{} + NotifyAfter(ctx context.Context, from StreamPosition) chan struct{} - // StreamLatestPosition returns the latest stream position for this stream. - StreamLatestPosition(ctx context.Context) StreamingToken + // LatestPosition returns the latest stream position for this stream. + LatestPosition(ctx context.Context) StreamPosition +} + +type StreamLogProvider interface { + Setup() + Advance(latest LogPosition) + Range(ctx context.Context, req *SyncRequest, from, to LogPosition) LogPosition + NotifyAfter(ctx context.Context, from LogPosition) chan struct{} + LatestPosition(ctx context.Context) LogPosition } type TopologyProvider interface {