From d70ebaa1b922ae653a80a15d080a987c33d61daf Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 8 Jan 2021 13:36:08 +0000 Subject: [PATCH] Fixes --- syncapi/notifier/notifier.go | 2 -- syncapi/routing/messages.go | 2 +- syncapi/streams/stream_accountdata.go | 2 +- syncapi/streams/stream_devicelist.go | 6 ++++-- syncapi/streams/stream_invite.go | 2 +- syncapi/streams/stream_pdu.go | 8 ++++---- syncapi/streams/stream_receipt.go | 4 ++-- syncapi/streams/stream_sendtodevice.go | 4 ++-- syncapi/streams/stream_typing.go | 2 +- syncapi/sync/requestpool.go | 8 ++++---- 10 files changed, 20 insertions(+), 20 deletions(-) diff --git a/syncapi/notifier/notifier.go b/syncapi/notifier/notifier.go index 561c6f0c0..47bf04bf6 100644 --- a/syncapi/notifier/notifier.go +++ b/syncapi/notifier/notifier.go @@ -131,7 +131,6 @@ func (n *Notifier) OnNewPeek( defer n.streamLock.Unlock() n.addPeekingDevice(roomID, userID, deviceID) - //n.streams.PDUStreamProvider.Advance(posUpdate.PDUPosition) // we don't wake up devices here given the roomserver consumer will do this shortly afterwards // by calling OnNewEvent. @@ -144,7 +143,6 @@ func (n *Notifier) OnRetirePeek( defer n.streamLock.Unlock() n.removePeekingDevice(roomID, userID, deviceID) - //n.streams.PDUStreamProvider.Advance(posUpdate.PDUPosition) // we don't wake up devices here given the roomserver consumer will do this shortly afterwards // by calling OnRetireEvent. diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 224517e00..14389ebbf 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -92,7 +92,7 @@ func OnIncomingMessagesRequest( if emptyFromSupplied { // NOTSPEC: We will pretend they used the latest sync token if no ?from= was provided. // We do this to allow clients to get messages without having to call `/sync` e.g Cerulean - currPos := types.TopologyToken{} // .Notifier.CurrentPosition() + currPos := srp.Notifier.CurrentPosition() fromQuery = currPos.String() } diff --git a/syncapi/streams/stream_accountdata.go b/syncapi/streams/stream_accountdata.go index 2e85e9e9b..dd9b520cb 100644 --- a/syncapi/streams/stream_accountdata.go +++ b/syncapi/streams/stream_accountdata.go @@ -66,7 +66,7 @@ func (p *AccountDataStreamProvider) IncrementalSync( ) if err != nil { req.Log.WithError(err).Error("p.DB.GetAccountDataInRange failed") - return to + return from } if len(dataTypes) == 0 { diff --git a/syncapi/streams/stream_devicelist.go b/syncapi/streams/stream_devicelist.go index d50361ecb..c43d50a49 100644 --- a/syncapi/streams/stream_devicelist.go +++ b/syncapi/streams/stream_devicelist.go @@ -30,11 +30,13 @@ func (p *DeviceListStreamProvider) IncrementalSync( var err error to, _, err = internal.DeviceListCatchup(context.Background(), p.keyAPI, p.rsAPI, req.Device.UserID, req.Response, from, to) if err != nil { - return to // nil, fmt.Errorf("internal.DeviceListCatchup: %w", err) + req.Log.WithError(err).Error("internal.DeviceListCatchup failed") + return from } err = internal.DeviceOTKCounts(req.Context, p.keyAPI, req.Device.UserID, req.Device.ID, req.Response) if err != nil { - return to // res, fmt.Errorf("internal.DeviceOTKCounts: %w", err) + req.Log.WithError(err).Error("internal.DeviceListCatchup failed") + return from } return to diff --git a/syncapi/streams/stream_invite.go b/syncapi/streams/stream_invite.go index 636dc22ca..7d5df1931 100644 --- a/syncapi/streams/stream_invite.go +++ b/syncapi/streams/stream_invite.go @@ -45,7 +45,7 @@ func (p *InviteStreamProvider) IncrementalSync( ) if err != nil { req.Log.WithError(err).Error("p.DB.InviteEventsInRange failed") - return to + return from } for roomID, inviteEvent := range invites { diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 7bfd94891..b247c9677 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -45,7 +45,7 @@ func (p *PDUStreamProvider) CompleteSync( joinedRoomIDs, err := p.DB.RoomIDsWithMembership(ctx, req.Device.UserID, gomatrixserverlib.Join) if err != nil { req.Log.WithError(err).Error("p.DB.RoomIDsWithMembership failed") - return to + return from } stateFilter := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request @@ -58,7 +58,7 @@ func (p *PDUStreamProvider) CompleteSync( ) if err != nil { req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") - return to + return from } req.Response.Rooms.Join[roomID] = *jr req.Rooms[roomID] = gomatrixserverlib.Join @@ -68,7 +68,7 @@ func (p *PDUStreamProvider) CompleteSync( peeks, err := p.DB.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r) if err != nil { req.Log.WithError(err).Error("p.DB.PeeksInRange failed") - return to + return from } for _, peek := range peeks { if !peek.Deleted { @@ -78,7 +78,7 @@ func (p *PDUStreamProvider) CompleteSync( ) if err != nil { req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") - return to + return from } req.Response.Rooms.Peek[peek.RoomID] = *jr } diff --git a/syncapi/streams/stream_receipt.go b/syncapi/streams/stream_receipt.go index 876e2b1db..bba47a877 100644 --- a/syncapi/streams/stream_receipt.go +++ b/syncapi/streams/stream_receipt.go @@ -45,7 +45,7 @@ func (p *ReceiptStreamProvider) IncrementalSync( lastPos, receipts, err := p.DB.RoomReceiptsAfter(ctx, joinedRooms, from) if err != nil { req.Log.WithError(err).Error("p.DB.RoomReceiptsAfter failed") - return to + return from } if len(receipts) == 0 || lastPos == 0 { @@ -80,7 +80,7 @@ func (p *ReceiptStreamProvider) IncrementalSync( ev.Content, err = json.Marshal(content) if err != nil { req.Log.WithError(err).Error("json.Marshal failed") - return to + return from } jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev) diff --git a/syncapi/streams/stream_sendtodevice.go b/syncapi/streams/stream_sendtodevice.go index 73be8304e..804f525dc 100644 --- a/syncapi/streams/stream_sendtodevice.go +++ b/syncapi/streams/stream_sendtodevice.go @@ -26,7 +26,7 @@ func (p *SendToDeviceStreamProvider) IncrementalSync( lastPos, events, updates, deletions, err := p.DB.SendToDeviceUpdatesForSync(req.Context, req.Device.UserID, req.Device.ID, req.Since) if err != nil { req.Log.WithError(err).Error("p.DB.SendToDeviceUpdatesForSync failed") - return to + return from } // Before we return the sync response, make sure that we take action on @@ -37,7 +37,7 @@ func (p *SendToDeviceStreamProvider) IncrementalSync( err = p.DB.CleanSendToDeviceUpdates(context.Background(), updates, deletions, req.Since) if err != nil { req.Log.WithError(err).Error("p.DB.CleanSendToDeviceUpdates failed") - return to + return from } } if len(events) > 0 { diff --git a/syncapi/streams/stream_typing.go b/syncapi/streams/stream_typing.go index 3a63b2677..60d5acf4d 100644 --- a/syncapi/streams/stream_typing.go +++ b/syncapi/streams/stream_typing.go @@ -45,7 +45,7 @@ func (p *TypingStreamProvider) IncrementalSync( }) if err != nil { req.Log.WithError(err).Error("json.Marshal failed") - return to + return from } jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev) diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 785a3af82..cedd433b1 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -46,7 +46,7 @@ type RequestPool struct { rsAPI roomserverAPI.RoomserverInternalAPI lastseen sync.Map streams *streams.Streams - notifier *notifier.Notifier + Notifier *notifier.Notifier } // NewRequestPool makes a new RequestPool @@ -64,7 +64,7 @@ func NewRequestPool( rsAPI: rsAPI, lastseen: sync.Map{}, streams: streams, - notifier: notifier, + Notifier: notifier, } go rp.cleanLastSeen() return rp @@ -154,13 +154,13 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. waitingSyncRequests.Inc() defer waitingSyncRequests.Dec() - currentPos := rp.notifier.CurrentPosition() + currentPos := rp.Notifier.CurrentPosition() if !rp.shouldReturnImmediately(syncReq) { timer := time.NewTimer(syncReq.Timeout) // case of timeout=0 is handled above defer timer.Stop() - userStreamListener := rp.notifier.GetListener(*syncReq) + userStreamListener := rp.Notifier.GetListener(*syncReq) defer userStreamListener.Close() giveup := func() util.JSONResponse {