Add InputDeviceListUpdate
to the keyserver, remove old input API (#2536)
* Add `InputDeviceListUpdate` to the keyserver, remove old input API * Fix copyright * Log more information when a device list update fails
This commit is contained in:
parent
1b90cc9536
commit
7120eb6bc9
|
@ -63,6 +63,7 @@ func AddPublicRoutes(
|
||||||
TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
|
TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
|
||||||
TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
|
TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
|
||||||
TopicPresenceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
|
TopicPresenceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
|
||||||
|
TopicDeviceListUpdate: cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
|
||||||
ServerName: cfg.Matrix.ServerName,
|
ServerName: cfg.Matrix.ServerName,
|
||||||
UserAPI: userAPI,
|
UserAPI: userAPI,
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@ package producers
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -34,6 +35,7 @@ type SyncAPIProducer struct {
|
||||||
TopicSendToDeviceEvent string
|
TopicSendToDeviceEvent string
|
||||||
TopicTypingEvent string
|
TopicTypingEvent string
|
||||||
TopicPresenceEvent string
|
TopicPresenceEvent string
|
||||||
|
TopicDeviceListUpdate string
|
||||||
JetStream nats.JetStreamContext
|
JetStream nats.JetStreamContext
|
||||||
ServerName gomatrixserverlib.ServerName
|
ServerName gomatrixserverlib.ServerName
|
||||||
UserAPI userapi.UserInternalAPI
|
UserAPI userapi.UserInternalAPI
|
||||||
|
@ -161,3 +163,18 @@ func (p *SyncAPIProducer) SendPresence(
|
||||||
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
|
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *SyncAPIProducer) SendDeviceListUpdate(
|
||||||
|
ctx context.Context, deviceListUpdate *gomatrixserverlib.DeviceListUpdateEvent,
|
||||||
|
) (err error) {
|
||||||
|
m := nats.NewMsg(p.TopicDeviceListUpdate)
|
||||||
|
m.Header.Set(jetstream.UserID, deviceListUpdate.UserID)
|
||||||
|
m.Data, err = json.Marshal(deviceListUpdate)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("json.Marshal: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf("Sending device list update: %+v", m.Header)
|
||||||
|
_, err = p.JetStream.PublishMsg(m, nats.Context(ctx))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
|
@ -501,11 +501,7 @@ func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverli
|
||||||
} else if serverName != t.Origin {
|
} else if serverName != t.Origin {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
var inputRes keyapi.InputDeviceListUpdateResponse
|
if err := t.producer.SendDeviceListUpdate(ctx, &payload); err != nil {
|
||||||
t.keyAPI.InputDeviceListUpdate(context.Background(), &keyapi.InputDeviceListUpdateRequest{
|
util.GetLogger(ctx).WithError(err).WithField("user_id", payload.UserID).Error("failed to InputDeviceListUpdate")
|
||||||
Event: payload,
|
|
||||||
}, &inputRes)
|
|
||||||
if inputRes.Error != nil {
|
|
||||||
util.GetLogger(ctx).WithError(inputRes.Error).WithField("user_id", payload.UserID).Error("failed to InputDeviceListUpdate")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,8 +62,6 @@ type FederationKeyAPI interface {
|
||||||
QueryKeys(ctx context.Context, req *QueryKeysRequest, res *QueryKeysResponse)
|
QueryKeys(ctx context.Context, req *QueryKeysRequest, res *QueryKeysResponse)
|
||||||
QuerySignatures(ctx context.Context, req *QuerySignaturesRequest, res *QuerySignaturesResponse)
|
QuerySignatures(ctx context.Context, req *QuerySignaturesRequest, res *QuerySignaturesResponse)
|
||||||
QueryDeviceMessages(ctx context.Context, req *QueryDeviceMessagesRequest, res *QueryDeviceMessagesResponse)
|
QueryDeviceMessages(ctx context.Context, req *QueryDeviceMessagesRequest, res *QueryDeviceMessagesResponse)
|
||||||
// InputDeviceListUpdate from a federated server EDU
|
|
||||||
InputDeviceListUpdate(ctx context.Context, req *InputDeviceListUpdateRequest, res *InputDeviceListUpdateResponse)
|
|
||||||
PerformUploadDeviceKeys(ctx context.Context, req *PerformUploadDeviceKeysRequest, res *PerformUploadDeviceKeysResponse)
|
PerformUploadDeviceKeys(ctx context.Context, req *PerformUploadDeviceKeysRequest, res *PerformUploadDeviceKeysResponse)
|
||||||
PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse)
|
PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse)
|
||||||
}
|
}
|
||||||
|
@ -337,11 +335,3 @@ type QuerySignaturesResponse struct {
|
||||||
// The request error, if any
|
// The request error, if any
|
||||||
Error *KeyError
|
Error *KeyError
|
||||||
}
|
}
|
||||||
|
|
||||||
type InputDeviceListUpdateRequest struct {
|
|
||||||
Event gomatrixserverlib.DeviceListUpdateEvent
|
|
||||||
}
|
|
||||||
|
|
||||||
type InputDeviceListUpdateResponse struct {
|
|
||||||
Error *KeyError
|
|
||||||
}
|
|
||||||
|
|
82
keyserver/consumers/devicelistupdate.go
Normal file
82
keyserver/consumers/devicelistupdate.go
Normal file
|
@ -0,0 +1,82 @@
|
||||||
|
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package consumers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/keyserver/internal"
|
||||||
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
|
"github.com/matrix-org/dendrite/setup/process"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
// DeviceListUpdateConsumer consumes device list updates that came in over federation.
|
||||||
|
type DeviceListUpdateConsumer struct {
|
||||||
|
ctx context.Context
|
||||||
|
jetstream nats.JetStreamContext
|
||||||
|
durable string
|
||||||
|
topic string
|
||||||
|
updater *internal.DeviceListUpdater
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewDeviceListUpdateConsumer creates a new DeviceListConsumer. Call Start() to begin consuming from key servers.
|
||||||
|
func NewDeviceListUpdateConsumer(
|
||||||
|
process *process.ProcessContext,
|
||||||
|
cfg *config.KeyServer,
|
||||||
|
js nats.JetStreamContext,
|
||||||
|
updater *internal.DeviceListUpdater,
|
||||||
|
) *DeviceListUpdateConsumer {
|
||||||
|
return &DeviceListUpdateConsumer{
|
||||||
|
ctx: process.Context(),
|
||||||
|
jetstream: js,
|
||||||
|
durable: cfg.Matrix.JetStream.Prefixed("KeyServerInputDeviceListConsumer"),
|
||||||
|
topic: cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
|
||||||
|
updater: updater,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start consuming from key servers
|
||||||
|
func (t *DeviceListUpdateConsumer) Start() error {
|
||||||
|
return jetstream.JetStreamConsumer(
|
||||||
|
t.ctx, t.jetstream, t.topic, t.durable, t.onMessage,
|
||||||
|
nats.DeliverAll(), nats.ManualAck(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// onMessage is called in response to a message received on the
|
||||||
|
// key change events topic from the key server.
|
||||||
|
func (t *DeviceListUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
|
||||||
|
var m gomatrixserverlib.DeviceListUpdateEvent
|
||||||
|
if err := json.Unmarshal(msg.Data, &m); err != nil {
|
||||||
|
logrus.WithError(err).Errorf("Failed to read from device list update input topic")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
err := t.updater.Update(ctx, m)
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithFields(logrus.Fields{
|
||||||
|
"user_id": m.UserID,
|
||||||
|
"device_id": m.DeviceID,
|
||||||
|
"stream_id": m.StreamID,
|
||||||
|
"prev_id": m.PrevID,
|
||||||
|
}).WithError(err).Errorf("Failed to update device list")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
|
@ -47,17 +47,6 @@ func (a *KeyInternalAPI) SetUserAPI(i userapi.KeyserverUserAPI) {
|
||||||
a.UserAPI = i
|
a.UserAPI = i
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *KeyInternalAPI) InputDeviceListUpdate(
|
|
||||||
ctx context.Context, req *api.InputDeviceListUpdateRequest, res *api.InputDeviceListUpdateResponse,
|
|
||||||
) {
|
|
||||||
err := a.Updater.Update(ctx, req.Event)
|
|
||||||
if err != nil {
|
|
||||||
res.Error = &api.KeyError{
|
|
||||||
Err: fmt.Sprintf("failed to update device list: %s", err),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) {
|
func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) {
|
||||||
userIDs, latest, err := a.DB.KeyChanges(ctx, req.Offset, req.ToOffset)
|
userIDs, latest, err := a.DB.KeyChanges(ctx, req.Offset, req.ToOffset)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -63,20 +63,6 @@ type httpKeyInternalAPI struct {
|
||||||
func (h *httpKeyInternalAPI) SetUserAPI(i userapi.KeyserverUserAPI) {
|
func (h *httpKeyInternalAPI) SetUserAPI(i userapi.KeyserverUserAPI) {
|
||||||
// no-op: doesn't need it
|
// no-op: doesn't need it
|
||||||
}
|
}
|
||||||
func (h *httpKeyInternalAPI) InputDeviceListUpdate(
|
|
||||||
ctx context.Context, req *api.InputDeviceListUpdateRequest, res *api.InputDeviceListUpdateResponse,
|
|
||||||
) {
|
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "InputDeviceListUpdate")
|
|
||||||
defer span.Finish()
|
|
||||||
|
|
||||||
apiURL := h.apiURL + InputDeviceListUpdatePath
|
|
||||||
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
|
|
||||||
if err != nil {
|
|
||||||
res.Error = &api.KeyError{
|
|
||||||
Err: err.Error(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *httpKeyInternalAPI) PerformClaimKeys(
|
func (h *httpKeyInternalAPI) PerformClaimKeys(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
|
|
|
@ -25,17 +25,6 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func AddRoutes(internalAPIMux *mux.Router, s api.KeyInternalAPI) {
|
func AddRoutes(internalAPIMux *mux.Router, s api.KeyInternalAPI) {
|
||||||
internalAPIMux.Handle(InputDeviceListUpdatePath,
|
|
||||||
httputil.MakeInternalAPI("inputDeviceListUpdate", func(req *http.Request) util.JSONResponse {
|
|
||||||
request := api.InputDeviceListUpdateRequest{}
|
|
||||||
response := api.InputDeviceListUpdateResponse{}
|
|
||||||
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
|
||||||
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
|
||||||
}
|
|
||||||
s.InputDeviceListUpdate(req.Context(), &request, &response)
|
|
||||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
internalAPIMux.Handle(PerformClaimKeysPath,
|
internalAPIMux.Handle(PerformClaimKeysPath,
|
||||||
httputil.MakeInternalAPI("performClaimKeys", func(req *http.Request) util.JSONResponse {
|
httputil.MakeInternalAPI("performClaimKeys", func(req *http.Request) util.JSONResponse {
|
||||||
request := api.PerformClaimKeysRequest{}
|
request := api.PerformClaimKeysRequest{}
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
|
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
|
||||||
"github.com/matrix-org/dendrite/keyserver/api"
|
"github.com/matrix-org/dendrite/keyserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/keyserver/consumers"
|
||||||
"github.com/matrix-org/dendrite/keyserver/internal"
|
"github.com/matrix-org/dendrite/keyserver/internal"
|
||||||
"github.com/matrix-org/dendrite/keyserver/inthttp"
|
"github.com/matrix-org/dendrite/keyserver/inthttp"
|
||||||
"github.com/matrix-org/dendrite/keyserver/producers"
|
"github.com/matrix-org/dendrite/keyserver/producers"
|
||||||
|
@ -59,10 +60,17 @@ func NewInternalAPI(
|
||||||
updater := internal.NewDeviceListUpdater(db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable
|
updater := internal.NewDeviceListUpdater(db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable
|
||||||
ap.Updater = updater
|
ap.Updater = updater
|
||||||
go func() {
|
go func() {
|
||||||
if err := updater.Start(); err != nil {
|
if err = updater.Start(); err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to start device list updater")
|
logrus.WithError(err).Panicf("failed to start device list updater")
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
dlConsumer := consumers.NewDeviceListUpdateConsumer(
|
||||||
|
base.ProcessContext, cfg, js, updater,
|
||||||
|
)
|
||||||
|
if err = dlConsumer.Start(); err != nil {
|
||||||
|
logrus.WithError(err).Panic("failed to start device list consumer")
|
||||||
|
}
|
||||||
|
|
||||||
return ap
|
return ap
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@ const (
|
||||||
|
|
||||||
var (
|
var (
|
||||||
InputRoomEvent = "InputRoomEvent"
|
InputRoomEvent = "InputRoomEvent"
|
||||||
|
InputDeviceListUpdate = "InputDeviceListUpdate"
|
||||||
OutputRoomEvent = "OutputRoomEvent"
|
OutputRoomEvent = "OutputRoomEvent"
|
||||||
OutputSendToDeviceEvent = "OutputSendToDeviceEvent"
|
OutputSendToDeviceEvent = "OutputSendToDeviceEvent"
|
||||||
OutputKeyChangeEvent = "OutputKeyChangeEvent"
|
OutputKeyChangeEvent = "OutputKeyChangeEvent"
|
||||||
|
@ -45,6 +46,11 @@ var streams = []*nats.StreamConfig{
|
||||||
Retention: nats.InterestPolicy,
|
Retention: nats.InterestPolicy,
|
||||||
Storage: nats.FileStorage,
|
Storage: nats.FileStorage,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Name: InputDeviceListUpdate,
|
||||||
|
Retention: nats.InterestPolicy,
|
||||||
|
Storage: nats.FileStorage,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
Name: OutputRoomEvent,
|
Name: OutputRoomEvent,
|
||||||
Retention: nats.InterestPolicy,
|
Retention: nats.InterestPolicy,
|
||||||
|
|
|
@ -43,9 +43,6 @@ func (k *mockKeyAPI) QueryOneTimeKeys(ctx context.Context, req *keyapi.QueryOneT
|
||||||
}
|
}
|
||||||
func (k *mockKeyAPI) QueryDeviceMessages(ctx context.Context, req *keyapi.QueryDeviceMessagesRequest, res *keyapi.QueryDeviceMessagesResponse) {
|
func (k *mockKeyAPI) QueryDeviceMessages(ctx context.Context, req *keyapi.QueryDeviceMessagesRequest, res *keyapi.QueryDeviceMessagesResponse) {
|
||||||
|
|
||||||
}
|
|
||||||
func (k *mockKeyAPI) InputDeviceListUpdate(ctx context.Context, req *keyapi.InputDeviceListUpdateRequest, res *keyapi.InputDeviceListUpdateResponse) {
|
|
||||||
|
|
||||||
}
|
}
|
||||||
func (k *mockKeyAPI) QuerySignatures(ctx context.Context, req *keyapi.QuerySignaturesRequest, res *keyapi.QuerySignaturesResponse) {
|
func (k *mockKeyAPI) QuerySignatures(ctx context.Context, req *keyapi.QuerySignaturesRequest, res *keyapi.QuerySignaturesResponse) {
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue