parent
fb2e7d1b05
commit
98d3f88bfb
|
@ -47,7 +47,7 @@ type DatabaseTransaction interface {
|
||||||
MembershipCount(ctx context.Context, roomID, membership string, pos types.StreamPosition) (int, error)
|
MembershipCount(ctx context.Context, roomID, membership string, pos types.StreamPosition) (int, error)
|
||||||
GetRoomHeroes(ctx context.Context, roomID, userID string, memberships []string) ([]string, error)
|
GetRoomHeroes(ctx context.Context, roomID, userID string, memberships []string) ([]string, error)
|
||||||
RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
|
RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
|
||||||
GetBackwardTopologyPos(ctx context.Context, events []types.StreamEvent) (types.TopologyToken, error)
|
GetBackwardTopologyPos(ctx context.Context, events []*gomatrixserverlib.HeaderedEvent) (types.TopologyToken, error)
|
||||||
PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error)
|
PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error)
|
||||||
InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error)
|
InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error)
|
||||||
PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error)
|
PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error)
|
||||||
|
|
|
@ -259,7 +259,7 @@ func (d *DatabaseTransaction) StreamToTopologicalPosition(
|
||||||
// oldest event in the room's topology.
|
// oldest event in the room's topology.
|
||||||
func (d *DatabaseTransaction) GetBackwardTopologyPos(
|
func (d *DatabaseTransaction) GetBackwardTopologyPos(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
events []types.StreamEvent,
|
events []*gomatrixserverlib.HeaderedEvent,
|
||||||
) (types.TopologyToken, error) {
|
) (types.TopologyToken, error) {
|
||||||
zeroToken := types.TopologyToken{}
|
zeroToken := types.TopologyToken{}
|
||||||
if len(events) == 0 {
|
if len(events) == 0 {
|
||||||
|
|
|
@ -255,10 +255,6 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||||
snapshot.StreamEventsToEvents(device, recentStreamEvents),
|
snapshot.StreamEventsToEvents(device, recentStreamEvents),
|
||||||
gomatrixserverlib.TopologicalOrderByPrevEvents,
|
gomatrixserverlib.TopologicalOrderByPrevEvents,
|
||||||
)
|
)
|
||||||
prevBatch, err := snapshot.GetBackwardTopologyPos(ctx, recentStreamEvents)
|
|
||||||
if err != nil {
|
|
||||||
return r.From, fmt.Errorf("p.DB.GetBackwardTopologyPos: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we didn't return any events at all then don't bother doing anything else.
|
// If we didn't return any events at all then don't bother doing anything else.
|
||||||
if len(recentEvents) == 0 && len(delta.StateEvents) == 0 {
|
if len(recentEvents) == 0 && len(delta.StateEvents) == 0 {
|
||||||
|
@ -312,6 +308,11 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||||
limited = true
|
limited = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
prevBatch, err := snapshot.GetBackwardTopologyPos(ctx, events)
|
||||||
|
if err != nil {
|
||||||
|
return r.From, fmt.Errorf("p.DB.GetBackwardTopologyPos: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Now that we've filtered the timeline, work out which state events are still
|
// Now that we've filtered the timeline, work out which state events are still
|
||||||
// left. Anything that appears in the filtered timeline will be removed from the
|
// left. Anything that appears in the filtered timeline will be removed from the
|
||||||
// "state" section and kept in "timeline".
|
// "state" section and kept in "timeline".
|
||||||
|
@ -489,28 +490,6 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Retrieve the backward topology position, i.e. the position of the
|
|
||||||
// oldest event in the room's topology.
|
|
||||||
var prevBatch *types.TopologyToken
|
|
||||||
if len(recentStreamEvents) > 0 {
|
|
||||||
var backwardTopologyPos, backwardStreamPos types.StreamPosition
|
|
||||||
event := recentStreamEvents[0]
|
|
||||||
// If this is the beginning of the room, we can't go back further. We're going to return
|
|
||||||
// the TopologyToken from the last event instead. (Synapse returns the /sync next_Batch)
|
|
||||||
if event.Type() == gomatrixserverlib.MRoomCreate && event.StateKeyEquals("") {
|
|
||||||
event = recentStreamEvents[len(recentStreamEvents)-1]
|
|
||||||
}
|
|
||||||
backwardTopologyPos, backwardStreamPos, err = snapshot.PositionInTopology(ctx, event.EventID())
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
prevBatch = &types.TopologyToken{
|
|
||||||
Depth: backwardTopologyPos,
|
|
||||||
PDUPosition: backwardStreamPos,
|
|
||||||
}
|
|
||||||
prevBatch.Decrement()
|
|
||||||
}
|
|
||||||
|
|
||||||
p.addRoomSummary(ctx, snapshot, jr, roomID, device.UserID, r.From)
|
p.addRoomSummary(ctx, snapshot, jr, roomID, device.UserID, r.From)
|
||||||
|
|
||||||
// We don't include a device here as we don't need to send down
|
// We don't include a device here as we don't need to send down
|
||||||
|
@ -545,6 +524,28 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Retrieve the backward topology position, i.e. the position of the
|
||||||
|
// oldest event in the room's topology.
|
||||||
|
var prevBatch *types.TopologyToken
|
||||||
|
if len(events) > 0 {
|
||||||
|
var backwardTopologyPos, backwardStreamPos types.StreamPosition
|
||||||
|
event := events[0]
|
||||||
|
// If this is the beginning of the room, we can't go back further. We're going to return
|
||||||
|
// the TopologyToken from the last event instead. (Synapse returns the /sync next_Batch)
|
||||||
|
if event.Type() == gomatrixserverlib.MRoomCreate && event.StateKeyEquals("") {
|
||||||
|
event = events[len(events)-1]
|
||||||
|
}
|
||||||
|
backwardTopologyPos, backwardStreamPos, err = snapshot.PositionInTopology(ctx, event.EventID())
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
prevBatch = &types.TopologyToken{
|
||||||
|
Depth: backwardTopologyPos,
|
||||||
|
PDUPosition: backwardStreamPos,
|
||||||
|
}
|
||||||
|
prevBatch.Decrement()
|
||||||
|
}
|
||||||
|
|
||||||
jr.Timeline.PrevBatch = prevBatch
|
jr.Timeline.PrevBatch = prevBatch
|
||||||
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync)
|
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync)
|
||||||
// If we are limited by the filter AND the history visibility filter
|
// If we are limited by the filter AND the history visibility filter
|
||||||
|
|
Loading…
Reference in a new issue