WIP hooking up key changes

This commit is contained in:
Kegan Dougal 2020-07-29 19:23:03 +01:00
parent 0fdd4f14d1
commit fee53940ae
9 changed files with 110 additions and 15 deletions

View file

@ -77,6 +77,7 @@ func (m *Monolith) AddAllPublicRoutes(publicMux *mux.Router) {
)
mediaapi.AddPublicRoutes(publicMux, m.Config, m.UserAPI, m.Client)
syncapi.AddPublicRoutes(
publicMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI, m.FedClient, m.Config,
publicMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI,
m.KeyAPI, m.StateAPI, m.FedClient, m.Config,
)
}

View file

@ -143,6 +143,8 @@ type QueryKeyChangesRequest struct {
type QueryKeyChangesResponse struct {
// The set of users who have had their keys change.
UserIDs []string
// The partition being served - useful if the partition is unknown at request time
Partition int32
// The latest offset represented in this response.
Offset int64
// Set if there was a problem handling the request.

View file

@ -41,6 +41,9 @@ type KeyInternalAPI struct {
}
func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) {
if req.Partition < 0 {
req.Partition = a.Producer.DefaultPartition()
}
userIDs, latest, err := a.DB.KeyChanges(ctx, req.Partition, req.Offset)
if err != nil {
res.Error = &api.KeyError{
@ -48,6 +51,7 @@ func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyC
}
}
res.Offset = latest
res.Partition = req.Partition
res.UserIDs = userIDs
}

View file

@ -31,6 +31,15 @@ type KeyChange struct {
DB storage.Database
}
// DefaultPartition returns the default partition this process is sending key changes to.
// NB: A keyserver MUST send key changes to only 1 partition or else query operations will
// become inconsistent. Partitions can be sharded (e.g by hash of user ID of key change) but
// then all keyservers must be queried to calculate the entire set of key changes between
// two sync tokens.
func (p *KeyChange) DefaultPartition() int32 {
return 0
}
// ProduceKeyChanges creates new change events for each key
func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceKeys) error {
for _, key := range keys {

View file

@ -24,12 +24,15 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/syncapi/storage"
syncapi "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
log "github.com/sirupsen/logrus"
)
const deviceListLogName = "dl"
// OutputKeyChangeEventConsumer consumes events that originated in the key server.
type OutputKeyChangeEventConsumer struct {
keyChangeConsumer *internal.ContinualConsumer
@ -39,6 +42,7 @@ type OutputKeyChangeEventConsumer struct {
keyAPI api.KeyInternalAPI
partitionToOffset map[int32]int64
partitionToOffsetMu sync.Mutex
notifier *syncapi.Notifier
}
// NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer.
@ -47,6 +51,7 @@ func NewOutputKeyChangeEventConsumer(
serverName gomatrixserverlib.ServerName,
topic string,
kafkaConsumer sarama.Consumer,
n *syncapi.Notifier,
keyAPI api.KeyInternalAPI,
currentStateAPI currentstateAPI.CurrentStateInternalAPI,
store storage.Database,
@ -66,6 +71,7 @@ func NewOutputKeyChangeEventConsumer(
currentStateAPI: currentStateAPI,
partitionToOffset: make(map[int32]int64),
partitionToOffsetMu: sync.Mutex{},
notifier: n,
}
consumer.ProcessMessage = s.onMessage
@ -110,6 +116,15 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
return err
}
// TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams
posUpdate := types.NewStreamToken(0, 0, map[string]*types.LogPosition{
deviceListLogName: &types.LogPosition{
Offset: msg.Offset,
Partition: msg.Partition,
},
})
for userID := range queryRes.UserIDsToCount {
s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID)
}
return nil
}
@ -133,9 +148,19 @@ func (s *OutputKeyChangeEventConsumer) Catchup(
}
// now also track users who we already share rooms with but who have updated their devices between the two tokens
// TODO: Extract partition/offset from sync token
var partition int32
var offset int64
// Extract partition/offset from sync token
// TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make.
logOffset := tok.Log(deviceListLogName)
if logOffset != nil {
partition = logOffset.Partition
offset = logOffset.Offset
} else {
partition = -1
offset = sarama.OffsetOldest
}
var queryRes api.QueryKeyChangesResponse
s.keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{
Partition: partition,
@ -144,8 +169,8 @@ func (s *OutputKeyChangeEventConsumer) Catchup(
if queryRes.Error != nil {
// don't fail the catchup because we may have got useful information by tracking membership
util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed")
} else {
// TODO: Make a new streaming token using the new offset
return
}
userSet := make(map[string]bool)
for _, userID := range res.DeviceLists.Changed {
userSet[userID] = true
@ -153,9 +178,15 @@ func (s *OutputKeyChangeEventConsumer) Catchup(
for _, userID := range queryRes.UserIDs {
if !userSet[userID] {
res.DeviceLists.Changed = append(res.DeviceLists.Changed, userID)
hasNew = true
}
}
}
// Make a new streaming token using the new offset
tok.SetLog(deviceListLogName, &types.LogPosition{
Offset: queryRes.Offset,
Partition: queryRes.Partition,
})
newTok = &tok
return
}

View file

@ -132,6 +132,16 @@ func (n *Notifier) OnNewSendToDevice(
n.wakeupUserDevice(userID, deviceIDs, latestPos)
}
func (n *Notifier) OnNewKeyChange(
posUpdate types.StreamingToken, wakeUserID, keyChangeUserID string,
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
latestPos := n.currPos.WithUpdates(posUpdate)
n.currPos = latestPos
n.wakeupUsers([]string{wakeUserID}, latestPos)
}
// GetListener returns a UserStreamListener that can be used to wait for
// updates for a user. Must be closed.
// notify for anything before sincePos

View file

@ -22,6 +22,7 @@ import (
"time"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/syncapi/consumers"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
@ -35,6 +36,7 @@ type RequestPool struct {
db storage.Database
userAPI userapi.UserInternalAPI
notifier *Notifier
keyChanges *consumers.OutputKeyChangeEventConsumer
}
// NewRequestPool makes a new RequestPool
@ -164,6 +166,10 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
if err != nil {
return
}
res, err = rp.appendDeviceLists(res, req.device.UserID, since, latestPos)
if err != nil {
return
}
// Before we return the sync response, make sure that we take action on
// any send-to-device database updates or deletions that we need to do.
@ -192,6 +198,22 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
return
}
func (rp *RequestPool) appendDeviceLists(
data *types.Response, userID string, since, latest types.StreamingToken,
) (*types.Response, error) {
// TODO: Currently this code will race which may result in duplicates but not missing data.
// This happens because, whilst we are told the range to fetch here (since / latest) the
// QueryKeyChanges API only exposes a "from" value (on purpose to avoid racing, which then
// returns the latest position with which the response has authority on). We'd need to tweak
// the API to expose a "to" value to fix this.
_, _, err := rp.keyChanges.Catchup(context.Background(), userID, data, since)
if err != nil {
return nil, err
}
return data, nil
}
// nolint:gocyclo
func (rp *RequestPool) appendAccountData(
data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition,

View file

@ -21,7 +21,9 @@ import (
"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
currentstateapi "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/internal/config"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
@ -39,6 +41,8 @@ func AddPublicRoutes(
consumer sarama.Consumer,
userAPI userapi.UserInternalAPI,
rsAPI api.RoomserverInternalAPI,
keyAPI keyapi.KeyInternalAPI,
currentStateAPI currentstateapi.CurrentStateInternalAPI,
federation *gomatrixserverlib.FederationClient,
cfg *config.Dendrite,
) {
@ -88,5 +92,13 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start send-to-device consumer")
}
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
cfg.Matrix.ServerName, string(cfg.Kafka.Topics.OutputKeyChangeEvent),
consumer, notifier, keyAPI, currentStateAPI, syncDB,
)
if err = keyChangeConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start key change consumer")
}
routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg)
}

View file

@ -110,6 +110,10 @@ type StreamingToken struct {
logs map[string]*LogPosition
}
func (t *StreamingToken) SetLog(name string, lp *LogPosition) {
t.logs[name] = lp
}
func (t *StreamingToken) Log(name string) *LogPosition {
l, ok := t.logs[name]
if !ok {