This commit is contained in:
Neil Alexander 2021-01-08 13:36:08 +00:00
parent e96185bbc0
commit d70ebaa1b9
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
10 changed files with 20 additions and 20 deletions

View file

@ -131,7 +131,6 @@ func (n *Notifier) OnNewPeek(
defer n.streamLock.Unlock()
n.addPeekingDevice(roomID, userID, deviceID)
//n.streams.PDUStreamProvider.Advance(posUpdate.PDUPosition)
// we don't wake up devices here given the roomserver consumer will do this shortly afterwards
// by calling OnNewEvent.
@ -144,7 +143,6 @@ func (n *Notifier) OnRetirePeek(
defer n.streamLock.Unlock()
n.removePeekingDevice(roomID, userID, deviceID)
//n.streams.PDUStreamProvider.Advance(posUpdate.PDUPosition)
// we don't wake up devices here given the roomserver consumer will do this shortly afterwards
// by calling OnRetireEvent.

View file

@ -92,7 +92,7 @@ func OnIncomingMessagesRequest(
if emptyFromSupplied {
// NOTSPEC: We will pretend they used the latest sync token if no ?from= was provided.
// We do this to allow clients to get messages without having to call `/sync` e.g Cerulean
currPos := types.TopologyToken{} // .Notifier.CurrentPosition()
currPos := srp.Notifier.CurrentPosition()
fromQuery = currPos.String()
}

View file

@ -66,7 +66,7 @@ func (p *AccountDataStreamProvider) IncrementalSync(
)
if err != nil {
req.Log.WithError(err).Error("p.DB.GetAccountDataInRange failed")
return to
return from
}
if len(dataTypes) == 0 {

View file

@ -30,11 +30,13 @@ func (p *DeviceListStreamProvider) IncrementalSync(
var err error
to, _, err = internal.DeviceListCatchup(context.Background(), p.keyAPI, p.rsAPI, req.Device.UserID, req.Response, from, to)
if err != nil {
return to // nil, fmt.Errorf("internal.DeviceListCatchup: %w", err)
req.Log.WithError(err).Error("internal.DeviceListCatchup failed")
return from
}
err = internal.DeviceOTKCounts(req.Context, p.keyAPI, req.Device.UserID, req.Device.ID, req.Response)
if err != nil {
return to // res, fmt.Errorf("internal.DeviceOTKCounts: %w", err)
req.Log.WithError(err).Error("internal.DeviceListCatchup failed")
return from
}
return to

View file

@ -45,7 +45,7 @@ func (p *InviteStreamProvider) IncrementalSync(
)
if err != nil {
req.Log.WithError(err).Error("p.DB.InviteEventsInRange failed")
return to
return from
}
for roomID, inviteEvent := range invites {

View file

@ -45,7 +45,7 @@ func (p *PDUStreamProvider) CompleteSync(
joinedRoomIDs, err := p.DB.RoomIDsWithMembership(ctx, req.Device.UserID, gomatrixserverlib.Join)
if err != nil {
req.Log.WithError(err).Error("p.DB.RoomIDsWithMembership failed")
return to
return from
}
stateFilter := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request
@ -58,7 +58,7 @@ func (p *PDUStreamProvider) CompleteSync(
)
if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
return to
return from
}
req.Response.Rooms.Join[roomID] = *jr
req.Rooms[roomID] = gomatrixserverlib.Join
@ -68,7 +68,7 @@ func (p *PDUStreamProvider) CompleteSync(
peeks, err := p.DB.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r)
if err != nil {
req.Log.WithError(err).Error("p.DB.PeeksInRange failed")
return to
return from
}
for _, peek := range peeks {
if !peek.Deleted {
@ -78,7 +78,7 @@ func (p *PDUStreamProvider) CompleteSync(
)
if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
return to
return from
}
req.Response.Rooms.Peek[peek.RoomID] = *jr
}

View file

@ -45,7 +45,7 @@ func (p *ReceiptStreamProvider) IncrementalSync(
lastPos, receipts, err := p.DB.RoomReceiptsAfter(ctx, joinedRooms, from)
if err != nil {
req.Log.WithError(err).Error("p.DB.RoomReceiptsAfter failed")
return to
return from
}
if len(receipts) == 0 || lastPos == 0 {
@ -80,7 +80,7 @@ func (p *ReceiptStreamProvider) IncrementalSync(
ev.Content, err = json.Marshal(content)
if err != nil {
req.Log.WithError(err).Error("json.Marshal failed")
return to
return from
}
jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)

View file

@ -26,7 +26,7 @@ func (p *SendToDeviceStreamProvider) IncrementalSync(
lastPos, events, updates, deletions, err := p.DB.SendToDeviceUpdatesForSync(req.Context, req.Device.UserID, req.Device.ID, req.Since)
if err != nil {
req.Log.WithError(err).Error("p.DB.SendToDeviceUpdatesForSync failed")
return to
return from
}
// Before we return the sync response, make sure that we take action on
@ -37,7 +37,7 @@ func (p *SendToDeviceStreamProvider) IncrementalSync(
err = p.DB.CleanSendToDeviceUpdates(context.Background(), updates, deletions, req.Since)
if err != nil {
req.Log.WithError(err).Error("p.DB.CleanSendToDeviceUpdates failed")
return to
return from
}
}
if len(events) > 0 {

View file

@ -45,7 +45,7 @@ func (p *TypingStreamProvider) IncrementalSync(
})
if err != nil {
req.Log.WithError(err).Error("json.Marshal failed")
return to
return from
}
jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)

View file

@ -46,7 +46,7 @@ type RequestPool struct {
rsAPI roomserverAPI.RoomserverInternalAPI
lastseen sync.Map
streams *streams.Streams
notifier *notifier.Notifier
Notifier *notifier.Notifier
}
// NewRequestPool makes a new RequestPool
@ -64,7 +64,7 @@ func NewRequestPool(
rsAPI: rsAPI,
lastseen: sync.Map{},
streams: streams,
notifier: notifier,
Notifier: notifier,
}
go rp.cleanLastSeen()
return rp
@ -154,13 +154,13 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
waitingSyncRequests.Inc()
defer waitingSyncRequests.Dec()
currentPos := rp.notifier.CurrentPosition()
currentPos := rp.Notifier.CurrentPosition()
if !rp.shouldReturnImmediately(syncReq) {
timer := time.NewTimer(syncReq.Timeout) // case of timeout=0 is handled above
defer timer.Stop()
userStreamListener := rp.notifier.GetListener(*syncReq)
userStreamListener := rp.Notifier.GetListener(*syncReq)
defer userStreamListener.Close()
giveup := func() util.JSONResponse {