From fee53940ae097542a960c237b30bea01fdc45cab Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 29 Jul 2020 19:23:03 +0100 Subject: [PATCH] WIP hooking up key changes --- internal/setup/monolith.go | 3 +- keyserver/api/api.go | 2 ++ keyserver/internal/internal.go | 4 +++ keyserver/producers/keychange.go | 9 ++++++ syncapi/consumers/keychange.go | 53 +++++++++++++++++++++++++------- syncapi/sync/notifier.go | 10 ++++++ syncapi/sync/requestpool.go | 28 +++++++++++++++-- syncapi/syncapi.go | 12 ++++++++ syncapi/types/types.go | 4 +++ 9 files changed, 110 insertions(+), 15 deletions(-) diff --git a/internal/setup/monolith.go b/internal/setup/monolith.go index 1f6d9a761..f33f97ee4 100644 --- a/internal/setup/monolith.go +++ b/internal/setup/monolith.go @@ -77,6 +77,7 @@ func (m *Monolith) AddAllPublicRoutes(publicMux *mux.Router) { ) mediaapi.AddPublicRoutes(publicMux, m.Config, m.UserAPI, m.Client) syncapi.AddPublicRoutes( - publicMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI, m.FedClient, m.Config, + publicMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI, + m.KeyAPI, m.StateAPI, m.FedClient, m.Config, ) } diff --git a/keyserver/api/api.go b/keyserver/api/api.go index 406a252d5..c9afb09cc 100644 --- a/keyserver/api/api.go +++ b/keyserver/api/api.go @@ -143,6 +143,8 @@ type QueryKeyChangesRequest struct { type QueryKeyChangesResponse struct { // The set of users who have had their keys change. UserIDs []string + // The partition being served - useful if the partition is unknown at request time + Partition int32 // The latest offset represented in this response. Offset int64 // Set if there was a problem handling the request. 
diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go index 240a56403..9a41e44fc 100644 --- a/keyserver/internal/internal.go +++ b/keyserver/internal/internal.go @@ -41,6 +41,9 @@ type KeyInternalAPI struct { } func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) { + if req.Partition < 0 { + req.Partition = a.Producer.DefaultPartition() + } userIDs, latest, err := a.DB.KeyChanges(ctx, req.Partition, req.Offset) if err != nil { res.Error = &api.KeyError{ @@ -48,6 +51,7 @@ func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyC } } res.Offset = latest + res.Partition = req.Partition res.UserIDs = userIDs } diff --git a/keyserver/producers/keychange.go b/keyserver/producers/keychange.go index d59dd2002..c51d9f55d 100644 --- a/keyserver/producers/keychange.go +++ b/keyserver/producers/keychange.go @@ -31,6 +31,15 @@ type KeyChange struct { DB storage.Database } +// DefaultPartition returns the default partition this process is sending key changes to. +// NB: A keyserver MUST send key changes to only 1 partition or else query operations will +// become inconsistent. Partitions can be sharded (e.g. by hash of user ID of key change) but +// then all keyservers must be queried to calculate the entire set of key changes between +// two sync tokens. 
+func (p *KeyChange) DefaultPartition() int32 { + return 0 +} + // ProduceKeyChanges creates new change events for each key func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceKeys) error { for _, key := range keys { diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index 78aff6011..8e8c19d2d 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -24,12 +24,15 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/syncapi/storage" + syncapi "github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" log "github.com/sirupsen/logrus" ) +const deviceListLogName = "dl" + // OutputKeyChangeEventConsumer consumes events that originated in the key server. type OutputKeyChangeEventConsumer struct { keyChangeConsumer *internal.ContinualConsumer @@ -39,6 +42,7 @@ type OutputKeyChangeEventConsumer struct { keyAPI api.KeyInternalAPI partitionToOffset map[int32]int64 partitionToOffsetMu sync.Mutex + notifier *syncapi.Notifier } // NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer. 
@@ -47,6 +51,7 @@ func NewOutputKeyChangeEventConsumer( serverName gomatrixserverlib.ServerName, topic string, kafkaConsumer sarama.Consumer, + n *syncapi.Notifier, keyAPI api.KeyInternalAPI, currentStateAPI currentstateAPI.CurrentStateInternalAPI, store storage.Database, @@ -66,6 +71,7 @@ func NewOutputKeyChangeEventConsumer( currentStateAPI: currentStateAPI, partitionToOffset: make(map[int32]int64), partitionToOffsetMu: sync.Mutex{}, + notifier: n, } consumer.ProcessMessage = s.onMessage @@ -110,6 +116,15 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er return err } // TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams + posUpdate := types.NewStreamToken(0, 0, map[string]*types.LogPosition{ + deviceListLogName: &types.LogPosition{ + Offset: msg.Offset, + Partition: msg.Partition, + }, + }) + for userID := range queryRes.UserIDsToCount { + s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID) + } return nil } @@ -133,9 +148,19 @@ func (s *OutputKeyChangeEventConsumer) Catchup( } // now also track users who we already share rooms with but who have updated their devices between the two tokens - // TODO: Extract partition/offset from sync token + var partition int32 var offset int64 + // Extract partition/offset from sync token + // TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make. 
+ logOffset := tok.Log(deviceListLogName) + if logOffset != nil { + partition = logOffset.Partition + offset = logOffset.Offset + } else { + partition = -1 + offset = sarama.OffsetOldest + } var queryRes api.QueryKeyChangesResponse s.keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{ Partition: partition, @@ -144,18 +169,24 @@ func (s *OutputKeyChangeEventConsumer) Catchup( if queryRes.Error != nil { // don't fail the catchup because we may have got useful information by tracking membership util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed") - } else { - // TODO: Make a new streaming token using the new offset - userSet := make(map[string]bool) - for _, userID := range res.DeviceLists.Changed { - userSet[userID] = true - } - for _, userID := range queryRes.UserIDs { - if !userSet[userID] { - res.DeviceLists.Changed = append(res.DeviceLists.Changed, userID) - } + return + } + userSet := make(map[string]bool) + for _, userID := range res.DeviceLists.Changed { + userSet[userID] = true + } + for _, userID := range queryRes.UserIDs { + if !userSet[userID] { + res.DeviceLists.Changed = append(res.DeviceLists.Changed, userID) + hasNew = true } } + // Make a new streaming token using the new offset + tok.SetLog(deviceListLogName, &types.LogPosition{ + Offset: queryRes.Offset, + Partition: queryRes.Partition, + }) + newTok = &tok return } diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go index 325e75351..df23a2f4a 100644 --- a/syncapi/sync/notifier.go +++ b/syncapi/sync/notifier.go @@ -132,6 +132,16 @@ func (n *Notifier) OnNewSendToDevice( n.wakeupUserDevice(userID, deviceIDs, latestPos) } +func (n *Notifier) OnNewKeyChange( + posUpdate types.StreamingToken, wakeUserID, keyChangeUserID string, +) { + n.streamLock.Lock() + defer n.streamLock.Unlock() + latestPos := n.currPos.WithUpdates(posUpdate) + n.currPos = latestPos + n.wakeupUsers([]string{wakeUserID}, latestPos) +} + // GetListener returns a UserStreamListener that can be 
used to wait for // updates for a user. Must be closed. // notify for anything before sincePos diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index bf6a9e01f..65ad2f3a3 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -22,6 +22,7 @@ import ( "time" "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/dendrite/syncapi/consumers" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -32,9 +33,10 @@ import ( // RequestPool manages HTTP long-poll connections for /sync type RequestPool struct { - db storage.Database - userAPI userapi.UserInternalAPI - notifier *Notifier + db storage.Database + userAPI userapi.UserInternalAPI + notifier *Notifier + keyChanges *consumers.OutputKeyChangeEventConsumer } // NewRequestPool makes a new RequestPool @@ -164,6 +166,10 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea if err != nil { return } + res, err = rp.appendDeviceLists(res, req.device.UserID, since, latestPos) + if err != nil { + return + } // Before we return the sync response, make sure that we take action on // any send-to-device database updates or deletions that we need to do. @@ -192,6 +198,22 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea return } +func (rp *RequestPool) appendDeviceLists( + data *types.Response, userID string, since, latest types.StreamingToken, +) (*types.Response, error) { + // TODO: Currently this code will race which may result in duplicates but not missing data. + // This happens because, whilst we are told the range to fetch here (since / latest) the + // QueryKeyChanges API only exposes a "from" value (on purpose to avoid racing, which then + // returns the latest position with which the response has authority on). We'd need to tweak + // the API to expose a "to" value to fix this. 
+ _, _, err := rp.keyChanges.Catchup(context.Background(), userID, data, since) + if err != nil { + return nil, err + } + + return data, nil +} + // nolint:gocyclo func (rp *RequestPool) appendAccountData( data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition, diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index caf91e27e..570f53344 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -21,7 +21,9 @@ import ( "github.com/gorilla/mux" "github.com/sirupsen/logrus" + currentstateapi "github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/internal/config" + keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -39,6 +41,8 @@ func AddPublicRoutes( consumer sarama.Consumer, userAPI userapi.UserInternalAPI, rsAPI api.RoomserverInternalAPI, + keyAPI keyapi.KeyInternalAPI, + currentStateAPI currentstateapi.CurrentStateInternalAPI, federation *gomatrixserverlib.FederationClient, cfg *config.Dendrite, ) { @@ -88,5 +92,13 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to start send-to-device consumer") } + keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( + cfg.Matrix.ServerName, string(cfg.Kafka.Topics.OutputKeyChangeEvent), + consumer, notifier, keyAPI, currentStateAPI, syncDB, + ) + if err = keyChangeConsumer.Start(); err != nil { + logrus.WithError(err).Panicf("failed to start key change consumer") + } + routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg) } diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 7bba8e522..f20c73bff 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -110,6 +110,10 @@ type StreamingToken struct { logs map[string]*LogPosition } +func (t *StreamingToken) SetLog(name string, lp *LogPosition) { + t.logs[name] = lp +} + func (t *StreamingToken) 
Log(name string) *LogPosition { l, ok := t.logs[name] if !ok {