From 475ae4ca3b9aca5322919fe1f49ebf9a52296fed Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 5 Jan 2021 17:22:27 +0000 Subject: [PATCH] It's half-alive --- syncapi/storage/interface.go | 5 + syncapi/storage/postgres/syncserver.go | 1 + syncapi/storage/shared/pdu_stream.go | 175 ++++++++++++++++++++++++ syncapi/storage/shared/pdu_topology.go | 31 +++++ syncapi/storage/shared/syncserver.go | 30 ++++ syncapi/storage/shared/typing_stream.go | 102 ++++++++++++++ syncapi/storage/sqlite3/syncserver.go | 1 + syncapi/sync/requestpool.go | 121 ++++++++-------- syncapi/types/provider.go | 37 +++++ 9 files changed, 438 insertions(+), 65 deletions(-) create mode 100644 syncapi/storage/shared/pdu_stream.go create mode 100644 syncapi/storage/shared/pdu_topology.go create mode 100644 syncapi/storage/shared/typing_stream.go create mode 100644 syncapi/types/provider.go diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 9ab6f9157..db11837df 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -30,6 +30,11 @@ import ( type Database interface { internal.PartitionStorer + + PDUStream() types.StreamProvider + PDUTopology() types.TopologyProvider + TypingStream() types.StreamProvider + // AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) // AllPeekingDevicesInRooms returns a map of room ID to a list of all peeking devices. diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 60d67ac0e..b276a6a83 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -108,5 +108,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e Receipts: receipts, EDUCache: cache.New(), } + d.Database.ConfigureProviders() return &d, nil } diff --git a/syncapi/storage/shared/pdu_stream.go b/syncapi/storage/shared/pdu_stream.go new file mode 100644 index 000000000..4aee8d5ee --- /dev/null +++ b/syncapi/storage/shared/pdu_stream.go @@ -0,0 +1,175 @@ +package shared + +import ( + "context" + "sync" + + "github.com/matrix-org/dendrite/syncapi/types" + userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" +) + +type PDUStreamProvider struct { + DB *Database + latest types.StreamPosition + latestMutex sync.RWMutex + update *sync.Cond +} + +func (p *PDUStreamProvider) StreamSetup() { + locker := &sync.Mutex{} + p.update = sync.NewCond(locker) + + p.latestMutex.Lock() + defer p.latestMutex.Unlock() + + id, err := p.DB.OutputEvents.SelectMaxEventID(context.Background(), nil) + if err != nil { + return + } + p.latest = types.StreamPosition(id) +} + +func (p *PDUStreamProvider) StreamAdvance( + latest types.StreamPosition, +) { + p.latestMutex.Lock() + defer p.latestMutex.Unlock() + + p.latest = latest + p.update.Broadcast() +} + +func (p *PDUStreamProvider) StreamRange( + ctx context.Context, + res *types.Response, + device *userapi.Device, + from, to types.StreamingToken, + filter gomatrixserverlib.EventFilter, +) (newPos types.StreamPosition) { + r := types.Range{ + From: from.PDUPosition, + To: to.PDUPosition, + Backwards: from.IsAfter(to), + } + newPos = from.PDUPosition + + var err error + var events []types.StreamEvent + var stateDeltas []stateDelta + + // TODO: use filter provided in request + stateFilter := gomatrixserverlib.DefaultStateFilter() + + if from.IsEmpty() { + if stateDeltas, _, err = p.DB.getStateDeltas(ctx, device, nil, r, device.UserID, &stateFilter); err != nil { + return + } + } else { + if stateDeltas, _, err = p.DB.getStateDeltasForFullStateSync(ctx, device, nil, r, device.UserID, &stateFilter); err != nil { + return + } + } + + for _, stateDelta := range stateDeltas { + roomID := stateDelta.roomID + room := types.JoinResponse{} + + if r.Backwards { + // When using backward ordering, we want the most recent events first. + if events, _, err = p.DB.OutputEvents.SelectRecentEvents(ctx, nil, roomID, r, filter.Limit, false, false); err != nil { + return + } + } else { + // When using forward ordering, we want the least recent events first. + if events, err = p.DB.OutputEvents.SelectEarlyEvents(ctx, nil, roomID, r, filter.Limit); err != nil { + return + } + } + + for _, event := range events { + room.Timeline.Events = append( + room.Timeline.Events, + gomatrixserverlib.ToClientEvent( + event.Event, + gomatrixserverlib.FormatSync, + ), + ) + + if event.StreamPosition > newPos { + newPos = event.StreamPosition + } + } + + room.State.Events = gomatrixserverlib.HeaderedToClientEvents( + stateDelta.stateEvents, + gomatrixserverlib.FormatSync, + ) + + // TODO: fill in prev_batch + + res.Rooms.Join[roomID] = room + } + + return newPos +} + +func (p *PDUStreamProvider) StreamNotifyAfter( + ctx context.Context, + from types.StreamingToken, +) chan struct{} { + ch := make(chan struct{}) + + check := func() bool { + p.latestMutex.RLock() + defer p.latestMutex.RUnlock() + if from.PDUPosition > p.latest { + close(ch) + return true + } + return false + } + + // If we've already advanced past the specified position + // then return straight away. + if check() { + return ch + } + + // If we haven't, then we'll subscribe to updates. The + // sync.Cond will fire every time the latest position + // updates, so we can check and see if we've advanced + // past it. + go func(p *PDUStreamProvider) { + p.update.L.Lock() + defer p.update.L.Unlock() + + for { + select { + case <-ctx.Done(): + // The context has expired, so there's no point + // in continuing to wait for the update. + return + default: + // The latest position has been advanced. Let's + // see if it's advanced to the position we care + // about. If it has then we'll return. + p.update.Wait() + if check() { + return + } + } + } + }(p) + + return ch +} + +func (p *PDUStreamProvider) StreamLatestPosition( + ctx context.Context, +) types.StreamPosition { + p.latestMutex.RLock() + defer p.latestMutex.RUnlock() + + return p.latest +} diff --git a/syncapi/storage/shared/pdu_topology.go b/syncapi/storage/shared/pdu_topology.go new file mode 100644 index 000000000..c2b024cde --- /dev/null +++ b/syncapi/storage/shared/pdu_topology.go @@ -0,0 +1,31 @@ +package shared + +import ( + "context" + + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" +) + +type PDUTopologyProvider struct { + DB *Database +} + +func (p *PDUTopologyProvider) TopologyRange(ctx context.Context, res *types.Response, roomID string, from, to types.TopologyToken, filter gomatrixserverlib.EventFilter) { + backwardOrdering := from.Depth > to.Depth || from.PDUPosition > to.PDUPosition + + events, err := p.DB.GetEventsInTopologicalRange(ctx, &from, &to, roomID, filter.Limit, backwardOrdering) + if err != nil { + return + } + + _ = events +} + +func (p *PDUTopologyProvider) TopologyLatestPosition(ctx context.Context, roomID string) types.StreamPosition { + token, err := p.DB.MaxTopologicalPosition(ctx, roomID) + if err != nil { + return 0 + } + return token.PDUPosition +} diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index ba9403a53..a57f24720 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -50,6 +50,35 @@ type Database struct { Filter tables.Filter Receipts tables.Receipts EDUCache *cache.EDUCache + + PDUStreamProvider types.StreamProvider + PDUTopologyProvider types.TopologyProvider + TypingStreamProvider types.StreamProvider +} + +// ConfigureProviders creates instances of the various +// stream and topology providers provided by the storage +// packages. +func (d *Database) ConfigureProviders() { + d.PDUStreamProvider = &PDUStreamProvider{DB: d} + d.TypingStreamProvider = &TypingStreamProvider{DB: d} + + d.PDUStreamProvider.StreamSetup() + d.TypingStreamProvider.StreamSetup() + + d.PDUTopologyProvider = &PDUTopologyProvider{DB: d} +} + +func (d *Database) PDUStream() types.StreamProvider { + return d.PDUStreamProvider +} + +func (d *Database) PDUTopology() types.TopologyProvider { + return d.PDUTopologyProvider +} + +func (d *Database) TypingStream() types.StreamProvider { + return d.TypingStreamProvider } // Events lookups a list of event by their event ID. @@ -325,6 +354,7 @@ func (d *Database) WriteEvent( return fmt.Errorf("d.OutputEvents.InsertEvent: %w", err) } pduPosition = pos + d.PDUStreamProvider.StreamAdvance(pduPosition) if err = d.Topology.InsertEventInTopology(ctx, txn, ev, pos); err != nil { return fmt.Errorf("d.Topology.InsertEventInTopology: %w", err) diff --git a/syncapi/storage/shared/typing_stream.go b/syncapi/storage/shared/typing_stream.go new file mode 100644 index 000000000..baac972a0 --- /dev/null +++ b/syncapi/storage/shared/typing_stream.go @@ -0,0 +1,102 @@ +package shared + +import ( + "context" + "sync" + + "github.com/matrix-org/dendrite/syncapi/types" + userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" +) + +type TypingStreamProvider struct { + DB *Database + latest types.StreamPosition + latestMutex sync.RWMutex + update *sync.Cond +} + +func (p *TypingStreamProvider) StreamSetup() { + locker := &sync.Mutex{} + p.update = sync.NewCond(locker) +} + +func (p *TypingStreamProvider) StreamAdvance( + latest types.StreamPosition, +) { + p.latestMutex.Lock() + defer p.latestMutex.Unlock() + + p.latest = latest + p.update.Broadcast() +} + +func (p *TypingStreamProvider) StreamRange( + ctx context.Context, + res *types.Response, + device *userapi.Device, + from, to types.StreamingToken, + filter gomatrixserverlib.EventFilter, +) types.StreamPosition { + return 0 +} + +func (p *TypingStreamProvider) StreamNotifyAfter( + ctx context.Context, + from types.StreamingToken, +) chan struct{} { + ch := make(chan struct{}) + + check := func() bool { + p.latestMutex.RLock() + defer p.latestMutex.RUnlock() + if from.TypingPosition > p.latest { + close(ch) + return true + } + return false + } + + // If we've already advanced past the specified position + // then return straight away. + if check() { + return ch + } + + // If we haven't, then we'll subscribe to updates. The + // sync.Cond will fire every time the latest position + // updates, so we can check and see if we've advanced + // past it. + go func(p *TypingStreamProvider) { + p.update.L.Lock() + defer p.update.L.Unlock() + + for { + select { + case <-ctx.Done(): + // The context has expired, so there's no point + // in continuing to wait for the update. + return + default: + // The latest position has been advanced. Let's + // see if it's advanced to the position we care + // about. If it has then we'll return. + p.update.Wait() + if check() { + return + } + } + } + }(p) + + return ch +} + +func (p *TypingStreamProvider) StreamLatestPosition( + ctx context.Context, +) types.StreamPosition { + p.latestMutex.RLock() + defer p.latestMutex.RUnlock() + + return p.latest +} diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 1ad0e9473..561dff91a 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -121,5 +121,6 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er Receipts: receipts, EDUCache: cache.New(), } + d.Database.ConfigureProviders() return nil } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 0751487a2..94f680ca6 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -41,13 +41,15 @@ import ( // RequestPool manages HTTP long-poll connections for /sync type RequestPool struct { - db storage.Database - cfg *config.SyncAPI - userAPI userapi.UserInternalAPI - Notifier *Notifier - keyAPI keyapi.KeyInternalAPI - rsAPI roomserverAPI.RoomserverInternalAPI - lastseen sync.Map + db storage.Database + cfg *config.SyncAPI + userAPI userapi.UserInternalAPI + Notifier *Notifier + keyAPI keyapi.KeyInternalAPI + rsAPI roomserverAPI.RoomserverInternalAPI + lastseen sync.Map + pduStream types.StreamProvider + typingStream types.StreamProvider } // NewRequestPool makes a new RequestPool @@ -56,7 +58,17 @@ func NewRequestPool( userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, ) *RequestPool { - rp := &RequestPool{db, cfg, userAPI, n, keyAPI, rsAPI, sync.Map{}} + rp := &RequestPool{ + db: db, + cfg: cfg, + userAPI: userAPI, + Notifier: n, + keyAPI: keyAPI, + rsAPI: rsAPI, + lastseen: sync.Map{}, + pduStream: db.PDUStream(), + typingStream: db.TypingStream(), + } go rp.cleanLastSeen() return rp } @@ -147,81 +159,58 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. "limit": syncReq.limit, }) + _ = logger + activeSyncRequests.Inc() defer activeSyncRequests.Dec() rp.updateLastSeen(req, device) - currPos := rp.Notifier.CurrentPosition() - - if rp.shouldReturnImmediately(syncReq) { - syncData, err = rp.currentSyncForUser(*syncReq, currPos) - if err != nil { - logger.WithError(err).Error("rp.currentSyncForUser failed") - return jsonerror.InternalServerError() - } - logger.WithField("next", syncData.NextBatch).Info("Responding immediately") - return util.JSONResponse{ - Code: http.StatusOK, - JSON: syncData, - } + syncData = types.NewResponse() + filter := gomatrixserverlib.DefaultEventFilter() + syncData.NextBatch = types.StreamingToken{ + PDUPosition: rp.pduStream.StreamLatestPosition(syncReq.ctx), + TypingPosition: rp.typingStream.StreamLatestPosition(syncReq.ctx), } waitingSyncRequests.Inc() defer waitingSyncRequests.Dec() - // Otherwise, we wait for the notifier to tell us if something *may* have - // happened. We loop in case it turns out that nothing did happen. + if !rp.shouldReturnImmediately(syncReq) { + timer := time.NewTimer(syncReq.timeout) // case of timeout=0 is handled above + defer timer.Stop() - timer := time.NewTimer(syncReq.timeout) // case of timeout=0 is handled above - defer timer.Stop() - - userStreamListener := rp.Notifier.GetListener(*syncReq) - defer userStreamListener.Close() - - // We need the loop in case userStreamListener wakes up even if there isn't - // anything to send down. In this case, we'll jump out of the select but - // don't want to send anything back until we get some actual content to - // respond with, so we skip the return an go back to waiting for content to - // be sent down or the request timing out. - var hasTimedOut bool - sincePos := syncReq.since - for { select { - // Wait for notifier to wake us up - case <-userStreamListener.GetNotifyChannel(sincePos): - currPos = userStreamListener.GetSyncPosition() - // Or for timeout to expire - case <-timer.C: - // We just need to ensure we get out of the select after reaching the - // timeout, but there's nothing specific we want to do in this case - // apart from that, so we do nothing except stating we're timing out - // and need to respond. - hasTimedOut = true - // Or for the request to be cancelled - case <-req.Context().Done(): - logger.WithError(err).Error("request cancelled") - return jsonerror.InternalServerError() - } - - // Note that we don't time out during calculation of sync - // response. This ensures that we don't waste the hard work - // of calculating the sync only to get timed out before we - // can respond - syncData, err = rp.currentSyncForUser(*syncReq, currPos) - if err != nil { - logger.WithError(err).Error("rp.currentSyncForUser failed") - return jsonerror.InternalServerError() - } - - if !syncData.IsEmpty() || hasTimedOut { - logger.WithField("next", syncData.NextBatch).WithField("timed_out", hasTimedOut).Info("Responding") + case <-syncReq.ctx.Done(): + // Caller gave up + logger.Println("Context expired") return util.JSONResponse{ Code: http.StatusOK, JSON: syncData, } + + case <-timer.C: + // Timeout reached + logger.Println("Timed out") + return util.JSONResponse{ + Code: http.StatusOK, + JSON: syncData, + } + + case <-rp.pduStream.StreamNotifyAfter(syncReq.ctx, syncReq.since): + logger.Println("PDU stream awake") + case <-rp.typingStream.StreamNotifyAfter(syncReq.ctx, syncReq.since): + logger.Println("Typing stream awake") } } + + syncData.NextBatch.PDUPosition = rp.pduStream.StreamRange(syncReq.ctx, syncData, device, syncReq.since, syncData.NextBatch, filter) + syncData.NextBatch.TypingPosition = rp.typingStream.StreamRange(syncReq.ctx, syncData, device, syncReq.since, syncData.NextBatch, filter) + + return util.JSONResponse{ + Code: http.StatusOK, + JSON: syncData, + } } func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *userapi.Device) util.JSONResponse { @@ -274,6 +263,7 @@ func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *use } // nolint:gocyclo +/* func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.StreamingToken) (*types.Response, error) { res := types.NewResponse() @@ -330,6 +320,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea res.NextBatch.SendToDevicePosition = lastPos return res, err } +*/ func (rp *RequestPool) appendDeviceLists( data *types.Response, userID string, since, to types.StreamingToken, diff --git a/syncapi/types/provider.go b/syncapi/types/provider.go new file mode 100644 index 000000000..3f81557fc --- /dev/null +++ b/syncapi/types/provider.go @@ -0,0 +1,37 @@ +package types + +import ( + "context" + + userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" +) + +type StreamProvider interface { + StreamSetup() + StreamAdvance(latest StreamPosition) + + // Range will update the response to include all updates between + // the from and to sync positions. It will always return immediately, + // making no changes if the range contains no updates. + StreamRange(ctx context.Context, res *Response, device *userapi.Device, from, to StreamingToken, filter gomatrixserverlib.EventFilter) StreamPosition + + // NotifyAfter returns a channel which will be closed once the + // stream advances past the "from" position. + StreamNotifyAfter(ctx context.Context, from StreamingToken) chan struct{} + + // LatestPosition returns the latest stream position for this stream. + StreamLatestPosition(ctx context.Context) StreamPosition +} + +type TopologyProvider interface { + // Range will update the response to include all updates between + // the from and to sync positions for the given room. It will always + // return immediately, making no changes if the range contains no + // updates. + TopologyRange(ctx context.Context, res *Response, roomID string, from, to TopologyToken, filter gomatrixserverlib.EventFilter) + + // LatestPosition returns the latest stream position for this stream + // for the given room. + TopologyLatestPosition(ctx context.Context, roomID string) StreamPosition +}