From 6063a8497c976c74d8d2a4f863d559fdf9d4391f Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 13 May 2020 11:17:35 +0100 Subject: [PATCH] final tweaks/hacks --- syncapi/routing/messages.go | 31 +++++++++++++++++++++----- syncapi/storage/postgres/syncserver.go | 18 ++++++--------- syncapi/types/types.go | 3 +++ 3 files changed, 35 insertions(+), 17 deletions(-) diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index dd5bed6a1..4e57ef2bb 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -40,6 +40,7 @@ type messagesReq struct { roomID string from *types.TopologyToken to *types.TopologyToken + fromStream *types.StreamingToken wasToProvided bool limit int backwardOrdering bool @@ -66,11 +67,16 @@ func OnIncomingMessagesRequest( // Extract parameters from the request's URL. // Pagination tokens. + var fromStream *types.StreamingToken from, err := types.NewTopologyTokenFromString(req.URL.Query().Get("from")) if err != nil { - return util.JSONResponse{ - Code: http.StatusBadRequest, - JSON: jsonerror.InvalidArgumentValue("Invalid from parameter: " + err.Error()), + fs, err2 := types.NewStreamTokenFromString(req.URL.Query().Get("from")) + fromStream = &fs + if err2 != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.InvalidArgumentValue("Invalid from parameter: " + err2.Error()), + } } } @@ -141,6 +147,7 @@ func OnIncomingMessagesRequest( roomID: roomID, from: &from, to: &to, + fromStream: fromStream, wasToProvided: wasToProvided, limit: limit, backwardOrdering: backwardOrdering, @@ -181,9 +188,21 @@ func (r *messagesReq) retrieveEvents() ( end types.TopologyToken, err error, ) { // Retrieve the events from the local database. - streamEvents, err := r.db.GetEventsInRange( - r.ctx, r.from, r.to, r.roomID, r.limit, r.backwardOrdering, - ) + var streamEvents []types.StreamEvent + if r.fromStream != nil { + toStream := r.to.StreamToken() + util.GetLogger(r.ctx).Infof("quack fromStream positions %v", r.fromStream.Positions) + util.GetLogger(r.ctx).Infof("quack toStream positions %v", toStream.Positions) + streamEvents, err = r.db.GetEventsInStreamingRange( + r.ctx, r.fromStream, &toStream, r.roomID, r.limit, r.backwardOrdering, + ) + } else { + util.GetLogger(r.ctx).Infof("quack from positions %v", r.from.Positions) + util.GetLogger(r.ctx).Infof("quack to positions %v", r.to.Positions) + streamEvents, err = r.db.GetEventsInTopologicalRange( + r.ctx, r.from, r.to, r.roomID, r.limit, r.backwardOrdering, + ) + } if err != nil { err = fmt.Errorf("GetEventsInRange: %w", err) return diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 4436cd186..d45bc09e5 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -263,16 +263,14 @@ func (d *SyncServerDatasource) GetEventsInTopologicalRange( return } +// GetEventsInStreamingRange retrieves all of the events on a given ordering using the +// given extremities and limit. func (d *SyncServerDatasource) GetEventsInStreamingRange( ctx context.Context, from, to *types.StreamingToken, roomID string, limit int, backwardOrdering bool, ) (events []types.StreamEvent, err error) { - // If the pagination token's type is types.PaginationTokenTypeStream, the - // events must be retrieved from the table contaning the syncapi server's - // whole stream of events. - if backwardOrdering { // When using backward ordering, we want the most recent events first. if events, err = d.events.selectRecentEvents( @@ -288,8 +286,7 @@ func (d *SyncServerDatasource) GetEventsInStreamingRange( return } } - - return + return events, err } func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (types.StreamingToken, error) { @@ -589,18 +586,17 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( // oldest event in the room's topology. var backwardTopologyPos, backwardStreamPos types.StreamPosition backwardTopologyPos, backwardStreamPos, err = d.topology.selectPositionInTopology(ctx, recentStreamEvents[0].EventID()) - if backwardTopologyPos-1 <= 0 { - backwardTopologyPos = types.StreamPosition(1) - } else { - backwardTopologyPos-- + if err != nil { + return } + prevBatch := types.NewTopologyToken(backwardTopologyPos, backwardStreamPos) + prevBatch.Decrement() // We don't include a device here as we don't need to send down // transaction IDs for complete syncs recentEvents := d.StreamEventsToEvents(nil, recentStreamEvents) stateEvents = removeDuplicates(stateEvents, recentEvents) jr := types.NewJoinResponse() - prevBatch := types.NewTopologyToken(backwardTopologyPos, backwardStreamPos) jr.Timeline.PrevBatch = prevBatch.String() jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Limited = true diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 51308c495..86f527875 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -107,6 +107,9 @@ func (t *TopologyToken) Depth() StreamPosition { func (t *TopologyToken) PDUPosition() StreamPosition { return t.Positions[1] } +func (t *TopologyToken) StreamToken() StreamingToken { + return NewStreamToken(t.PDUPosition(), 0) +} func (t *TopologyToken) String() string { return t.syncToken.String() }