Use addRoomDeltaToResponse

This commit is contained in:
Neil Alexander 2021-01-06 16:07:59 +00:00
parent af332f24ae
commit 9ead2d96e2
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -118,7 +118,7 @@ func (p *PDUStreamProvider) IncrementalSync(
newPos = to newPos = to
var err error var err error
var events []types.StreamEvent //var events []types.StreamEvent
var stateDeltas []stateDelta var stateDeltas []stateDelta
var joinedRooms []string var joinedRooms []string
@ -139,55 +139,85 @@ func (p *PDUStreamProvider) IncrementalSync(
req.Rooms[roomID] = gomatrixserverlib.Join req.Rooms[roomID] = gomatrixserverlib.Join
} }
for _, stateDelta := range stateDeltas { for _, delta := range stateDeltas {
roomID := stateDelta.roomID err = p.addRoomDeltaToResponse(ctx, req.Device, nil, r, delta, 20, req.Response)
room := types.JoinResponse{} if err != nil {
return newPos // nil, fmt.Errorf("d.addRoomDeltaToResponse: %w", err)
if r.Backwards {
// When using backward ordering, we want the most recent events first.
if events, _, err = p.DB.OutputEvents.SelectRecentEvents(ctx, nil, roomID, r, req.Filter.Limit, false, false); err != nil {
return
}
} else {
// When using forward ordering, we want the least recent events first.
if events, err = p.DB.OutputEvents.SelectEarlyEvents(ctx, nil, roomID, r, req.Filter.Limit); err != nil {
return
}
} }
for _, event := range p.DB.StreamEventsToEvents(req.Device, events) {
room.Timeline.Events = append(
room.Timeline.Events,
gomatrixserverlib.ToClientEvent(
event.Event,
gomatrixserverlib.FormatSync,
),
)
}
for _, event := range events {
if event.StreamPosition > newPos {
newPos = event.StreamPosition
}
}
room.State.Events = gomatrixserverlib.HeaderedToClientEvents(
stateDelta.stateEvents,
gomatrixserverlib.FormatSync,
)
if len(events) > 0 {
prevBatch, err := p.DB.getBackwardTopologyPos(ctx, nil, events)
if err != nil {
return
}
room.Timeline.PrevBatch = &prevBatch
}
req.Response.Rooms.Join[roomID] = room
} }
return newPos return r.To
}
func (p *PDUStreamProvider) addRoomDeltaToResponse(
ctx context.Context,
device *userapi.Device,
txn *sql.Tx,
r types.Range,
delta stateDelta,
numRecentEventsPerRoom int,
res *types.Response,
) error {
if delta.membershipPos > 0 && delta.membership == gomatrixserverlib.Leave {
// make sure we don't leak recent events after the leave event.
// TODO: History visibility makes this somewhat complex to handle correctly. For example:
// TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
// TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
// in a single /sync request
// This is all "okay" assuming history_visibility == "shared" which it is by default.
r.To = delta.membershipPos
}
recentStreamEvents, limited, err := p.DB.OutputEvents.SelectRecentEvents(
ctx, txn, delta.roomID, r,
numRecentEventsPerRoom, true, true,
)
if err != nil {
return err
}
recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
prevBatch, err := p.DB.getBackwardTopologyPos(ctx, txn, recentStreamEvents)
if err != nil {
return err
}
// XXX: should we ever get this far if we have no recent events or state in this room?
// in practice we do for peeks, but possibly not joins?
if len(recentEvents) == 0 && len(delta.stateEvents) == 0 {
return nil
}
switch delta.membership {
case gomatrixserverlib.Join:
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = &prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.roomID] = *jr
case gomatrixserverlib.Peek:
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = &prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Peek[delta.roomID] = *jr
case gomatrixserverlib.Leave:
fallthrough // transitions to leave are the same as ban
case gomatrixserverlib.Ban:
// TODO: recentEvents may contain events that this user is not allowed to see because they are
// no longer in the room.
lr := types.NewLeaveResponse()
lr.Timeline.PrevBatch = &prevBatch
lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Leave[delta.roomID] = *lr
}
return nil
} }
func (p *PDUStreamProvider) getJoinResponseForCompleteSync( func (p *PDUStreamProvider) getJoinResponseForCompleteSync(