Fix import cycle, get tests passing and binary compiling

This commit is contained in:
Kegan Dougal 2020-07-30 10:33:10 +01:00
parent fee53940ae
commit d8dd84f7b3
6 changed files with 284 additions and 254 deletions

View file

@ -29,7 +29,9 @@ func main() {
rsAPI := base.RoomserverHTTPClient() rsAPI := base.RoomserverHTTPClient()
syncapi.AddPublicRoutes(base.PublicAPIMux, base.KafkaConsumer, userAPI, rsAPI, federation, cfg) syncapi.AddPublicRoutes(
base.PublicAPIMux, base.KafkaConsumer, userAPI, rsAPI, base.KeyServerHTTPClient(), base.CurrentStateAPIClient(),
federation, cfg)
base.SetupAndServeHTTP(string(base.Cfg.Bind.SyncAPI), string(base.Cfg.Listen.SyncAPI)) base.SetupAndServeHTTP(string(base.Cfg.Bind.SyncAPI), string(base.Cfg.Listen.SyncAPI))

View file

@ -23,16 +23,14 @@ import (
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/api"
syncinternal "github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
syncapi "github.com/matrix-org/dendrite/syncapi/sync" syncapi "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
const deviceListLogName = "dl"
// OutputKeyChangeEventConsumer consumes events that originated in the key server. // OutputKeyChangeEventConsumer consumes events that originated in the key server.
type OutputKeyChangeEventConsumer struct { type OutputKeyChangeEventConsumer struct {
keyChangeConsumer *internal.ContinualConsumer keyChangeConsumer *internal.ContinualConsumer
@ -117,7 +115,7 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
} }
// TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams // TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams
posUpdate := types.NewStreamToken(0, 0, map[string]*types.LogPosition{ posUpdate := types.NewStreamToken(0, 0, map[string]*types.LogPosition{
deviceListLogName: &types.LogPosition{ syncinternal.DeviceListLogName: &types.LogPosition{
Offset: msg.Offset, Offset: msg.Offset,
Partition: msg.Partition, Partition: msg.Partition,
}, },
@ -128,72 +126,10 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
return nil return nil
} }
// Catchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response
// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST
// be already filled in with join/leave information.
func (s *OutputKeyChangeEventConsumer) Catchup(
ctx context.Context, userID string, res *types.Response, tok types.StreamingToken,
) (newTok *types.StreamingToken, hasNew bool, err error) {
// Track users who we didn't track before but now do by virtue of sharing a room with them, or not.
newlyJoinedRooms := joinedRooms(res, userID)
newlyLeftRooms := leftRooms(res)
if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
changed, left, err := s.trackChangedUsers(ctx, userID, newlyJoinedRooms, newlyLeftRooms)
if err != nil {
return nil, false, err
}
res.DeviceLists.Changed = changed
res.DeviceLists.Left = left
hasNew = len(changed) > 0 || len(left) > 0
}
// now also track users who we already share rooms with but who have updated their devices between the two tokens
var partition int32
var offset int64
// Extract partition/offset from sync token
// TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make.
logOffset := tok.Log(deviceListLogName)
if logOffset != nil {
partition = logOffset.Partition
offset = logOffset.Offset
} else {
partition = -1
offset = sarama.OffsetOldest
}
var queryRes api.QueryKeyChangesResponse
s.keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{
Partition: partition,
Offset: offset,
}, &queryRes)
if queryRes.Error != nil {
// don't fail the catchup because we may have got useful information by tracking membership
util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed")
return
}
userSet := make(map[string]bool)
for _, userID := range res.DeviceLists.Changed {
userSet[userID] = true
}
for _, userID := range queryRes.UserIDs {
if !userSet[userID] {
res.DeviceLists.Changed = append(res.DeviceLists.Changed, userID)
hasNew = true
}
}
// Make a new streaming token using the new offset
tok.SetLog(deviceListLogName, &types.LogPosition{
Offset: queryRes.Offset,
Partition: queryRes.Partition,
})
newTok = &tok
return
}
func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) { func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) {
// work out who we are now sharing rooms with which we previously were not and notify them about the joining // work out who we are now sharing rooms with which we previously were not and notify them about the joining
// users keys: // users keys:
changed, _, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), []string{ev.RoomID()}, nil) changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), []string{ev.RoomID()}, nil)
if err != nil { if err != nil {
log.WithError(err).Error("OnJoinEvent: failed to work out changed users") log.WithError(err).Error("OnJoinEvent: failed to work out changed users")
return return
@ -206,7 +142,7 @@ func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.Headere
func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) { func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) {
// work out who we are no longer sharing any rooms with and notify them about the leaving user // work out who we are no longer sharing any rooms with and notify them about the leaving user
_, left, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), nil, []string{ev.RoomID()}) _, left, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), nil, []string{ev.RoomID()})
if err != nil { if err != nil {
log.WithError(err).Error("OnLeaveEvent: failed to work out left users") log.WithError(err).Error("OnLeaveEvent: failed to work out left users")
return return
@ -217,129 +153,3 @@ func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.Header
} }
} }
// nolint:gocyclo
func (s *OutputKeyChangeEventConsumer) trackChangedUsers(
ctx context.Context, userID string, newlyJoinedRooms, newlyLeftRooms []string,
) (changed, left []string, err error) {
// process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users.
// Leave algorithm:
// - Get set of users and number of times they appear in rooms prior to leave. - QuerySharedUsersRequest with 'IncludeRoomID'.
// - Get users in newly left room. - QueryCurrentState
// - Loop set of users and decrement by 1 for each user in newly left room.
// - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync.
var queryRes currentstateAPI.QuerySharedUsersResponse
err = s.currentStateAPI.QuerySharedUsers(ctx, &currentstateAPI.QuerySharedUsersRequest{
UserID: userID,
IncludeRoomIDs: newlyLeftRooms,
}, &queryRes)
if err != nil {
return nil, nil, err
}
var stateRes currentstateAPI.QueryBulkStateContentResponse
err = s.currentStateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
RoomIDs: newlyLeftRooms,
StateTuples: []gomatrixserverlib.StateKeyTuple{
{
EventType: gomatrixserverlib.MRoomMember,
StateKey: "*",
},
},
AllowWildcards: true,
}, &stateRes)
if err != nil {
return nil, nil, err
}
for _, state := range stateRes.Rooms {
for tuple, membership := range state {
if membership != gomatrixserverlib.Join {
continue
}
queryRes.UserIDsToCount[tuple.StateKey]--
}
}
for userID, count := range queryRes.UserIDsToCount {
if count <= 0 {
left = append(left, userID) // left is returned
}
}
// Join algorithm:
// - Get the set of all joined users prior to joining room - QuerySharedUsersRequest with 'ExcludeRoomID'.
// - Get users in newly joined room - QueryCurrentState
// - Loop set of users in newly joined room, do they appear in the set of users prior to joining?
// - If yes: then they already shared a room in common, do nothing.
// - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]'
err = s.currentStateAPI.QuerySharedUsers(ctx, &currentstateAPI.QuerySharedUsersRequest{
UserID: userID,
ExcludeRoomIDs: newlyJoinedRooms,
}, &queryRes)
if err != nil {
return nil, left, err
}
err = s.currentStateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
RoomIDs: newlyJoinedRooms,
StateTuples: []gomatrixserverlib.StateKeyTuple{
{
EventType: gomatrixserverlib.MRoomMember,
StateKey: "*",
},
},
AllowWildcards: true,
}, &stateRes)
if err != nil {
return nil, left, err
}
for _, state := range stateRes.Rooms {
for tuple, membership := range state {
if membership != gomatrixserverlib.Join {
continue
}
// new user who we weren't previously sharing rooms with
if _, ok := queryRes.UserIDsToCount[tuple.StateKey]; !ok {
changed = append(changed, tuple.StateKey) // changed is returned
}
}
}
return changed, left, nil
}
func joinedRooms(res *types.Response, userID string) []string {
var roomIDs []string
for roomID, join := range res.Rooms.Join {
// we would expect to see our join event somewhere if we newly joined the room.
// Normal events get put in the join section so it's not enough to know the room ID is present in 'join'.
newlyJoined := membershipEventPresent(join.State.Events, userID)
if newlyJoined {
roomIDs = append(roomIDs, roomID)
continue
}
newlyJoined = membershipEventPresent(join.Timeline.Events, userID)
if newlyJoined {
roomIDs = append(roomIDs, roomID)
}
}
return roomIDs
}
func leftRooms(res *types.Response) []string {
roomIDs := make([]string, len(res.Rooms.Leave))
i := 0
for roomID := range res.Rooms.Leave {
roomIDs[i] = roomID
i++
}
return roomIDs
}
func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool {
for _, ev := range events {
// it's enough to know that we have our member event here, don't need to check membership content
// as it's implied by being in the respective section of the sync response.
if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID {
return true
}
}
return false
}

View file

@ -0,0 +1,219 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal
import (
"context"
"github.com/Shopify/sarama"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/keyserver/api"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
const DeviceListLogName = "dl"
// DeviceListCatchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response
// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST
// be already filled in with join/leave information.
func DeviceListCatchup(
ctx context.Context, keyAPI keyapi.KeyInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI,
userID string, res *types.Response, tok types.StreamingToken,
) (newTok *types.StreamingToken, hasNew bool, err error) {
// Track users who we didn't track before but now do by virtue of sharing a room with them, or not.
newlyJoinedRooms := joinedRooms(res, userID)
newlyLeftRooms := leftRooms(res)
if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
changed, left, err := TrackChangedUsers(ctx, stateAPI, userID, newlyJoinedRooms, newlyLeftRooms)
if err != nil {
return nil, false, err
}
res.DeviceLists.Changed = changed
res.DeviceLists.Left = left
hasNew = len(changed) > 0 || len(left) > 0
}
// now also track users who we already share rooms with but who have updated their devices between the two tokens
var partition int32
var offset int64
// Extract partition/offset from sync token
// TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make.
logOffset := tok.Log(DeviceListLogName)
if logOffset != nil {
partition = logOffset.Partition
offset = logOffset.Offset
} else {
partition = -1
offset = sarama.OffsetOldest
}
var queryRes api.QueryKeyChangesResponse
keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{
Partition: partition,
Offset: offset,
}, &queryRes)
if queryRes.Error != nil {
// don't fail the catchup because we may have got useful information by tracking membership
util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed")
return
}
userSet := make(map[string]bool)
for _, userID := range res.DeviceLists.Changed {
userSet[userID] = true
}
for _, userID := range queryRes.UserIDs {
if !userSet[userID] {
res.DeviceLists.Changed = append(res.DeviceLists.Changed, userID)
hasNew = true
}
}
// Make a new streaming token using the new offset
tok.SetLog(DeviceListLogName, &types.LogPosition{
Offset: queryRes.Offset,
Partition: queryRes.Partition,
})
newTok = &tok
return
}
// TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response.
// nolint:gocyclo
func TrackChangedUsers(
ctx context.Context, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string,
) (changed, left []string, err error) {
// process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users.
// Leave algorithm:
// - Get set of users and number of times they appear in rooms prior to leave. - QuerySharedUsersRequest with 'IncludeRoomID'.
// - Get users in newly left room. - QueryCurrentState
// - Loop set of users and decrement by 1 for each user in newly left room.
// - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync.
var queryRes currentstateAPI.QuerySharedUsersResponse
err = stateAPI.QuerySharedUsers(ctx, &currentstateAPI.QuerySharedUsersRequest{
UserID: userID,
IncludeRoomIDs: newlyLeftRooms,
}, &queryRes)
if err != nil {
return nil, nil, err
}
var stateRes currentstateAPI.QueryBulkStateContentResponse
err = stateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
RoomIDs: newlyLeftRooms,
StateTuples: []gomatrixserverlib.StateKeyTuple{
{
EventType: gomatrixserverlib.MRoomMember,
StateKey: "*",
},
},
AllowWildcards: true,
}, &stateRes)
if err != nil {
return nil, nil, err
}
for _, state := range stateRes.Rooms {
for tuple, membership := range state {
if membership != gomatrixserverlib.Join {
continue
}
queryRes.UserIDsToCount[tuple.StateKey]--
}
}
for userID, count := range queryRes.UserIDsToCount {
if count <= 0 {
left = append(left, userID) // left is returned
}
}
// Join algorithm:
// - Get the set of all joined users prior to joining room - QuerySharedUsersRequest with 'ExcludeRoomID'.
// - Get users in newly joined room - QueryCurrentState
// - Loop set of users in newly joined room, do they appear in the set of users prior to joining?
// - If yes: then they already shared a room in common, do nothing.
// - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]'
err = stateAPI.QuerySharedUsers(ctx, &currentstateAPI.QuerySharedUsersRequest{
UserID: userID,
ExcludeRoomIDs: newlyJoinedRooms,
}, &queryRes)
if err != nil {
return nil, left, err
}
err = stateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
RoomIDs: newlyJoinedRooms,
StateTuples: []gomatrixserverlib.StateKeyTuple{
{
EventType: gomatrixserverlib.MRoomMember,
StateKey: "*",
},
},
AllowWildcards: true,
}, &stateRes)
if err != nil {
return nil, left, err
}
for _, state := range stateRes.Rooms {
for tuple, membership := range state {
if membership != gomatrixserverlib.Join {
continue
}
// new user who we weren't previously sharing rooms with
if _, ok := queryRes.UserIDsToCount[tuple.StateKey]; !ok {
changed = append(changed, tuple.StateKey) // changed is returned
}
}
}
return changed, left, nil
}
func joinedRooms(res *types.Response, userID string) []string {
var roomIDs []string
for roomID, join := range res.Rooms.Join {
// we would expect to see our join event somewhere if we newly joined the room.
// Normal events get put in the join section so it's not enough to know the room ID is present in 'join'.
newlyJoined := membershipEventPresent(join.State.Events, userID)
if newlyJoined {
roomIDs = append(roomIDs, roomID)
continue
}
newlyJoined = membershipEventPresent(join.Timeline.Events, userID)
if newlyJoined {
roomIDs = append(roomIDs, roomID)
}
}
return roomIDs
}
func leftRooms(res *types.Response) []string {
roomIDs := make([]string, len(res.Rooms.Leave))
i := 0
for roomID := range res.Rooms.Leave {
roomIDs[i] = roomID
i++
}
return roomIDs
}
func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool {
for _, ev := range events {
// it's enough to know that we have our member event here, don't need to check membership content
// as it's implied by being in the respective section of the sync response.
if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID {
return true
}
}
return false
}

View file

@ -1,4 +1,4 @@
package consumers package internal
import ( import (
"context" "context"
@ -159,18 +159,17 @@ func leaveResponseWithRooms(syncResponse *types.Response, userID string, roomIDs
func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) { func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
newShareUser := "@bill:localhost" newShareUser := "@bill:localhost"
newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNewUser:bar" newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNewUser:bar"
consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ syncResponse := types.NewResponse()
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{ roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, newShareUser}, newlyJoinedRoom: {syncingUser, newShareUser},
"!another:room": {syncingUser}, "!another:room": {syncingUser},
}, },
}, nil) }, syncingUser, syncResponse, emptyToken)
syncResponse := types.NewResponse()
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
if err != nil { if err != nil {
t.Fatalf("Catchup returned an error: %s", err) t.Fatalf("DeviceListCatchup returned an error: %s", err)
} }
assertCatchup(t, hasNew, syncResponse, wantCatchup{ assertCatchup(t, hasNew, syncResponse, wantCatchup{
hasNew: true, hasNew: true,
@ -182,18 +181,17 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) { func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
removeUser := "@bill:localhost" removeUser := "@bill:localhost"
newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareLeftUser:bar" newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareLeftUser:bar"
consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ syncResponse := types.NewResponse()
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{ roomIDToJoinedMembers: map[string][]string{
newlyLeftRoom: {removeUser}, newlyLeftRoom: {removeUser},
"!another:room": {syncingUser}, "!another:room": {syncingUser},
}, },
}, nil) }, syncingUser, syncResponse, emptyToken)
syncResponse := types.NewResponse()
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
if err != nil { if err != nil {
t.Fatalf("Catchup returned an error: %s", err) t.Fatalf("DeviceListCatchup returned an error: %s", err)
} }
assertCatchup(t, hasNew, syncResponse, wantCatchup{ assertCatchup(t, hasNew, syncResponse, wantCatchup{
hasNew: true, hasNew: true,
@ -205,16 +203,15 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) { func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
existingUser := "@bob:localhost" existingUser := "@bob:localhost"
newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNoNewUsers:bar" newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNoNewUsers:bar"
consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ syncResponse := types.NewResponse()
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{ roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, existingUser}, newlyJoinedRoom: {syncingUser, existingUser},
"!another:room": {syncingUser, existingUser}, "!another:room": {syncingUser, existingUser},
}, },
}, nil) }, syncingUser, syncResponse, emptyToken)
syncResponse := types.NewResponse()
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
if err != nil { if err != nil {
t.Fatalf("Catchup returned an error: %s", err) t.Fatalf("Catchup returned an error: %s", err)
} }
@ -227,18 +224,17 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) { func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
existingUser := "@bob:localhost" existingUser := "@bob:localhost"
newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareNoUsers:bar" newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareNoUsers:bar"
consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ syncResponse := types.NewResponse()
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{ roomIDToJoinedMembers: map[string][]string{
newlyLeftRoom: {existingUser}, newlyLeftRoom: {existingUser},
"!another:room": {syncingUser, existingUser}, "!another:room": {syncingUser, existingUser},
}, },
}, nil) }, syncingUser, syncResponse, emptyToken)
syncResponse := types.NewResponse()
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
if err != nil { if err != nil {
t.Fatalf("Catchup returned an error: %s", err) t.Fatalf("DeviceListCatchup returned an error: %s", err)
} }
assertCatchup(t, hasNew, syncResponse, wantCatchup{ assertCatchup(t, hasNew, syncResponse, wantCatchup{
hasNew: false, hasNew: false,
@ -249,11 +245,6 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) { func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
existingUser := "@bob1:localhost" existingUser := "@bob1:localhost"
roomID := "!TestKeyChangeCatchupNoNewJoinsButMessages:bar" roomID := "!TestKeyChangeCatchupNoNewJoinsButMessages:bar"
consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
roomID: {syncingUser, existingUser},
},
}, nil)
syncResponse := types.NewResponse() syncResponse := types.NewResponse()
empty := "" empty := ""
roomStateEvents := []gomatrixserverlib.ClientEvent{ roomStateEvents := []gomatrixserverlib.ClientEvent{
@ -295,9 +286,13 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
jr.Timeline.Events = roomTimelineEvents jr.Timeline.Events = roomTimelineEvents
syncResponse.Rooms.Join[roomID] = jr syncResponse.Rooms.Join[roomID] = jr
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
roomID: {syncingUser, existingUser},
},
}, syncingUser, syncResponse, emptyToken)
if err != nil { if err != nil {
t.Fatalf("Catchup returned an error: %s", err) t.Fatalf("DeviceListCatchup returned an error: %s", err)
} }
assertCatchup(t, hasNew, syncResponse, wantCatchup{ assertCatchup(t, hasNew, syncResponse, wantCatchup{
hasNew: false, hasNew: false,
@ -312,18 +307,17 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) {
newlyLeftUser2 := "@debra:localhost" newlyLeftUser2 := "@debra:localhost"
newlyJoinedRoom := "!join:bar" newlyJoinedRoom := "!join:bar"
newlyLeftRoom := "!left:bar" newlyLeftRoom := "!left:bar"
consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ syncResponse := types.NewResponse()
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{ roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, newShareUser, newShareUser2}, newlyJoinedRoom: {syncingUser, newShareUser, newShareUser2},
newlyLeftRoom: {newlyLeftUser, newlyLeftUser2}, newlyLeftRoom: {newlyLeftUser, newlyLeftUser2},
"!another:room": {syncingUser}, "!another:room": {syncingUser},
}, },
}, nil) }, syncingUser, syncResponse, emptyToken)
syncResponse := types.NewResponse()
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
if err != nil { if err != nil {
t.Fatalf("Catchup returned an error: %s", err) t.Fatalf("Catchup returned an error: %s", err)
} }
@ -348,12 +342,6 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
newShareUser := "@berta:localhost" newShareUser := "@berta:localhost"
newShareUser2 := "@bobby:localhost" newShareUser2 := "@bobby:localhost"
roomID := "!join:bar" roomID := "!join:bar"
consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
roomID: {newShareUser, newShareUser2},
"!another:room": {syncingUser},
},
}, nil)
syncResponse := types.NewResponse() syncResponse := types.NewResponse()
roomEvents := []gomatrixserverlib.ClientEvent{ roomEvents := []gomatrixserverlib.ClientEvent{
{ {
@ -408,9 +396,14 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
lr.Timeline.Events = roomEvents lr.Timeline.Events = roomEvents
syncResponse.Rooms.Leave[roomID] = lr syncResponse.Rooms.Leave[roomID] = lr
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
roomID: {newShareUser, newShareUser2},
"!another:room": {syncingUser},
},
}, syncingUser, syncResponse, emptyToken)
if err != nil { if err != nil {
t.Fatalf("Catchup returned an error: %s", err) t.Fatalf("DeviceListCatchup returned an error: %s", err)
} }
assertCatchup(t, hasNew, syncResponse, wantCatchup{ assertCatchup(t, hasNew, syncResponse, wantCatchup{
hasNew: true, hasNew: true,

View file

@ -22,7 +22,9 @@ import (
"time" "time"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/syncapi/consumers" currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
@ -33,15 +35,19 @@ import (
// RequestPool manages HTTP long-poll connections for /sync // RequestPool manages HTTP long-poll connections for /sync
type RequestPool struct { type RequestPool struct {
db storage.Database db storage.Database
userAPI userapi.UserInternalAPI userAPI userapi.UserInternalAPI
notifier *Notifier notifier *Notifier
keyChanges *consumers.OutputKeyChangeEventConsumer keyAPI keyapi.KeyInternalAPI
stateAPI currentstateAPI.CurrentStateInternalAPI
} }
// NewRequestPool makes a new RequestPool // NewRequestPool makes a new RequestPool
func NewRequestPool(db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI) *RequestPool { func NewRequestPool(
return &RequestPool{db, userAPI, n} db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI,
stateAPI currentstateAPI.CurrentStateInternalAPI,
) *RequestPool {
return &RequestPool{db, userAPI, n, keyAPI, stateAPI}
} }
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be // OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
@ -206,7 +212,7 @@ func (rp *RequestPool) appendDeviceLists(
// QueryKeyChanges API only exposes a "from" value (on purpose to avoid racing, which then // QueryKeyChanges API only exposes a "from" value (on purpose to avoid racing, which then
// returns the latest position with which the response has authority on). We'd need to tweak // returns the latest position with which the response has authority on). We'd need to tweak
// the API to expose a "to" value to fix this. // the API to expose a "to" value to fix this.
_, _, err := rp.keyChanges.Catchup(context.Background(), userID, data, since) _, _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -62,7 +62,7 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start notifier") logrus.WithError(err).Panicf("failed to start notifier")
} }
requestPool := sync.NewRequestPool(syncDB, notifier, userAPI) requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, currentStateAPI)
roomConsumer := consumers.NewOutputRoomEventConsumer( roomConsumer := consumers.NewOutputRoomEventConsumer(
cfg, consumer, notifier, syncDB, rsAPI, cfg, consumer, notifier, syncDB, rsAPI,