mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-26 00:03:09 -06:00
Implement recognition of 'include_leave' parameter in filter for sync endpoint
This commit is contained in:
parent
5d65a879a5
commit
2c0273a36e
|
|
@ -67,7 +67,7 @@ type Database interface {
|
||||||
IncrementalSync(ctx context.Context, res *types.Response, device userapi.Device, fromPos, toPos types.StreamingToken, numRecentEventsPerRoom int, wantFullState bool) (*types.Response, error)
|
IncrementalSync(ctx context.Context, res *types.Response, device userapi.Device, fromPos, toPos types.StreamingToken, numRecentEventsPerRoom int, wantFullState bool) (*types.Response, error)
|
||||||
// CompleteSync returns a complete /sync API response for the given user. A response object
|
// CompleteSync returns a complete /sync API response for the given user. A response object
|
||||||
// must be provided for CompleteSync to populate - it will not create one.
|
// must be provided for CompleteSync to populate - it will not create one.
|
||||||
CompleteSync(ctx context.Context, res *types.Response, device userapi.Device, numRecentEventsPerRoom int) (*types.Response, error)
|
CompleteSync(ctx context.Context, res *types.Response, device userapi.Device, numRecentEventsPerRoom int, includeLeave bool) (*types.Response, error)
|
||||||
// GetAccountDataInRange returns all account data for a given user inserted or
|
// GetAccountDataInRange returns all account data for a given user inserted or
|
||||||
// updated between two given positions
|
// updated between two given positions
|
||||||
// Returns a map following the format data[roomID] = []dataTypes
|
// Returns a map following the format data[roomID] = []dataTypes
|
||||||
|
|
|
||||||
|
|
@ -745,6 +745,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
|
||||||
ctx context.Context, res *types.Response,
|
ctx context.Context, res *types.Response,
|
||||||
userID string, device userapi.Device,
|
userID string, device userapi.Device,
|
||||||
numRecentEventsPerRoom int,
|
numRecentEventsPerRoom int,
|
||||||
|
includeLeave bool,
|
||||||
) (
|
) (
|
||||||
toPos types.StreamingToken,
|
toPos types.StreamingToken,
|
||||||
joinedRoomIDs []string,
|
joinedRoomIDs []string,
|
||||||
|
|
@ -811,6 +812,19 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//Add left rooms
|
||||||
|
leftRoomIDs, err := d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Leave)
|
||||||
|
for _, roomID := range leftRoomIDs {
|
||||||
|
var lr *types.LeaveResponse
|
||||||
|
lr, err = d.getLeaveResponseForCompleteSync(
|
||||||
|
ctx, txn, roomID, r, &stateFilter, numRecentEventsPerRoom, device,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
res.Rooms.Leave[roomID] = *lr
|
||||||
|
}
|
||||||
|
|
||||||
if err = d.addInvitesToResponse(ctx, txn, userID, r, res); err != nil {
|
if err = d.addInvitesToResponse(ctx, txn, userID, r, res); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -896,12 +910,97 @@ func (d *Database) getJoinResponseForCompleteSync(
|
||||||
return jr, nil
|
return jr, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *Database) getLeaveResponseForCompleteSync(
|
||||||
|
ctx context.Context, txn *sql.Tx,
|
||||||
|
roomID string,
|
||||||
|
r types.Range,
|
||||||
|
stateFilter *gomatrixserverlib.StateFilter,
|
||||||
|
numRecentEventsPerRoom int, device userapi.Device,
|
||||||
|
) (lr *types.LeaveResponse, err error) {
|
||||||
|
var stateEvents []*gomatrixserverlib.HeaderedEvent
|
||||||
|
stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilter)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// TODO: When filters are added, we may need to call this multiple times to get enough events.
|
||||||
|
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
|
||||||
|
var recentStreamEvents []types.StreamEvent
|
||||||
|
var limited bool
|
||||||
|
recentStreamEvents, limited, err = d.OutputEvents.SelectRecentEvents(
|
||||||
|
ctx, txn, roomID, r, numRecentEventsPerRoom, true, true,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the
|
||||||
|
// user shouldn't see, we check the recent events and remove any prior to the join event of the user
|
||||||
|
// which is equiv to history_visibility: joined
|
||||||
|
joinEventIndex := -1
|
||||||
|
leaveEventIndex := -1
|
||||||
|
for i := len(recentStreamEvents) - 1; i >= 0; i-- {
|
||||||
|
ev := recentStreamEvents[i]
|
||||||
|
if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(device.UserID) {
|
||||||
|
membership, _ := ev.Membership()
|
||||||
|
if leaveEventIndex == -1 && membership != "leave" {
|
||||||
|
continue
|
||||||
|
} else if membership == "leave"{
|
||||||
|
leaveEventIndex = i
|
||||||
|
} else if membership == "join" {
|
||||||
|
joinEventIndex = i
|
||||||
|
if i > 0 {
|
||||||
|
// the create event happens before the first join, so we should cut it at that point instead
|
||||||
|
if recentStreamEvents[i-1].Type() == gomatrixserverlib.MRoomCreate && recentStreamEvents[i-1].StateKeyEquals("") {
|
||||||
|
joinEventIndex = i - 1
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
recentStreamEvents = recentStreamEvents[:leaveEventIndex+1]
|
||||||
|
if joinEventIndex != -1 {
|
||||||
|
// cut all events earlier than the join (but not the join itself)
|
||||||
|
recentStreamEvents = recentStreamEvents[joinEventIndex:]
|
||||||
|
limited = false // so clients know not to try to backpaginate
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retrieve the backward topology position, i.e. the position of the
|
||||||
|
// oldest event in the room's topology.
|
||||||
|
var prevBatchStr string
|
||||||
|
if len(recentStreamEvents) > 0 {
|
||||||
|
var backwardTopologyPos, backwardStreamPos types.StreamPosition
|
||||||
|
backwardTopologyPos, backwardStreamPos, err = d.Topology.SelectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID())
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
prevBatch := types.NewTopologyToken(backwardTopologyPos, backwardStreamPos)
|
||||||
|
prevBatch.Decrement()
|
||||||
|
prevBatchStr = prevBatch.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
// We don't include a device here as we don't need to send down
|
||||||
|
// transaction IDs for complete syncs, but we do it anyway because Sytest demands it for:
|
||||||
|
// "Can sync a room with a message with a transaction id" - which does a complete sync to check.
|
||||||
|
recentEvents := d.StreamEventsToEvents(&device, recentStreamEvents)
|
||||||
|
stateEvents = removeDuplicates(stateEvents, recentEvents)
|
||||||
|
lr = types.NewLeaveResponse()
|
||||||
|
lr.Timeline.PrevBatch = prevBatchStr
|
||||||
|
lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||||
|
lr.Timeline.Limited = limited
|
||||||
|
lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
|
||||||
|
return lr, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
func (d *Database) CompleteSync(
|
func (d *Database) CompleteSync(
|
||||||
ctx context.Context, res *types.Response,
|
ctx context.Context, res *types.Response,
|
||||||
device userapi.Device, numRecentEventsPerRoom int,
|
device userapi.Device, numRecentEventsPerRoom int,
|
||||||
|
includeLeave bool,
|
||||||
) (*types.Response, error) {
|
) (*types.Response, error) {
|
||||||
toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync(
|
toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync(
|
||||||
ctx, res, device.UserID, device, numRecentEventsPerRoom,
|
ctx, res, device.UserID, device, numRecentEventsPerRoom, includeLeave,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("d.getResponseWithPDUsForCompleteSync: %w", err)
|
return nil, fmt.Errorf("d.getResponseWithPDUsForCompleteSync: %w", err)
|
||||||
|
|
@ -948,8 +1047,10 @@ func (d *Database) addInvitesToResponse(
|
||||||
}
|
}
|
||||||
for roomID := range retiredInvites {
|
for roomID := range retiredInvites {
|
||||||
if _, ok := res.Rooms.Join[roomID]; !ok {
|
if _, ok := res.Rooms.Join[roomID]; !ok {
|
||||||
|
/*
|
||||||
lr := types.NewLeaveResponse()
|
lr := types.NewLeaveResponse()
|
||||||
res.Rooms.Leave[roomID] = *lr
|
res.Rooms.Leave[roomID] = *lr
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
|
|
@ -195,7 +195,7 @@ func TestSyncResponse(t *testing.T) {
|
||||||
DoSync: func() (*types.Response, error) {
|
DoSync: func() (*types.Response, error) {
|
||||||
res := types.NewResponse()
|
res := types.NewResponse()
|
||||||
// limit set to 5
|
// limit set to 5
|
||||||
return db.CompleteSync(ctx, res, testUserDeviceA, 5)
|
return db.CompleteSync(ctx, res, testUserDeviceA, 5, false)
|
||||||
},
|
},
|
||||||
// want the last 5 events
|
// want the last 5 events
|
||||||
WantTimeline: events[len(events)-5:],
|
WantTimeline: events[len(events)-5:],
|
||||||
|
|
@ -208,7 +208,7 @@ func TestSyncResponse(t *testing.T) {
|
||||||
Name: "CompleteSync",
|
Name: "CompleteSync",
|
||||||
DoSync: func() (*types.Response, error) {
|
DoSync: func() (*types.Response, error) {
|
||||||
res := types.NewResponse()
|
res := types.NewResponse()
|
||||||
return db.CompleteSync(ctx, res, testUserDeviceA, len(events)+1)
|
return db.CompleteSync(ctx, res, testUserDeviceA, len(events)+1, false)
|
||||||
},
|
},
|
||||||
WantTimeline: events,
|
WantTimeline: events,
|
||||||
// We want no state at all as that field in /sync is the delta between the token (beginning of time)
|
// We want no state at all as that field in /sync is the delta between the token (beginning of time)
|
||||||
|
|
|
||||||
|
|
@ -31,6 +31,7 @@ import (
|
||||||
|
|
||||||
const defaultSyncTimeout = time.Duration(0)
|
const defaultSyncTimeout = time.Duration(0)
|
||||||
const DefaultTimelineLimit = 20
|
const DefaultTimelineLimit = 20
|
||||||
|
const DefaultIncludeLeave = false
|
||||||
|
|
||||||
type filter struct {
|
type filter struct {
|
||||||
Room struct {
|
Room struct {
|
||||||
|
|
@ -46,6 +47,7 @@ type syncRequest struct {
|
||||||
device userapi.Device
|
device userapi.Device
|
||||||
limit int
|
limit int
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
|
includeLeave bool
|
||||||
since *types.StreamingToken // nil means that no since token was supplied
|
since *types.StreamingToken // nil means that no since token was supplied
|
||||||
wantFullState bool
|
wantFullState bool
|
||||||
log *log.Entry
|
log *log.Entry
|
||||||
|
|
@ -69,6 +71,7 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
|
||||||
since = &tok
|
since = &tok
|
||||||
}
|
}
|
||||||
timelineLimit := DefaultTimelineLimit
|
timelineLimit := DefaultTimelineLimit
|
||||||
|
includeLeave := DefaultIncludeLeave
|
||||||
// TODO: read from stored filters too
|
// TODO: read from stored filters too
|
||||||
filterQuery := req.URL.Query().Get("filter")
|
filterQuery := req.URL.Query().Get("filter")
|
||||||
if filterQuery != "" {
|
if filterQuery != "" {
|
||||||
|
|
@ -88,7 +91,11 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
|
||||||
}
|
}
|
||||||
f, err := syncDB.GetFilter(req.Context(), localpart, filterQuery)
|
f, err := syncDB.GetFilter(req.Context(), localpart, filterQuery)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
util.GetLogger(req.Context()).Infof("Filter was retrieved")
|
||||||
timelineLimit = f.Room.Timeline.Limit
|
timelineLimit = f.Room.Timeline.Limit
|
||||||
|
includeLeave = f.Room.IncludeLeave
|
||||||
|
} else {
|
||||||
|
util.GetLogger(req.Context()).Infof("Filter could not be retrieved")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -97,6 +104,7 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
|
||||||
ctx: req.Context(),
|
ctx: req.Context(),
|
||||||
device: device,
|
device: device,
|
||||||
timeout: timeout,
|
timeout: timeout,
|
||||||
|
includeLeave: includeLeave,
|
||||||
since: since,
|
since: since,
|
||||||
wantFullState: wantFullState,
|
wantFullState: wantFullState,
|
||||||
limit: timelineLimit,
|
limit: timelineLimit,
|
||||||
|
|
|
||||||
|
|
@ -255,7 +255,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
|
||||||
|
|
||||||
// TODO: handle ignored users
|
// TODO: handle ignored users
|
||||||
if req.since.PDUPosition() == 0 && req.since.EDUPosition() == 0 {
|
if req.since.PDUPosition() == 0 && req.since.EDUPosition() == 0 {
|
||||||
res, err = rp.db.CompleteSync(req.ctx, res, req.device, req.limit)
|
res, err = rp.db.CompleteSync(req.ctx, res, req.device, req.limit, req.includeLeave)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return res, fmt.Errorf("rp.db.CompleteSync: %w", err)
|
return res, fmt.Errorf("rp.db.CompleteSync: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue