Consume key change events in fedsender

Don't yet send them to destinations as we haven't worked them out yet
This commit is contained in:
Kegan Dougal 2020-08-03 18:17:16 +01:00
parent fa8e639497
commit b67e0b3374
5 changed files with 143 additions and 17 deletions

View file

@ -13,10 +13,11 @@
package routing
import (
"encoding/json"
"net/http"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
userapi "github.com/matrix-org/dendrite/userapi/api"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
@ -24,30 +25,35 @@ import (
// GetUserDevices for the given user id
func GetUserDevices(
req *http.Request,
userAPI userapi.UserInternalAPI,
keyAPI keyapi.KeyInternalAPI,
userID string,
) util.JSONResponse {
response := gomatrixserverlib.RespUserDevices{
UserID: userID,
// TODO: we should return an incrementing stream ID each time the device
// list changes for delta changes to be recognised
StreamID: 0,
}
var res userapi.QueryDevicesResponse
err := userAPI.QueryDevices(req.Context(), &userapi.QueryDevicesRequest{
var res keyapi.QueryDeviceMessagesResponse
keyAPI.QueryDeviceMessages(req.Context(), &keyapi.QueryDeviceMessagesRequest{
UserID: userID,
}, &res)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("userAPI.QueryDevices failed")
if res.Error != nil {
util.GetLogger(req.Context()).WithError(res.Error).Error("keyAPI.QueryDeviceMessages failed")
return jsonerror.InternalServerError()
}
response := gomatrixserverlib.RespUserDevices{
UserID: userID,
StreamID: res.StreamID,
}
for _, dev := range res.Devices {
var key gomatrixserverlib.RespUserDeviceKeys
err := json.Unmarshal(dev.DeviceKeys.KeyJSON, &key)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Warnf("malformed device key: %s", string(dev.DeviceKeys.KeyJSON))
continue
}
device := gomatrixserverlib.RespUserDevice{
DeviceID: dev.ID,
DeviceID: dev.DeviceID,
DisplayName: dev.DisplayName,
Keys: []gomatrixserverlib.RespUserDeviceKeys{},
Keys: []gomatrixserverlib.RespUserDeviceKeys{key},
}
response.Devices = append(response.Devices, device)
}

View file

@ -186,7 +186,7 @@ func Setup(
"federation_user_devices", cfg.Matrix.ServerName, keys, wakeup,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
return GetUserDevices(
httpReq, userAPI, vars["userID"],
httpReq, keyAPI, vars["userID"],
)
},
)).Methods(http.MethodGet)

View file

@ -0,0 +1,116 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package consumers
import (
"encoding/json"
"fmt"
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/federationsender/queue"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
// KeyChangeConsumer consumes events that originate in key server.
type KeyChangeConsumer struct {
consumer *internal.ContinualConsumer
db storage.Database
queues *queue.OutgoingQueues
serverName gomatrixserverlib.ServerName
}
// NewKeyChangeConsumer creates a new KeyChangeConsumer. Call Start() to begin consuming from key servers.
func NewKeyChangeConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
store storage.Database,
) *KeyChangeConsumer {
c := &KeyChangeConsumer{
consumer: &internal.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputKeyChangeEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
},
queues: queues,
db: store,
serverName: cfg.Matrix.ServerName,
}
c.consumer.ProcessMessage = c.onMessage
return c
}
// Start consuming from key servers
func (t *KeyChangeConsumer) Start() error {
if err := t.consumer.Start(); err != nil {
return fmt.Errorf("t.consumer.Start: %w", err)
}
return nil
}
// onMessage is called in response to a message received on the
// key change events topic from the key server.
func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error {
var m api.DeviceMessage
if err := json.Unmarshal(msg.Value, &m); err != nil {
log.WithError(err).Errorf("failed to read device message from key change topic")
return nil
}
// only send key change events which originated from us
_, originServerName, err := gomatrixserverlib.SplitID('@', m.UserID)
if err != nil {
log.WithError(err).WithField("user_id", m.UserID).Error("Failed to extract domain from key change event")
return nil
}
if originServerName != t.serverName {
return nil
}
// TODO: send this key change to all users who share rooms with this user.
var destinations []gomatrixserverlib.ServerName
// Pack the EDU and marshal it
edu := &gomatrixserverlib.EDU{
Type: gomatrixserverlib.MDeviceListUpdate,
Origin: string(t.serverName),
}
event := gomatrixserverlib.DeviceListUpdateEvent{
UserID: m.UserID,
DeviceID: m.DeviceID,
DeviceDisplayName: m.DisplayName,
StreamID: m.StreamID,
PrevID: prevID(m.StreamID),
Deleted: len(m.KeyJSON) == 0,
Keys: m.KeyJSON,
}
if edu.Content, err = json.Marshal(event); err != nil {
return err
}
log.Infof("Sending device list update message to %q", destinations)
return t.queues.SendEDU(edu, t.serverName, destinations)
}
func prevID(streamID int) []int {
if streamID <= 1 {
return nil
}
return []int{streamID - 1}
}

2
go.mod
View file

@ -21,7 +21,7 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26
github.com/matrix-org/gomatrixserverlib v0.0.0-20200722124340-16fba816840d
github.com/matrix-org/gomatrixserverlib v0.0.0-20200803165739-3bd1ef0f0852
github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7
github.com/mattn/go-sqlite3 v2.0.2+incompatible

4
go.sum
View file

@ -425,6 +425,10 @@ github.com/matrix-org/gomatrixserverlib v0.0.0-20200721145051-cea6eafced2b h1:ul
github.com/matrix-org/gomatrixserverlib v0.0.0-20200721145051-cea6eafced2b/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200722124340-16fba816840d h1:WZXyd8YI+PQIDYjN8HxtqNRJ1DCckt9wPTi2P8cdnKM=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200722124340-16fba816840d/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200803165250-352235625587 h1:n2IZkm5LI4lACulOa5WU6QwWUhHUtBZez7YIFr1fCOs=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200803165250-352235625587/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200803165739-3bd1ef0f0852 h1:OBvHjLWaT2KS9kGarX2ES0yKBL/wMxAeQB39tRrAAls=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200803165739-3bd1ef0f0852/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y=
github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f/go.mod h1:y0oDTjZDv5SM9a2rp3bl+CU+bvTRINQsdb7YlDql5Go=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo=