Merge branch 'master' into neilalexander/config

This commit is contained in:
Neil Alexander 2020-08-04 14:04:04 +01:00
commit 3ec01bdb63
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
25 changed files with 344 additions and 50 deletions

View file

@ -131,17 +131,15 @@ func (m *DendriteMonolith) Start() {
) )
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer)
fsAPI := federationsender.NewInternalAPI( fsAPI := federationsender.NewInternalAPI(
base, federation, rsAPI, keyRing, base, federation, rsAPI, stateAPI, keyRing,
) )
// The underlying roomserver implementation needs to be able to call the fedsender. // The underlying roomserver implementation needs to be able to call the fedsender.
// This is different to rsAPI which can be the http client which doesn't need this dependency // This is different to rsAPI which can be the http client which doesn't need this dependency
rsAPI.SetFederationSenderAPI(fsAPI) rsAPI.SetFederationSenderAPI(fsAPI)
stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer)
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Cfg, Config: base.Cfg,
AccountDB: accountDB, AccountDB: accountDB,

View file

@ -156,6 +156,7 @@ func main() {
base, serverKeyAPI, base, serverKeyAPI,
) )
stateAPI := currentstateserver.NewInternalAPI(&base.Base.Cfg.CurrentStateServer, base.Base.KafkaConsumer)
rsAPI := roomserver.NewInternalAPI( rsAPI := roomserver.NewInternalAPI(
&base.Base, keyRing, federation, &base.Base, keyRing, federation,
) )
@ -164,10 +165,9 @@ func main() {
) )
asAPI := appservice.NewInternalAPI(&base.Base, userAPI, rsAPI) asAPI := appservice.NewInternalAPI(&base.Base, userAPI, rsAPI)
fsAPI := federationsender.NewInternalAPI( fsAPI := federationsender.NewInternalAPI(
&base.Base, federation, rsAPI, keyRing, &base.Base, federation, rsAPI, stateAPI, keyRing,
) )
rsAPI.SetFederationSenderAPI(fsAPI) rsAPI.SetFederationSenderAPI(fsAPI)
stateAPI := currentstateserver.NewInternalAPI(&base.Base.Cfg.CurrentStateServer, base.Base.KafkaConsumer)
provider := newPublicRoomsProvider(base.LibP2PPubsub, rsAPI, stateAPI) provider := newPublicRoomsProvider(base.LibP2PPubsub, rsAPI, stateAPI)
err = provider.Start() err = provider.Start()
if err != nil { if err != nil {

View file

@ -119,17 +119,15 @@ func main() {
) )
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer)
fsAPI := federationsender.NewInternalAPI( fsAPI := federationsender.NewInternalAPI(
base, federation, rsAPI, keyRing, base, federation, rsAPI, stateAPI, keyRing,
) )
rsComponent.SetFederationSenderAPI(fsAPI) rsComponent.SetFederationSenderAPI(fsAPI)
embed.Embed(base.BaseMux, *instancePort, "Yggdrasil Demo") embed.Embed(base.BaseMux, *instancePort, "Yggdrasil Demo")
stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer)
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Cfg, Config: base.Cfg,
AccountDB: accountDB, AccountDB: accountDB,

View file

@ -31,7 +31,7 @@ func main() {
rsAPI := base.RoomserverHTTPClient() rsAPI := base.RoomserverHTTPClient()
fsAPI := federationsender.NewInternalAPI( fsAPI := federationsender.NewInternalAPI(
base, federation, rsAPI, keyRing, base, federation, rsAPI, base.CurrentStateAPIClient(), keyRing,
) )
federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI) federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI)

View file

@ -111,8 +111,10 @@ func main() {
asAPI = base.AppserviceHTTPClient() asAPI = base.AppserviceHTTPClient()
} }
stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer)
fsAPI := federationsender.NewInternalAPI( fsAPI := federationsender.NewInternalAPI(
base, federation, rsAPI, keyRing, base, federation, rsAPI, stateAPI, keyRing,
) )
if base.UseHTTPAPIs { if base.UseHTTPAPIs {
federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI) federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI)
@ -122,8 +124,6 @@ func main() {
// This is different to rsAPI which can be the http client which doesn't need this dependency // This is different to rsAPI which can be the http client which doesn't need this dependency
rsImpl.SetFederationSenderAPI(fsAPI) rsImpl.SetFederationSenderAPI(fsAPI)
stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer)
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Cfg, Config: base.Cfg,
AccountDB: accountDB, AccountDB: accountDB,

View file

@ -208,17 +208,16 @@ func main() {
KeyDatabase: fetcher, KeyDatabase: fetcher,
} }
stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer)
rsAPI := roomserver.NewInternalAPI(base, keyRing, federation) rsAPI := roomserver.NewInternalAPI(base, keyRing, federation)
eduInputAPI := eduserver.NewInternalAPI(base, cache.New(), userAPI) eduInputAPI := eduserver.NewInternalAPI(base, cache.New(), userAPI)
asQuery := appservice.NewInternalAPI( asQuery := appservice.NewInternalAPI(
base, userAPI, rsAPI, base, userAPI, rsAPI,
) )
fedSenderAPI := federationsender.NewInternalAPI(base, federation, rsAPI, &keyRing) fedSenderAPI := federationsender.NewInternalAPI(base, federation, rsAPI, stateAPI, &keyRing)
rsAPI.SetFederationSenderAPI(fedSenderAPI) rsAPI.SetFederationSenderAPI(fedSenderAPI)
p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation) p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation)
stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer)
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Cfg, Config: base.Cfg,
AccountDB: accountDB, AccountDB: accountDB,

View file

@ -13,10 +13,11 @@
package routing package routing
import ( import (
"encoding/json"
"net/http" "net/http"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
userapi "github.com/matrix-org/dendrite/userapi/api" keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
) )
@ -24,30 +25,35 @@ import (
// GetUserDevices for the given user id // GetUserDevices for the given user id
func GetUserDevices( func GetUserDevices(
req *http.Request, req *http.Request,
userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI,
userID string, userID string,
) util.JSONResponse { ) util.JSONResponse {
response := gomatrixserverlib.RespUserDevices{ var res keyapi.QueryDeviceMessagesResponse
UserID: userID, keyAPI.QueryDeviceMessages(req.Context(), &keyapi.QueryDeviceMessagesRequest{
// TODO: we should return an incrementing stream ID each time the device
// list changes for delta changes to be recognised
StreamID: 0,
}
var res userapi.QueryDevicesResponse
err := userAPI.QueryDevices(req.Context(), &userapi.QueryDevicesRequest{
UserID: userID, UserID: userID,
}, &res) }, &res)
if err != nil { if res.Error != nil {
util.GetLogger(req.Context()).WithError(err).Error("userAPI.QueryDevices failed") util.GetLogger(req.Context()).WithError(res.Error).Error("keyAPI.QueryDeviceMessages failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
response := gomatrixserverlib.RespUserDevices{
UserID: userID,
StreamID: res.StreamID,
}
for _, dev := range res.Devices { for _, dev := range res.Devices {
var key gomatrixserverlib.RespUserDeviceKeys
err := json.Unmarshal(dev.DeviceKeys.KeyJSON, &key)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Warnf("malformed device key: %s", string(dev.DeviceKeys.KeyJSON))
continue
}
device := gomatrixserverlib.RespUserDevice{ device := gomatrixserverlib.RespUserDevice{
DeviceID: dev.ID, DeviceID: dev.DeviceID,
DisplayName: dev.DisplayName, DisplayName: dev.DisplayName,
Keys: []gomatrixserverlib.RespUserDeviceKeys{}, Keys: key,
} }
response.Devices = append(response.Devices, device) response.Devices = append(response.Devices, device)
} }

View file

@ -186,7 +186,7 @@ func Setup(
"federation_user_devices", cfg.Matrix.ServerName, keys, wakeup, "federation_user_devices", cfg.Matrix.ServerName, keys, wakeup,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse { func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
return GetUserDevices( return GetUserDevices(
httpReq, userAPI, vars["userID"], httpReq, keyAPI, vars["userID"],
) )
}, },
)).Methods(http.MethodGet) )).Methods(http.MethodGet)

View file

@ -1,3 +1,5 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
// You may obtain a copy of the License at // You may obtain a copy of the License at

View file

@ -0,0 +1,137 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package consumers
import (
"context"
"encoding/json"
"fmt"
"github.com/Shopify/sarama"
stateapi "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/federationsender/queue"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
// KeyChangeConsumer consumes events that originate in key server.
type KeyChangeConsumer struct {
consumer *internal.ContinualConsumer
db storage.Database
queues *queue.OutgoingQueues
serverName gomatrixserverlib.ServerName
stateAPI stateapi.CurrentStateInternalAPI
}
// NewKeyChangeConsumer creates a new KeyChangeConsumer. Call Start() to begin consuming from key servers.
func NewKeyChangeConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
store storage.Database,
stateAPI stateapi.CurrentStateInternalAPI,
) *KeyChangeConsumer {
c := &KeyChangeConsumer{
consumer: &internal.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputKeyChangeEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
},
queues: queues,
db: store,
serverName: cfg.Matrix.ServerName,
stateAPI: stateAPI,
}
c.consumer.ProcessMessage = c.onMessage
return c
}
// Start consuming from key servers
func (t *KeyChangeConsumer) Start() error {
if err := t.consumer.Start(); err != nil {
return fmt.Errorf("t.consumer.Start: %w", err)
}
return nil
}
// onMessage is called in response to a message received on the
// key change events topic from the key server.
func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error {
var m api.DeviceMessage
if err := json.Unmarshal(msg.Value, &m); err != nil {
log.WithError(err).Errorf("failed to read device message from key change topic")
return nil
}
logger := log.WithField("user_id", m.UserID)
// only send key change events which originated from us
_, originServerName, err := gomatrixserverlib.SplitID('@', m.UserID)
if err != nil {
logger.WithError(err).Error("Failed to extract domain from key change event")
return nil
}
if originServerName != t.serverName {
return nil
}
var queryRes stateapi.QueryRoomsForUserResponse
err = t.stateAPI.QueryRoomsForUser(context.Background(), &stateapi.QueryRoomsForUserRequest{
UserID: m.UserID,
WantMembership: "join",
}, &queryRes)
if err != nil {
logger.WithError(err).Error("failed to calculate joined rooms for user")
return nil
}
// send this key change to all servers who share rooms with this user.
destinations, err := t.db.GetJoinedHostsForRooms(context.Background(), queryRes.RoomIDs)
if err != nil {
logger.WithError(err).Error("failed to calculate joined hosts for rooms user is in")
return nil
}
// Pack the EDU and marshal it
edu := &gomatrixserverlib.EDU{
Type: gomatrixserverlib.MDeviceListUpdate,
Origin: string(t.serverName),
}
event := gomatrixserverlib.DeviceListUpdateEvent{
UserID: m.UserID,
DeviceID: m.DeviceID,
DeviceDisplayName: m.DisplayName,
StreamID: m.StreamID,
PrevID: prevID(m.StreamID),
Deleted: len(m.KeyJSON) == 0,
Keys: m.KeyJSON,
}
if edu.Content, err = json.Marshal(event); err != nil {
return err
}
log.Infof("Sending device list update message to %q", destinations)
return t.queues.SendEDU(edu, t.serverName, destinations)
}
func prevID(streamID int) []int {
if streamID <= 1 {
return nil
}
return []int{streamID - 1}
}

View file

@ -16,6 +16,7 @@ package federationsender
import ( import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
stateapi "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/federationsender/consumers" "github.com/matrix-org/dendrite/federationsender/consumers"
"github.com/matrix-org/dendrite/federationsender/internal" "github.com/matrix-org/dendrite/federationsender/internal"
@ -41,6 +42,7 @@ func NewInternalAPI(
base *setup.BaseDendrite, base *setup.BaseDendrite,
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
rsAPI roomserverAPI.RoomserverInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
stateAPI stateapi.CurrentStateInternalAPI,
keyRing *gomatrixserverlib.KeyRing, keyRing *gomatrixserverlib.KeyRing,
) api.FederationSenderInternalAPI { ) api.FederationSenderInternalAPI {
cfg := &base.Cfg.FederationSender cfg := &base.Cfg.FederationSender
@ -78,6 +80,12 @@ func NewInternalAPI(
if err := tsConsumer.Start(); err != nil { if err := tsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start typing server consumer") logrus.WithError(err).Panic("failed to start typing server consumer")
} }
keyConsumer := consumers.NewKeyChangeConsumer(
base.Cfg, base.KafkaConsumer, queues, federationSenderDB, stateAPI,
)
if err := keyConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start key server consumer")
}
return internal.NewFederationSenderInternalAPI(federationSenderDB, cfg, rsAPI, federation, keyRing, stats, queues) return internal.NewFederationSenderInternalAPI(federationSenderDB, cfg, rsAPI, federation, keyRing, stats, queues)
} }

View file

@ -30,6 +30,8 @@ type Database interface {
GetJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error) GetJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error)
GetAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error) GetAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error)
// GetJoinedHostsForRooms returns the complete set of servers in the rooms given.
GetJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error)
StoreJSON(ctx context.Context, js string) (*shared.Receipt, error) StoreJSON(ctx context.Context, js string) (*shared.Receipt, error)

View file

@ -60,12 +60,16 @@ const selectJoinedHostsSQL = "" +
const selectAllJoinedHostsSQL = "" + const selectAllJoinedHostsSQL = "" +
"SELECT DISTINCT server_name FROM federationsender_joined_hosts" "SELECT DISTINCT server_name FROM federationsender_joined_hosts"
const selectJoinedHostsForRoomsSQL = "" +
"SELECT DISTINCT server_name FROM federationsender_joined_hosts WHERE room_id = ANY($1)"
type joinedHostsStatements struct { type joinedHostsStatements struct {
db *sql.DB db *sql.DB
insertJoinedHostsStmt *sql.Stmt insertJoinedHostsStmt *sql.Stmt
deleteJoinedHostsStmt *sql.Stmt deleteJoinedHostsStmt *sql.Stmt
selectJoinedHostsStmt *sql.Stmt selectJoinedHostsStmt *sql.Stmt
selectAllJoinedHostsStmt *sql.Stmt selectAllJoinedHostsStmt *sql.Stmt
selectJoinedHostsForRoomsStmt *sql.Stmt
} }
func NewPostgresJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err error) { func NewPostgresJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err error) {
@ -88,6 +92,9 @@ func NewPostgresJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err erro
if s.selectAllJoinedHostsStmt, err = s.db.Prepare(selectAllJoinedHostsSQL); err != nil { if s.selectAllJoinedHostsStmt, err = s.db.Prepare(selectAllJoinedHostsSQL); err != nil {
return return
} }
if s.selectJoinedHostsForRoomsStmt, err = s.db.Prepare(selectJoinedHostsForRoomsSQL); err != nil {
return
}
return return
} }
@ -144,6 +151,27 @@ func (s *joinedHostsStatements) SelectAllJoinedHosts(
return result, rows.Err() return result, rows.Err()
} }
func (s *joinedHostsStatements) SelectJoinedHostsForRooms(
ctx context.Context, roomIDs []string,
) ([]gomatrixserverlib.ServerName, error) {
rows, err := s.selectJoinedHostsForRoomsStmt.QueryContext(ctx, pq.StringArray(roomIDs))
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectJoinedHostsForRoomsStmt: rows.close() failed")
var result []gomatrixserverlib.ServerName
for rows.Next() {
var serverName string
if err = rows.Scan(&serverName); err != nil {
return nil, err
}
result = append(result, gomatrixserverlib.ServerName(serverName))
}
return result, rows.Err()
}
func joinedHostsFromStmt( func joinedHostsFromStmt(
ctx context.Context, stmt *sql.Stmt, roomID string, ctx context.Context, stmt *sql.Stmt, roomID string,
) ([]types.JoinedHost, error) { ) ([]types.JoinedHost, error) {

View file

@ -123,6 +123,10 @@ func (d *Database) GetAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.S
return d.FederationSenderJoinedHosts.SelectAllJoinedHosts(ctx) return d.FederationSenderJoinedHosts.SelectAllJoinedHosts(ctx)
} }
func (d *Database) GetJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error) {
return d.FederationSenderJoinedHosts.SelectJoinedHostsForRooms(ctx, roomIDs)
}
// StoreJSON adds a JSON blob into the queue JSON table and returns // StoreJSON adds a JSON blob into the queue JSON table and returns
// a NID. The NID will then be used when inserting the per-destination // a NID. The NID will then be used when inserting the per-destination
// metadata entries. // metadata entries.

View file

@ -59,6 +59,9 @@ const selectJoinedHostsSQL = "" +
const selectAllJoinedHostsSQL = "" + const selectAllJoinedHostsSQL = "" +
"SELECT DISTINCT server_name FROM federationsender_joined_hosts" "SELECT DISTINCT server_name FROM federationsender_joined_hosts"
const selectJoinedHostsForRoomsSQL = "" +
"SELECT DISTINCT server_name FROM federationsender_joined_hosts WHERE room_id IN ($1)"
type joinedHostsStatements struct { type joinedHostsStatements struct {
db *sql.DB db *sql.DB
writer *sqlutil.TransactionWriter writer *sqlutil.TransactionWriter
@ -66,6 +69,7 @@ type joinedHostsStatements struct {
deleteJoinedHostsStmt *sql.Stmt deleteJoinedHostsStmt *sql.Stmt
selectJoinedHostsStmt *sql.Stmt selectJoinedHostsStmt *sql.Stmt
selectAllJoinedHostsStmt *sql.Stmt selectAllJoinedHostsStmt *sql.Stmt
selectJoinedHostsForRoomsStmt *sql.Stmt
} }
func NewSQLiteJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err error) { func NewSQLiteJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err error) {
@ -89,6 +93,9 @@ func NewSQLiteJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err error)
if s.selectAllJoinedHostsStmt, err = db.Prepare(selectAllJoinedHostsSQL); err != nil { if s.selectAllJoinedHostsStmt, err = db.Prepare(selectAllJoinedHostsSQL); err != nil {
return return
} }
if s.selectJoinedHostsForRoomsStmt, err = db.Prepare(selectJoinedHostsForRoomsSQL); err != nil {
return
}
return return
} }
@ -153,6 +160,32 @@ func (s *joinedHostsStatements) SelectAllJoinedHosts(
return result, rows.Err() return result, rows.Err()
} }
func (s *joinedHostsStatements) SelectJoinedHostsForRooms(
ctx context.Context, roomIDs []string,
) ([]gomatrixserverlib.ServerName, error) {
iRoomIDs := make([]interface{}, len(roomIDs))
for i := range roomIDs {
iRoomIDs[i] = roomIDs[i]
}
rows, err := s.selectJoinedHostsForRoomsStmt.QueryContext(ctx, iRoomIDs...)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectJoinedHostsForRoomsStmt: rows.close() failed")
var result []gomatrixserverlib.ServerName
for rows.Next() {
var serverName string
if err = rows.Scan(&serverName); err != nil {
return nil, err
}
result = append(result, gomatrixserverlib.ServerName(serverName))
}
return result, rows.Err()
}
func joinedHostsFromStmt( func joinedHostsFromStmt(
ctx context.Context, stmt *sql.Stmt, roomID string, ctx context.Context, stmt *sql.Stmt, roomID string,
) ([]types.JoinedHost, error) { ) ([]types.JoinedHost, error) {

View file

@ -53,6 +53,7 @@ type FederationSenderJoinedHosts interface {
SelectJoinedHostsWithTx(ctx context.Context, txn *sql.Tx, roomID string) ([]types.JoinedHost, error) SelectJoinedHostsWithTx(ctx context.Context, txn *sql.Tx, roomID string) ([]types.JoinedHost, error)
SelectJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error) SelectJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error)
SelectAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error) SelectAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error)
SelectJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error)
} }
type FederationSenderRooms interface { type FederationSenderRooms interface {

2
go.mod
View file

@ -23,7 +23,7 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26
github.com/matrix-org/gomatrixserverlib v0.0.0-20200722124340-16fba816840d github.com/matrix-org/gomatrixserverlib v0.0.0-20200804124807-5012a626de1d
github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7
github.com/mattn/go-sqlite3 v2.0.2+incompatible github.com/mattn/go-sqlite3 v2.0.2+incompatible

8
go.sum
View file

@ -429,6 +429,14 @@ github.com/matrix-org/gomatrixserverlib v0.0.0-20200721145051-cea6eafced2b h1:ul
github.com/matrix-org/gomatrixserverlib v0.0.0-20200721145051-cea6eafced2b/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/gomatrixserverlib v0.0.0-20200721145051-cea6eafced2b/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200722124340-16fba816840d h1:WZXyd8YI+PQIDYjN8HxtqNRJ1DCckt9wPTi2P8cdnKM= github.com/matrix-org/gomatrixserverlib v0.0.0-20200722124340-16fba816840d h1:WZXyd8YI+PQIDYjN8HxtqNRJ1DCckt9wPTi2P8cdnKM=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200722124340-16fba816840d/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/gomatrixserverlib v0.0.0-20200722124340-16fba816840d/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200803165250-352235625587 h1:n2IZkm5LI4lACulOa5WU6QwWUhHUtBZez7YIFr1fCOs=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200803165250-352235625587/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200803165739-3bd1ef0f0852 h1:OBvHjLWaT2KS9kGarX2ES0yKBL/wMxAeQB39tRrAAls=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200803165739-3bd1ef0f0852/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200804110046-7abbc2918807 h1:ufr+e2FBDuxcO5t/7PMfoiQoma4uyYzS/sLuJSR6tng=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200804110046-7abbc2918807/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200804124807-5012a626de1d h1:zYk/bQ5bmHDsRqHBl57aBxo5bizsknWU3sunZf9WnWI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200804124807-5012a626de1d/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y=
github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f/go.mod h1:y0oDTjZDv5SM9a2rp3bl+CU+bvTRINQsdb7YlDql5Go= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f/go.mod h1:y0oDTjZDv5SM9a2rp3bl+CU+bvTRINQsdb7YlDql5Go=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo=

View file

@ -32,6 +32,7 @@ type KeyInternalAPI interface {
QueryKeys(ctx context.Context, req *QueryKeysRequest, res *QueryKeysResponse) QueryKeys(ctx context.Context, req *QueryKeysRequest, res *QueryKeysResponse)
QueryKeyChanges(ctx context.Context, req *QueryKeyChangesRequest, res *QueryKeyChangesResponse) QueryKeyChanges(ctx context.Context, req *QueryKeyChangesRequest, res *QueryKeyChangesResponse)
QueryOneTimeKeys(ctx context.Context, req *QueryOneTimeKeysRequest, res *QueryOneTimeKeysResponse) QueryOneTimeKeys(ctx context.Context, req *QueryOneTimeKeysRequest, res *QueryOneTimeKeysResponse)
QueryDeviceMessages(ctx context.Context, req *QueryDeviceMessagesRequest, res *QueryDeviceMessagesResponse)
} }
// KeyError is returned if there was a problem performing/querying the server // KeyError is returned if there was a problem performing/querying the server
@ -188,3 +189,14 @@ type QueryOneTimeKeysResponse struct {
Count OneTimeKeysCount Count OneTimeKeysCount
Error *KeyError Error *KeyError
} }
type QueryDeviceMessagesRequest struct {
UserID string
}
type QueryDeviceMessagesResponse struct {
// The latest stream ID
StreamID int
Devices []DeviceMessage
Error *KeyError
}

View file

@ -179,6 +179,24 @@ func (a *KeyInternalAPI) QueryOneTimeKeys(ctx context.Context, req *api.QueryOne
res.Count = *count res.Count = *count
} }
func (a *KeyInternalAPI) QueryDeviceMessages(ctx context.Context, req *api.QueryDeviceMessagesRequest, res *api.QueryDeviceMessagesResponse) {
msgs, err := a.DB.DeviceKeysForUser(ctx, req.UserID, nil)
if err != nil {
res.Error = &api.KeyError{
Err: fmt.Sprintf("failed to query DB for device keys: %s", err),
}
return
}
maxStreamID := 0
for _, m := range msgs {
if m.StreamID > maxStreamID {
maxStreamID = m.StreamID
}
}
res.Devices = msgs
res.StreamID = maxStreamID
}
func (a *KeyInternalAPI) QueryKeys(ctx context.Context, req *api.QueryKeysRequest, res *api.QueryKeysResponse) { func (a *KeyInternalAPI) QueryKeys(ctx context.Context, req *api.QueryKeysRequest, res *api.QueryKeysResponse) {
res.DeviceKeys = make(map[string]map[string]json.RawMessage) res.DeviceKeys = make(map[string]map[string]json.RawMessage)
res.Failures = make(map[string]interface{}) res.Failures = make(map[string]interface{})

View file

@ -32,6 +32,7 @@ const (
QueryKeysPath = "/keyserver/queryKeys" QueryKeysPath = "/keyserver/queryKeys"
QueryKeyChangesPath = "/keyserver/queryKeyChanges" QueryKeyChangesPath = "/keyserver/queryKeyChanges"
QueryOneTimeKeysPath = "/keyserver/queryOneTimeKeys" QueryOneTimeKeysPath = "/keyserver/queryOneTimeKeys"
QueryDeviceMessagesPath = "/keyserver/queryDeviceMessages"
) )
// NewKeyServerClient creates a KeyInternalAPI implemented by talking to a HTTP POST API. // NewKeyServerClient creates a KeyInternalAPI implemented by talking to a HTTP POST API.
@ -126,6 +127,23 @@ func (h *httpKeyInternalAPI) QueryOneTimeKeys(
} }
} }
func (h *httpKeyInternalAPI) QueryDeviceMessages(
ctx context.Context,
request *api.QueryDeviceMessagesRequest,
response *api.QueryDeviceMessagesResponse,
) {
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryDeviceMessages")
defer span.Finish()
apiURL := h.apiURL + QueryDeviceMessagesPath
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
if err != nil {
response.Error = &api.KeyError{
Err: err.Error(),
}
}
}
func (h *httpKeyInternalAPI) QueryKeyChanges( func (h *httpKeyInternalAPI) QueryKeyChanges(
ctx context.Context, ctx context.Context,
request *api.QueryKeyChangesRequest, request *api.QueryKeyChangesRequest,

View file

@ -69,6 +69,17 @@ func AddRoutes(internalAPIMux *mux.Router, s api.KeyInternalAPI) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response} return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}), }),
) )
internalAPIMux.Handle(QueryDeviceMessagesPath,
httputil.MakeInternalAPI("queryDeviceMessages", func(req *http.Request) util.JSONResponse {
request := api.QueryDeviceMessagesRequest{}
response := api.QueryDeviceMessagesResponse{}
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
s.QueryDeviceMessages(req.Context(), &request, &response)
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(QueryKeyChangesPath, internalAPIMux.Handle(QueryKeyChangesPath,
httputil.MakeInternalAPI("queryKeyChanges", func(req *http.Request) util.JSONResponse { httputil.MakeInternalAPI("queryKeyChanges", func(req *http.Request) util.JSONResponse {
request := api.QueryKeyChangesRequest{} request := api.QueryKeyChangesRequest{}

View file

@ -171,6 +171,12 @@ func (r *RoomserverInternalAPI) processInviteEvent(
if err = event.SetUnsignedField("invite_room_state", irs); err != nil { if err = event.SetUnsignedField("invite_room_state", irs); err != nil {
return nil, err return nil, err
} }
} else {
log.WithError(ierr).Error("failed to build invite stripped state")
// still set the field else synapse deployments don't process the invite
if err = event.SetUnsignedField("invite_room_state", struct{}{}); err != nil {
return nil, err
}
} }
} }

View file

@ -41,6 +41,9 @@ func (k *mockKeyAPI) QueryKeyChanges(ctx context.Context, req *keyapi.QueryKeyCh
} }
func (k *mockKeyAPI) QueryOneTimeKeys(ctx context.Context, req *keyapi.QueryOneTimeKeysRequest, res *keyapi.QueryOneTimeKeysResponse) { func (k *mockKeyAPI) QueryOneTimeKeys(ctx context.Context, req *keyapi.QueryOneTimeKeysRequest, res *keyapi.QueryOneTimeKeysResponse) {
}
func (k *mockKeyAPI) QueryDeviceMessages(ctx context.Context, req *keyapi.QueryDeviceMessagesRequest, res *keyapi.QueryDeviceMessagesResponse) {
} }
type mockCurrentStateAPI struct { type mockCurrentStateAPI struct {

View file

@ -136,6 +136,8 @@ Local new device changes appear in v2 /sync
Local update device changes appear in v2 /sync Local update device changes appear in v2 /sync
Users receive device_list updates for their own devices Users receive device_list updates for their own devices
Get left notifs for other users in sync and /keys/changes when user leaves Get left notifs for other users in sync and /keys/changes when user leaves
Local device key changes get to remote servers
Local device key changes get to remote servers with correct prev_id
Can add account data Can add account data
Can add account data to room Can add account data to room
Can get account data without syncing Can get account data without syncing