Hook up device list updates to the sync notifier (#1231)

* WIP hooking up key changes

* Fix import cycle, get tests passing and binary compiling

* Linting and update whitelist
This commit is contained in:
Kegsay 2020-07-30 11:15:46 +01:00 committed by GitHub
parent 0fdd4f14d1
commit 9355fb5ac8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 356 additions and 230 deletions

View file

@ -29,7 +29,9 @@ func main() {
rsAPI := base.RoomserverHTTPClient() rsAPI := base.RoomserverHTTPClient()
syncapi.AddPublicRoutes(base.PublicAPIMux, base.KafkaConsumer, userAPI, rsAPI, federation, cfg) syncapi.AddPublicRoutes(
base.PublicAPIMux, base.KafkaConsumer, userAPI, rsAPI, base.KeyServerHTTPClient(), base.CurrentStateAPIClient(),
federation, cfg)
base.SetupAndServeHTTP(string(base.Cfg.Bind.SyncAPI), string(base.Cfg.Listen.SyncAPI)) base.SetupAndServeHTTP(string(base.Cfg.Bind.SyncAPI), string(base.Cfg.Listen.SyncAPI))

View file

@ -77,6 +77,7 @@ func (m *Monolith) AddAllPublicRoutes(publicMux *mux.Router) {
) )
mediaapi.AddPublicRoutes(publicMux, m.Config, m.UserAPI, m.Client) mediaapi.AddPublicRoutes(publicMux, m.Config, m.UserAPI, m.Client)
syncapi.AddPublicRoutes( syncapi.AddPublicRoutes(
publicMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI, m.FedClient, m.Config, publicMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI,
m.KeyAPI, m.StateAPI, m.FedClient, m.Config,
) )
} }

View file

@ -143,6 +143,8 @@ type QueryKeyChangesRequest struct {
type QueryKeyChangesResponse struct { type QueryKeyChangesResponse struct {
// The set of users who have had their keys change. // The set of users who have had their keys change.
UserIDs []string UserIDs []string
// The partition being served - useful if the partition is unknown at request time
Partition int32
// The latest offset represented in this response. // The latest offset represented in this response.
Offset int64 Offset int64
// Set if there was a problem handling the request. // Set if there was a problem handling the request.

View file

@ -41,6 +41,9 @@ type KeyInternalAPI struct {
} }
func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) { func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) {
if req.Partition < 0 {
req.Partition = a.Producer.DefaultPartition()
}
userIDs, latest, err := a.DB.KeyChanges(ctx, req.Partition, req.Offset) userIDs, latest, err := a.DB.KeyChanges(ctx, req.Partition, req.Offset)
if err != nil { if err != nil {
res.Error = &api.KeyError{ res.Error = &api.KeyError{
@ -48,6 +51,7 @@ func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyC
} }
} }
res.Offset = latest res.Offset = latest
res.Partition = req.Partition
res.UserIDs = userIDs res.UserIDs = userIDs
} }

View file

@ -31,6 +31,15 @@ type KeyChange struct {
DB storage.Database DB storage.Database
} }
// DefaultPartition returns the default partition this process is sending key changes to.
// NB: A keyserver MUST send key changes to only 1 partition or else query operations will
// become inconsistent. Partitions can be sharded (e.g by hash of user ID of key change) but
// then all keyservers must be queried to calculate the entire set of key changes between
// two sync tokens.
func (p *KeyChange) DefaultPartition() int32 {
return 0
}
// ProduceKeyChanges creates new change events for each key // ProduceKeyChanges creates new change events for each key
func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceKeys) error { func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceKeys) error {
for _, key := range keys { for _, key := range keys {

View file

@ -23,10 +23,11 @@ import (
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/api"
syncinternal "github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
syncapi "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -39,6 +40,7 @@ type OutputKeyChangeEventConsumer struct {
keyAPI api.KeyInternalAPI keyAPI api.KeyInternalAPI
partitionToOffset map[int32]int64 partitionToOffset map[int32]int64
partitionToOffsetMu sync.Mutex partitionToOffsetMu sync.Mutex
notifier *syncapi.Notifier
} }
// NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer. // NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer.
@ -47,6 +49,7 @@ func NewOutputKeyChangeEventConsumer(
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
topic string, topic string,
kafkaConsumer sarama.Consumer, kafkaConsumer sarama.Consumer,
n *syncapi.Notifier,
keyAPI api.KeyInternalAPI, keyAPI api.KeyInternalAPI,
currentStateAPI currentstateAPI.CurrentStateInternalAPI, currentStateAPI currentstateAPI.CurrentStateInternalAPI,
store storage.Database, store storage.Database,
@ -66,6 +69,7 @@ func NewOutputKeyChangeEventConsumer(
currentStateAPI: currentStateAPI, currentStateAPI: currentStateAPI,
partitionToOffset: make(map[int32]int64), partitionToOffset: make(map[int32]int64),
partitionToOffsetMu: sync.Mutex{}, partitionToOffsetMu: sync.Mutex{},
notifier: n,
} }
consumer.ProcessMessage = s.onMessage consumer.ProcessMessage = s.onMessage
@ -110,59 +114,22 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
return err return err
} }
// TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams // TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams
posUpdate := types.NewStreamToken(0, 0, map[string]*types.LogPosition{
syncinternal.DeviceListLogName: &types.LogPosition{
Offset: msg.Offset,
Partition: msg.Partition,
},
})
for userID := range queryRes.UserIDsToCount {
s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID)
}
return nil return nil
} }
// Catchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response
// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST
// be already filled in with join/leave information.
func (s *OutputKeyChangeEventConsumer) Catchup(
ctx context.Context, userID string, res *types.Response, tok types.StreamingToken,
) (newTok *types.StreamingToken, hasNew bool, err error) {
// Track users who we didn't track before but now do by virtue of sharing a room with them, or not.
newlyJoinedRooms := joinedRooms(res, userID)
newlyLeftRooms := leftRooms(res)
if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
changed, left, err := s.trackChangedUsers(ctx, userID, newlyJoinedRooms, newlyLeftRooms)
if err != nil {
return nil, false, err
}
res.DeviceLists.Changed = changed
res.DeviceLists.Left = left
hasNew = len(changed) > 0 || len(left) > 0
}
// now also track users who we already share rooms with but who have updated their devices between the two tokens
// TODO: Extract partition/offset from sync token
var partition int32
var offset int64
var queryRes api.QueryKeyChangesResponse
s.keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{
Partition: partition,
Offset: offset,
}, &queryRes)
if queryRes.Error != nil {
// don't fail the catchup because we may have got useful information by tracking membership
util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed")
} else {
// TODO: Make a new streaming token using the new offset
userSet := make(map[string]bool)
for _, userID := range res.DeviceLists.Changed {
userSet[userID] = true
}
for _, userID := range queryRes.UserIDs {
if !userSet[userID] {
res.DeviceLists.Changed = append(res.DeviceLists.Changed, userID)
}
}
}
return
}
func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) { func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) {
// work out who we are now sharing rooms with which we previously were not and notify them about the joining // work out who we are now sharing rooms with which we previously were not and notify them about the joining
// users keys: // users keys:
changed, _, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), []string{ev.RoomID()}, nil) changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), []string{ev.RoomID()}, nil)
if err != nil { if err != nil {
log.WithError(err).Error("OnJoinEvent: failed to work out changed users") log.WithError(err).Error("OnJoinEvent: failed to work out changed users")
return return
@ -175,7 +142,7 @@ func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.Headere
func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) { func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) {
// work out who we are no longer sharing any rooms with and notify them about the leaving user // work out who we are no longer sharing any rooms with and notify them about the leaving user
_, left, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), nil, []string{ev.RoomID()}) _, left, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), nil, []string{ev.RoomID()})
if err != nil { if err != nil {
log.WithError(err).Error("OnLeaveEvent: failed to work out left users") log.WithError(err).Error("OnLeaveEvent: failed to work out left users")
return return
@ -186,129 +153,3 @@ func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.Header
} }
} }
// nolint:gocyclo
func (s *OutputKeyChangeEventConsumer) trackChangedUsers(
ctx context.Context, userID string, newlyJoinedRooms, newlyLeftRooms []string,
) (changed, left []string, err error) {
// process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users.
// Leave algorithm:
// - Get set of users and number of times they appear in rooms prior to leave. - QuerySharedUsersRequest with 'IncludeRoomID'.
// - Get users in newly left room. - QueryCurrentState
// - Loop set of users and decrement by 1 for each user in newly left room.
// - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync.
var queryRes currentstateAPI.QuerySharedUsersResponse
err = s.currentStateAPI.QuerySharedUsers(ctx, &currentstateAPI.QuerySharedUsersRequest{
UserID: userID,
IncludeRoomIDs: newlyLeftRooms,
}, &queryRes)
if err != nil {
return nil, nil, err
}
var stateRes currentstateAPI.QueryBulkStateContentResponse
err = s.currentStateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
RoomIDs: newlyLeftRooms,
StateTuples: []gomatrixserverlib.StateKeyTuple{
{
EventType: gomatrixserverlib.MRoomMember,
StateKey: "*",
},
},
AllowWildcards: true,
}, &stateRes)
if err != nil {
return nil, nil, err
}
for _, state := range stateRes.Rooms {
for tuple, membership := range state {
if membership != gomatrixserverlib.Join {
continue
}
queryRes.UserIDsToCount[tuple.StateKey]--
}
}
for userID, count := range queryRes.UserIDsToCount {
if count <= 0 {
left = append(left, userID) // left is returned
}
}
// Join algorithm:
// - Get the set of all joined users prior to joining room - QuerySharedUsersRequest with 'ExcludeRoomID'.
// - Get users in newly joined room - QueryCurrentState
// - Loop set of users in newly joined room, do they appear in the set of users prior to joining?
// - If yes: then they already shared a room in common, do nothing.
// - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]'
err = s.currentStateAPI.QuerySharedUsers(ctx, &currentstateAPI.QuerySharedUsersRequest{
UserID: userID,
ExcludeRoomIDs: newlyJoinedRooms,
}, &queryRes)
if err != nil {
return nil, left, err
}
err = s.currentStateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
RoomIDs: newlyJoinedRooms,
StateTuples: []gomatrixserverlib.StateKeyTuple{
{
EventType: gomatrixserverlib.MRoomMember,
StateKey: "*",
},
},
AllowWildcards: true,
}, &stateRes)
if err != nil {
return nil, left, err
}
for _, state := range stateRes.Rooms {
for tuple, membership := range state {
if membership != gomatrixserverlib.Join {
continue
}
// new user who we weren't previously sharing rooms with
if _, ok := queryRes.UserIDsToCount[tuple.StateKey]; !ok {
changed = append(changed, tuple.StateKey) // changed is returned
}
}
}
return changed, left, nil
}
func joinedRooms(res *types.Response, userID string) []string {
var roomIDs []string
for roomID, join := range res.Rooms.Join {
// we would expect to see our join event somewhere if we newly joined the room.
// Normal events get put in the join section so it's not enough to know the room ID is present in 'join'.
newlyJoined := membershipEventPresent(join.State.Events, userID)
if newlyJoined {
roomIDs = append(roomIDs, roomID)
continue
}
newlyJoined = membershipEventPresent(join.Timeline.Events, userID)
if newlyJoined {
roomIDs = append(roomIDs, roomID)
}
}
return roomIDs
}
func leftRooms(res *types.Response) []string {
roomIDs := make([]string, len(res.Rooms.Leave))
i := 0
for roomID := range res.Rooms.Leave {
roomIDs[i] = roomID
i++
}
return roomIDs
}
func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool {
for _, ev := range events {
// it's enough to know that we have our member event here, don't need to check membership content
// as it's implied by being in the respective section of the sync response.
if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID {
return true
}
}
return false
}

View file

@ -0,0 +1,219 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal
import (
"context"
"github.com/Shopify/sarama"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/keyserver/api"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
const DeviceListLogName = "dl"
// DeviceListCatchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response
// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST
// be already filled in with join/leave information.
func DeviceListCatchup(
ctx context.Context, keyAPI keyapi.KeyInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI,
userID string, res *types.Response, tok types.StreamingToken,
) (newTok *types.StreamingToken, hasNew bool, err error) {
// Track users who we didn't track before but now do by virtue of sharing a room with them, or not.
newlyJoinedRooms := joinedRooms(res, userID)
newlyLeftRooms := leftRooms(res)
if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
changed, left, err := TrackChangedUsers(ctx, stateAPI, userID, newlyJoinedRooms, newlyLeftRooms)
if err != nil {
return nil, false, err
}
res.DeviceLists.Changed = changed
res.DeviceLists.Left = left
hasNew = len(changed) > 0 || len(left) > 0
}
// now also track users who we already share rooms with but who have updated their devices between the two tokens
var partition int32
var offset int64
// Extract partition/offset from sync token
// TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make.
logOffset := tok.Log(DeviceListLogName)
if logOffset != nil {
partition = logOffset.Partition
offset = logOffset.Offset
} else {
partition = -1
offset = sarama.OffsetOldest
}
var queryRes api.QueryKeyChangesResponse
keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{
Partition: partition,
Offset: offset,
}, &queryRes)
if queryRes.Error != nil {
// don't fail the catchup because we may have got useful information by tracking membership
util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed")
return
}
userSet := make(map[string]bool)
for _, userID := range res.DeviceLists.Changed {
userSet[userID] = true
}
for _, userID := range queryRes.UserIDs {
if !userSet[userID] {
res.DeviceLists.Changed = append(res.DeviceLists.Changed, userID)
hasNew = true
}
}
// Make a new streaming token using the new offset
tok.SetLog(DeviceListLogName, &types.LogPosition{
Offset: queryRes.Offset,
Partition: queryRes.Partition,
})
newTok = &tok
return
}
// TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response.
// nolint:gocyclo
func TrackChangedUsers(
ctx context.Context, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string,
) (changed, left []string, err error) {
// process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users.
// Leave algorithm:
// - Get set of users and number of times they appear in rooms prior to leave. - QuerySharedUsersRequest with 'IncludeRoomID'.
// - Get users in newly left room. - QueryCurrentState
// - Loop set of users and decrement by 1 for each user in newly left room.
// - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync.
var queryRes currentstateAPI.QuerySharedUsersResponse
err = stateAPI.QuerySharedUsers(ctx, &currentstateAPI.QuerySharedUsersRequest{
UserID: userID,
IncludeRoomIDs: newlyLeftRooms,
}, &queryRes)
if err != nil {
return nil, nil, err
}
var stateRes currentstateAPI.QueryBulkStateContentResponse
err = stateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
RoomIDs: newlyLeftRooms,
StateTuples: []gomatrixserverlib.StateKeyTuple{
{
EventType: gomatrixserverlib.MRoomMember,
StateKey: "*",
},
},
AllowWildcards: true,
}, &stateRes)
if err != nil {
return nil, nil, err
}
for _, state := range stateRes.Rooms {
for tuple, membership := range state {
if membership != gomatrixserverlib.Join {
continue
}
queryRes.UserIDsToCount[tuple.StateKey]--
}
}
for userID, count := range queryRes.UserIDsToCount {
if count <= 0 {
left = append(left, userID) // left is returned
}
}
// Join algorithm:
// - Get the set of all joined users prior to joining room - QuerySharedUsersRequest with 'ExcludeRoomID'.
// - Get users in newly joined room - QueryCurrentState
// - Loop set of users in newly joined room, do they appear in the set of users prior to joining?
// - If yes: then they already shared a room in common, do nothing.
// - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]'
err = stateAPI.QuerySharedUsers(ctx, &currentstateAPI.QuerySharedUsersRequest{
UserID: userID,
ExcludeRoomIDs: newlyJoinedRooms,
}, &queryRes)
if err != nil {
return nil, left, err
}
err = stateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
RoomIDs: newlyJoinedRooms,
StateTuples: []gomatrixserverlib.StateKeyTuple{
{
EventType: gomatrixserverlib.MRoomMember,
StateKey: "*",
},
},
AllowWildcards: true,
}, &stateRes)
if err != nil {
return nil, left, err
}
for _, state := range stateRes.Rooms {
for tuple, membership := range state {
if membership != gomatrixserverlib.Join {
continue
}
// new user who we weren't previously sharing rooms with
if _, ok := queryRes.UserIDsToCount[tuple.StateKey]; !ok {
changed = append(changed, tuple.StateKey) // changed is returned
}
}
}
return changed, left, nil
}
func joinedRooms(res *types.Response, userID string) []string {
var roomIDs []string
for roomID, join := range res.Rooms.Join {
// we would expect to see our join event somewhere if we newly joined the room.
// Normal events get put in the join section so it's not enough to know the room ID is present in 'join'.
newlyJoined := membershipEventPresent(join.State.Events, userID)
if newlyJoined {
roomIDs = append(roomIDs, roomID)
continue
}
newlyJoined = membershipEventPresent(join.Timeline.Events, userID)
if newlyJoined {
roomIDs = append(roomIDs, roomID)
}
}
return roomIDs
}
func leftRooms(res *types.Response) []string {
roomIDs := make([]string, len(res.Rooms.Leave))
i := 0
for roomID := range res.Rooms.Leave {
roomIDs[i] = roomID
i++
}
return roomIDs
}
func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool {
for _, ev := range events {
// it's enough to know that we have our member event here, don't need to check membership content
// as it's implied by being in the respective section of the sync response.
if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID {
return true
}
}
return false
}

View file

@ -1,4 +1,4 @@
package consumers package internal
import ( import (
"context" "context"
@ -159,18 +159,17 @@ func leaveResponseWithRooms(syncResponse *types.Response, userID string, roomIDs
func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) { func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
newShareUser := "@bill:localhost" newShareUser := "@bill:localhost"
newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNewUser:bar" newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNewUser:bar"
consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ syncResponse := types.NewResponse()
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{ roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, newShareUser}, newlyJoinedRoom: {syncingUser, newShareUser},
"!another:room": {syncingUser}, "!another:room": {syncingUser},
}, },
}, nil) }, syncingUser, syncResponse, emptyToken)
syncResponse := types.NewResponse()
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
if err != nil { if err != nil {
t.Fatalf("Catchup returned an error: %s", err) t.Fatalf("DeviceListCatchup returned an error: %s", err)
} }
assertCatchup(t, hasNew, syncResponse, wantCatchup{ assertCatchup(t, hasNew, syncResponse, wantCatchup{
hasNew: true, hasNew: true,
@ -182,18 +181,17 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) { func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
removeUser := "@bill:localhost" removeUser := "@bill:localhost"
newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareLeftUser:bar" newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareLeftUser:bar"
consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ syncResponse := types.NewResponse()
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{ roomIDToJoinedMembers: map[string][]string{
newlyLeftRoom: {removeUser}, newlyLeftRoom: {removeUser},
"!another:room": {syncingUser}, "!another:room": {syncingUser},
}, },
}, nil) }, syncingUser, syncResponse, emptyToken)
syncResponse := types.NewResponse()
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
if err != nil { if err != nil {
t.Fatalf("Catchup returned an error: %s", err) t.Fatalf("DeviceListCatchup returned an error: %s", err)
} }
assertCatchup(t, hasNew, syncResponse, wantCatchup{ assertCatchup(t, hasNew, syncResponse, wantCatchup{
hasNew: true, hasNew: true,
@ -205,16 +203,15 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) { func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
existingUser := "@bob:localhost" existingUser := "@bob:localhost"
newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNoNewUsers:bar" newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNoNewUsers:bar"
consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ syncResponse := types.NewResponse()
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{ roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, existingUser}, newlyJoinedRoom: {syncingUser, existingUser},
"!another:room": {syncingUser, existingUser}, "!another:room": {syncingUser, existingUser},
}, },
}, nil) }, syncingUser, syncResponse, emptyToken)
syncResponse := types.NewResponse()
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
if err != nil { if err != nil {
t.Fatalf("Catchup returned an error: %s", err) t.Fatalf("Catchup returned an error: %s", err)
} }
@ -227,18 +224,17 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) { func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
existingUser := "@bob:localhost" existingUser := "@bob:localhost"
newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareNoUsers:bar" newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareNoUsers:bar"
consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ syncResponse := types.NewResponse()
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{ roomIDToJoinedMembers: map[string][]string{
newlyLeftRoom: {existingUser}, newlyLeftRoom: {existingUser},
"!another:room": {syncingUser, existingUser}, "!another:room": {syncingUser, existingUser},
}, },
}, nil) }, syncingUser, syncResponse, emptyToken)
syncResponse := types.NewResponse()
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
if err != nil { if err != nil {
t.Fatalf("Catchup returned an error: %s", err) t.Fatalf("DeviceListCatchup returned an error: %s", err)
} }
assertCatchup(t, hasNew, syncResponse, wantCatchup{ assertCatchup(t, hasNew, syncResponse, wantCatchup{
hasNew: false, hasNew: false,
@ -249,11 +245,6 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) { func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
existingUser := "@bob1:localhost" existingUser := "@bob1:localhost"
roomID := "!TestKeyChangeCatchupNoNewJoinsButMessages:bar" roomID := "!TestKeyChangeCatchupNoNewJoinsButMessages:bar"
consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
roomID: {syncingUser, existingUser},
},
}, nil)
syncResponse := types.NewResponse() syncResponse := types.NewResponse()
empty := "" empty := ""
roomStateEvents := []gomatrixserverlib.ClientEvent{ roomStateEvents := []gomatrixserverlib.ClientEvent{
@ -295,9 +286,13 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
jr.Timeline.Events = roomTimelineEvents jr.Timeline.Events = roomTimelineEvents
syncResponse.Rooms.Join[roomID] = jr syncResponse.Rooms.Join[roomID] = jr
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
roomID: {syncingUser, existingUser},
},
}, syncingUser, syncResponse, emptyToken)
if err != nil { if err != nil {
t.Fatalf("Catchup returned an error: %s", err) t.Fatalf("DeviceListCatchup returned an error: %s", err)
} }
assertCatchup(t, hasNew, syncResponse, wantCatchup{ assertCatchup(t, hasNew, syncResponse, wantCatchup{
hasNew: false, hasNew: false,
@ -312,18 +307,17 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) {
newlyLeftUser2 := "@debra:localhost" newlyLeftUser2 := "@debra:localhost"
newlyJoinedRoom := "!join:bar" newlyJoinedRoom := "!join:bar"
newlyLeftRoom := "!left:bar" newlyLeftRoom := "!left:bar"
consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ syncResponse := types.NewResponse()
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{ roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, newShareUser, newShareUser2}, newlyJoinedRoom: {syncingUser, newShareUser, newShareUser2},
newlyLeftRoom: {newlyLeftUser, newlyLeftUser2}, newlyLeftRoom: {newlyLeftUser, newlyLeftUser2},
"!another:room": {syncingUser}, "!another:room": {syncingUser},
}, },
}, nil) }, syncingUser, syncResponse, emptyToken)
syncResponse := types.NewResponse()
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
if err != nil { if err != nil {
t.Fatalf("Catchup returned an error: %s", err) t.Fatalf("Catchup returned an error: %s", err)
} }
@ -348,12 +342,6 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
newShareUser := "@berta:localhost" newShareUser := "@berta:localhost"
newShareUser2 := "@bobby:localhost" newShareUser2 := "@bobby:localhost"
roomID := "!join:bar" roomID := "!join:bar"
consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
roomID: {newShareUser, newShareUser2},
"!another:room": {syncingUser},
},
}, nil)
syncResponse := types.NewResponse() syncResponse := types.NewResponse()
roomEvents := []gomatrixserverlib.ClientEvent{ roomEvents := []gomatrixserverlib.ClientEvent{
{ {
@ -408,9 +396,14 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
lr.Timeline.Events = roomEvents lr.Timeline.Events = roomEvents
syncResponse.Rooms.Leave[roomID] = lr syncResponse.Rooms.Leave[roomID] = lr
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
roomID: {newShareUser, newShareUser2},
"!another:room": {syncingUser},
},
}, syncingUser, syncResponse, emptyToken)
if err != nil { if err != nil {
t.Fatalf("Catchup returned an error: %s", err) t.Fatalf("DeviceListCatchup returned an error: %s", err)
} }
assertCatchup(t, hasNew, syncResponse, wantCatchup{ assertCatchup(t, hasNew, syncResponse, wantCatchup{
hasNew: true, hasNew: true,

View file

@ -132,6 +132,16 @@ func (n *Notifier) OnNewSendToDevice(
n.wakeupUserDevice(userID, deviceIDs, latestPos) n.wakeupUserDevice(userID, deviceIDs, latestPos)
} }
func (n *Notifier) OnNewKeyChange(
posUpdate types.StreamingToken, wakeUserID, keyChangeUserID string,
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
latestPos := n.currPos.WithUpdates(posUpdate)
n.currPos = latestPos
n.wakeupUsers([]string{wakeUserID}, latestPos)
}
// GetListener returns a UserStreamListener that can be used to wait for // GetListener returns a UserStreamListener that can be used to wait for
// updates for a user. Must be closed. // updates for a user. Must be closed.
// notify for anything before sincePos // notify for anything before sincePos

View file

@ -22,6 +22,9 @@ import (
"time" "time"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
@ -35,11 +38,16 @@ type RequestPool struct {
db storage.Database db storage.Database
userAPI userapi.UserInternalAPI userAPI userapi.UserInternalAPI
notifier *Notifier notifier *Notifier
keyAPI keyapi.KeyInternalAPI
stateAPI currentstateAPI.CurrentStateInternalAPI
} }
// NewRequestPool makes a new RequestPool // NewRequestPool makes a new RequestPool
func NewRequestPool(db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI) *RequestPool { func NewRequestPool(
return &RequestPool{db, userAPI, n} db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI,
stateAPI currentstateAPI.CurrentStateInternalAPI,
) *RequestPool {
return &RequestPool{db, userAPI, n, keyAPI, stateAPI}
} }
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be // OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
@ -164,6 +172,10 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
if err != nil { if err != nil {
return return
} }
res, err = rp.appendDeviceLists(res, req.device.UserID, since)
if err != nil {
return
}
// Before we return the sync response, make sure that we take action on // Before we return the sync response, make sure that we take action on
// any send-to-device database updates or deletions that we need to do. // any send-to-device database updates or deletions that we need to do.
@ -192,6 +204,22 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
return return
} }
func (rp *RequestPool) appendDeviceLists(
data *types.Response, userID string, since types.StreamingToken,
) (*types.Response, error) {
// TODO: Currently this code will race which may result in duplicates but not missing data.
// This happens because, whilst we are told the range to fetch here (since / latest) the
// QueryKeyChanges API only exposes a "from" value (on purpose to avoid racing, which then
// returns the latest position with which the response has authority on). We'd need to tweak
// the API to expose a "to" value to fix this.
_, _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since)
if err != nil {
return nil, err
}
return data, nil
}
// nolint:gocyclo // nolint:gocyclo
func (rp *RequestPool) appendAccountData( func (rp *RequestPool) appendAccountData(
data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition, data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition,

View file

@ -21,7 +21,9 @@ import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
currentstateapi "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -39,6 +41,8 @@ func AddPublicRoutes(
consumer sarama.Consumer, consumer sarama.Consumer,
userAPI userapi.UserInternalAPI, userAPI userapi.UserInternalAPI,
rsAPI api.RoomserverInternalAPI, rsAPI api.RoomserverInternalAPI,
keyAPI keyapi.KeyInternalAPI,
currentStateAPI currentstateapi.CurrentStateInternalAPI,
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
cfg *config.Dendrite, cfg *config.Dendrite,
) { ) {
@ -58,7 +62,7 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start notifier") logrus.WithError(err).Panicf("failed to start notifier")
} }
requestPool := sync.NewRequestPool(syncDB, notifier, userAPI) requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, currentStateAPI)
roomConsumer := consumers.NewOutputRoomEventConsumer( roomConsumer := consumers.NewOutputRoomEventConsumer(
cfg, consumer, notifier, syncDB, rsAPI, cfg, consumer, notifier, syncDB, rsAPI,
@ -88,5 +92,13 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start send-to-device consumer") logrus.WithError(err).Panicf("failed to start send-to-device consumer")
} }
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
cfg.Matrix.ServerName, string(cfg.Kafka.Topics.OutputKeyChangeEvent),
consumer, notifier, keyAPI, currentStateAPI, syncDB,
)
if err = keyChangeConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start key change consumer")
}
routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg) routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg)
} }

View file

@ -110,6 +110,10 @@ type StreamingToken struct {
logs map[string]*LogPosition logs map[string]*LogPosition
} }
func (t *StreamingToken) SetLog(name string, lp *LogPosition) {
t.logs[name] = lp
}
func (t *StreamingToken) Log(name string) *LogPosition { func (t *StreamingToken) Log(name string) *LogPosition {
l, ok := t.logs[name] l, ok := t.logs[name]
if !ok { if !ok {

View file

@ -127,6 +127,7 @@ Can query specific device keys using POST
query for user with no keys returns empty key dict query for user with no keys returns empty key dict
Can claim one time key using POST Can claim one time key using POST
Can claim remote one time key using POST Can claim remote one time key using POST
Local device key changes appear in v2 /sync
Can add account data Can add account data
Can add account data to room Can add account data to room
Can get account data without syncing Can get account data without syncing