mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-03 12:13:09 -06:00
Make missing tests pass
This commit is contained in:
parent
f0ccdeb109
commit
1e57c01837
|
|
@ -32,13 +32,16 @@ func NewLazyLoadCache() (*LazyLoadCache, error) {
|
||||||
return &LazyLoadCache{cache}, err
|
return &LazyLoadCache{cache}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *LazyLoadCache) StoreLazyLoadedUser(reqUser, deviceID, roomID, userID string) {
|
func (c *LazyLoadCache) StoreLazyLoadedUser(reqUser, deviceID, roomID, userID, eventID string) {
|
||||||
cacheKey := fmt.Sprintf("%s/%s/%s/%s", reqUser, deviceID, roomID, userID)
|
cacheKey := fmt.Sprintf("%s/%s/%s/%s", reqUser, deviceID, roomID, userID)
|
||||||
c.Set(cacheKey, true)
|
c.Set(cacheKey, eventID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *LazyLoadCache) IsLazyLoadedUserCached(reqUser, deviceID, roomID, userID string) bool {
|
func (c *LazyLoadCache) IsLazyLoadedUserCached(reqUser, deviceID, roomID, userID string) (string, bool) {
|
||||||
cacheKey := fmt.Sprintf("%s/%s/%s/%s", reqUser, deviceID, roomID, userID)
|
cacheKey := fmt.Sprintf("%s/%s/%s/%s", reqUser, deviceID, roomID, userID)
|
||||||
_, ok := c.Get(cacheKey)
|
val, ok := c.Get(cacheKey)
|
||||||
return ok
|
if !ok {
|
||||||
|
return "", ok
|
||||||
|
}
|
||||||
|
return val.(string), ok
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -271,7 +271,11 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return r.From, err
|
return r.From, err
|
||||||
}
|
}
|
||||||
delta.StateEvents, err = p.lazyLoadMembers(ctx, delta.RoomID, true, stateFilter, device, recentEvents, cache, delta.StateEvents)
|
delta.StateEvents, err = p.lazyLoadMembers(
|
||||||
|
ctx, delta.RoomID, true, limited, stateFilter.IncludeRedundantMembers,
|
||||||
|
device, cache,
|
||||||
|
recentEvents, delta.StateEvents,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return r.From, err
|
return r.From, err
|
||||||
}
|
}
|
||||||
|
|
@ -422,7 +426,11 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
stateEvents, err = p.lazyLoadMembers(ctx, roomID, false, stateFilter, device, recentEvents, cache, stateEvents)
|
stateEvents, err = p.lazyLoadMembers(ctx, roomID,
|
||||||
|
false, limited, stateFilter.IncludeRedundantMembers,
|
||||||
|
device, cache,
|
||||||
|
recentEvents, stateEvents,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -439,39 +447,40 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PDUStreamProvider) lazyLoadMembers(
|
func (p *PDUStreamProvider) lazyLoadMembers(
|
||||||
ctx context.Context, roomID string, isIncremental bool,
|
ctx context.Context, roomID string,
|
||||||
stateFilter *gomatrixserverlib.StateFilter, device *userapi.Device,
|
incremental, limited, includeRedundant bool,
|
||||||
timelineEvents []*gomatrixserverlib.HeaderedEvent, cache *caching.LazyLoadCache,
|
device *userapi.Device,
|
||||||
stateEvents []*gomatrixserverlib.HeaderedEvent,
|
cache *caching.LazyLoadCache,
|
||||||
|
timelineEvents, stateEvents []*gomatrixserverlib.HeaderedEvent,
|
||||||
) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
||||||
if len(timelineEvents) == 0 {
|
if len(timelineEvents) == 0 {
|
||||||
return stateEvents, nil
|
return stateEvents, nil
|
||||||
}
|
}
|
||||||
// Work out if we need to include membership events
|
// Work out which memberships to include
|
||||||
timelineUsers := make(map[string]struct{})
|
timelineUsers := make(map[string]struct{})
|
||||||
|
if !incremental {
|
||||||
if !isIncremental {
|
|
||||||
timelineUsers[device.UserID] = struct{}{}
|
timelineUsers[device.UserID] = struct{}{}
|
||||||
}
|
}
|
||||||
// add all users the client doesn't know about yet to a list
|
// Add all users the client doesn't know about yet to a list
|
||||||
for _, event := range timelineEvents {
|
for _, event := range timelineEvents {
|
||||||
if event.Type() == gomatrixserverlib.MRoomMember && event.StateKey() != nil {
|
// Membership is not yet cached, add it to the list
|
||||||
continue
|
if _, ok := cache.IsLazyLoadedUserCached(device.UserID, device.ID, roomID, event.Sender()); !ok {
|
||||||
}
|
|
||||||
// membership is not yet cached, add it to the list
|
|
||||||
if !cache.IsLazyLoadedUserCached(device.UserID, device.ID, roomID, event.Sender()) {
|
|
||||||
timelineUsers[event.Sender()] = struct{}{}
|
timelineUsers[event.Sender()] = struct{}{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// remove existing membership events we don't care about, e.g. users not in the timeline.events
|
// Remove existing membership events we don't care about, e.g. users not in the timeline.events
|
||||||
newStateEvents := []*gomatrixserverlib.HeaderedEvent{}
|
newStateEvents := []*gomatrixserverlib.HeaderedEvent{}
|
||||||
for _, event := range stateEvents {
|
for _, event := range stateEvents {
|
||||||
if event.Type() == gomatrixserverlib.MRoomMember && event.StateKey() != nil {
|
if event.Type() == gomatrixserverlib.MRoomMember && event.StateKey() != nil {
|
||||||
|
// If this is a gapped incremental sync, we still want this membership
|
||||||
|
isGappedIncremental := limited && incremental
|
||||||
// We want this users membership event, keep it in the list
|
// We want this users membership event, keep it in the list
|
||||||
if _, ok := timelineUsers[event.Sender()]; ok {
|
_, ok := timelineUsers[event.Sender()]
|
||||||
|
wantMembership := ok || isGappedIncremental
|
||||||
|
if wantMembership {
|
||||||
newStateEvents = append(newStateEvents, event)
|
newStateEvents = append(newStateEvents, event)
|
||||||
if !stateFilter.IncludeRedundantMembers {
|
if !includeRedundant {
|
||||||
cache.StoreLazyLoadedUser(device.UserID, device.ID, roomID, event.Sender())
|
cache.StoreLazyLoadedUser(device.UserID, device.ID, roomID, event.Sender(), event.EventID())
|
||||||
}
|
}
|
||||||
delete(timelineUsers, event.Sender())
|
delete(timelineUsers, event.Sender())
|
||||||
}
|
}
|
||||||
|
|
@ -479,7 +488,6 @@ func (p *PDUStreamProvider) lazyLoadMembers(
|
||||||
newStateEvents = append(newStateEvents, event)
|
newStateEvents = append(newStateEvents, event)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
wantUsers := make([]string, 0, len(timelineUsers))
|
wantUsers := make([]string, 0, len(timelineUsers))
|
||||||
for userID := range timelineUsers {
|
for userID := range timelineUsers {
|
||||||
wantUsers = append(wantUsers, userID)
|
wantUsers = append(wantUsers, userID)
|
||||||
|
|
@ -495,12 +503,13 @@ func (p *PDUStreamProvider) lazyLoadMembers(
|
||||||
}
|
}
|
||||||
// cache the membership events
|
// cache the membership events
|
||||||
for _, membership := range memberships {
|
for _, membership := range memberships {
|
||||||
cache.StoreLazyLoadedUser(device.UserID, device.ID, roomID, membership.Sender())
|
cache.StoreLazyLoadedUser(device.UserID, device.ID, roomID, membership.Sender(), membership.EventID())
|
||||||
}
|
}
|
||||||
stateEvents = append(newStateEvents, memberships...)
|
stateEvents = append(newStateEvents, memberships...)
|
||||||
return stateEvents, nil
|
return stateEvents, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getLazyLoadCache gets/creates a lazy load cache for a given device.
|
||||||
func (p *PDUStreamProvider) getLazyLoadCache(device *userapi.Device) (*caching.LazyLoadCache, error) {
|
func (p *PDUStreamProvider) getLazyLoadCache(device *userapi.Device) (*caching.LazyLoadCache, error) {
|
||||||
var err error
|
var err error
|
||||||
cacheKey := device.UserID + device.ID
|
cacheKey := device.UserID + device.ID
|
||||||
|
|
|
||||||
|
|
@ -703,4 +703,8 @@ The only membership state included in an initial sync is for all the senders in
|
||||||
The only membership state included in an incremental sync is for senders in the timeline
|
The only membership state included in an incremental sync is for senders in the timeline
|
||||||
Old members are included in gappy incr LL sync if they start speaking
|
Old members are included in gappy incr LL sync if they start speaking
|
||||||
We do send redundant membership state across incremental syncs if asked
|
We do send redundant membership state across incremental syncs if asked
|
||||||
Rejecting invite over federation doesn't break incremental /sync
|
Rejecting invite over federation doesn't break incremental /sync
|
||||||
|
Gapped incremental syncs include all state changes
|
||||||
|
Old leaves are present in gapped incremental syncs
|
||||||
|
Leaves are present in non-gapped incremental syncs
|
||||||
|
Members from the gap are included in gappy incr LL sync
|
||||||
Loading…
Reference in a new issue