Use more interfaces in federationapi; begin adding regression test

This commit is contained in:
Kegan Dougal 2022-05-16 15:57:58 +01:00
parent c7f4e163b1
commit 58cef49f4e
16 changed files with 246 additions and 60 deletions

View file

@ -12,12 +12,16 @@ import (
// FederationInternalAPI is used to query information from the federation sender. // FederationInternalAPI is used to query information from the federation sender.
type FederationInternalAPI interface { type FederationInternalAPI interface {
FederationClient gomatrixserverlib.FederatedStateClient
KeyserverFederationAPI
gomatrixserverlib.KeyDatabase gomatrixserverlib.KeyDatabase
ClientFederationAPI ClientFederationAPI
RoomserverFederationAPI RoomserverFederationAPI
QueryServerKeys(ctx context.Context, request *QueryServerKeysRequest, response *QueryServerKeysResponse) error QueryServerKeys(ctx context.Context, request *QueryServerKeysRequest, response *QueryServerKeysResponse) error
LookupServerKeys(ctx context.Context, s gomatrixserverlib.ServerName, keyRequests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp) ([]gomatrixserverlib.ServerKeys, error)
MSC2836EventRelationships(ctx context.Context, dst gomatrixserverlib.ServerName, r gomatrixserverlib.MSC2836EventRelationshipsRequest, roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.MSC2836EventRelationshipsResponse, err error)
MSC2946Spaces(ctx context.Context, dst gomatrixserverlib.ServerName, roomID string, suggestedOnly bool) (res gomatrixserverlib.MSC2946SpacesResponse, err error)
// Broadcasts an EDU to all servers in rooms we are joined to. Used in the yggdrasil demos. // Broadcasts an EDU to all servers in rooms we are joined to. Used in the yggdrasil demos.
PerformBroadcastEDU( PerformBroadcastEDU(
@ -60,17 +64,43 @@ type RoomserverFederationAPI interface {
LookupMissingEvents(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, missing gomatrixserverlib.MissingEvents, roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMissingEvents, err error) LookupMissingEvents(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, missing gomatrixserverlib.MissingEvents, roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMissingEvents, err error)
} }
// FederationClient is a subset of gomatrixserverlib.FederationClient functions which the fedsender // KeyserverFederationAPI is a subset of gomatrixserverlib.FederationClient functions which the keyserver
// implements as proxy calls, with built-in backoff/retries/etc. Errors returned from functions in // implements as proxy calls, with built-in backoff/retries/etc. Errors returned from functions in
// this interface are of type FederationClientError // this interface are of type FederationClientError
type FederationClient interface { type KeyserverFederationAPI interface {
gomatrixserverlib.FederatedStateClient
GetUserDevices(ctx context.Context, s gomatrixserverlib.ServerName, userID string) (res gomatrixserverlib.RespUserDevices, err error) GetUserDevices(ctx context.Context, s gomatrixserverlib.ServerName, userID string) (res gomatrixserverlib.RespUserDevices, err error)
ClaimKeys(ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string) (res gomatrixserverlib.RespClaimKeys, err error) ClaimKeys(ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string) (res gomatrixserverlib.RespClaimKeys, err error)
QueryKeys(ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string) (res gomatrixserverlib.RespQueryKeys, err error) QueryKeys(ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string) (res gomatrixserverlib.RespQueryKeys, err error)
}
// an interface for gmsl.FederationClient - contains functions called by federationapi only.
type FederationClient interface {
gomatrixserverlib.KeyClient
SendTransaction(ctx context.Context, t gomatrixserverlib.Transaction) (res gomatrixserverlib.RespSend, err error)
// Perform operations
LookupRoomAlias(ctx context.Context, s gomatrixserverlib.ServerName, roomAlias string) (res gomatrixserverlib.RespDirectory, err error)
Peek(ctx context.Context, s gomatrixserverlib.ServerName, roomID, peekID string, roomVersions []gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespPeek, err error)
MakeJoin(ctx context.Context, s gomatrixserverlib.ServerName, roomID, userID string, roomVersions []gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMakeJoin, err error)
SendJoin(ctx context.Context, s gomatrixserverlib.ServerName, event *gomatrixserverlib.Event) (res gomatrixserverlib.RespSendJoin, err error)
MakeLeave(ctx context.Context, s gomatrixserverlib.ServerName, roomID, userID string) (res gomatrixserverlib.RespMakeLeave, err error)
SendLeave(ctx context.Context, s gomatrixserverlib.ServerName, event *gomatrixserverlib.Event) (err error)
SendInviteV2(ctx context.Context, s gomatrixserverlib.ServerName, request gomatrixserverlib.InviteV2Request) (res gomatrixserverlib.RespInviteV2, err error)
GetEvent(ctx context.Context, s gomatrixserverlib.ServerName, eventID string) (res gomatrixserverlib.Transaction, err error)
GetEventAuth(ctx context.Context, s gomatrixserverlib.ServerName, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (res gomatrixserverlib.RespEventAuth, err error)
GetUserDevices(ctx context.Context, s gomatrixserverlib.ServerName, userID string) (gomatrixserverlib.RespUserDevices, error)
ClaimKeys(ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string) (gomatrixserverlib.RespClaimKeys, error)
QueryKeys(ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string) (gomatrixserverlib.RespQueryKeys, error)
Backfill(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, limit int, eventIDs []string) (res gomatrixserverlib.Transaction, err error)
MSC2836EventRelationships(ctx context.Context, dst gomatrixserverlib.ServerName, r gomatrixserverlib.MSC2836EventRelationshipsRequest, roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.MSC2836EventRelationshipsResponse, err error) MSC2836EventRelationships(ctx context.Context, dst gomatrixserverlib.ServerName, r gomatrixserverlib.MSC2836EventRelationshipsRequest, roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.MSC2836EventRelationshipsResponse, err error)
MSC2946Spaces(ctx context.Context, dst gomatrixserverlib.ServerName, roomID string, suggestedOnly bool) (res gomatrixserverlib.MSC2946SpacesResponse, err error) MSC2946Spaces(ctx context.Context, dst gomatrixserverlib.ServerName, roomID string, suggestedOnly bool) (res gomatrixserverlib.MSC2946SpacesResponse, err error)
LookupServerKeys(ctx context.Context, s gomatrixserverlib.ServerName, keyRequests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp) ([]gomatrixserverlib.ServerKeys, error)
ExchangeThirdPartyInvite(ctx context.Context, s gomatrixserverlib.ServerName, builder gomatrixserverlib.EventBuilder) (err error)
LookupState(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string, roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespState, err error)
LookupStateIDs(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string) (res gomatrixserverlib.RespStateIDs, err error)
LookupMissingEvents(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, missing gomatrixserverlib.MissingEvents, roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMissingEvents, err error)
} }
// FederationClientError is returned from FederationClient methods in the event of a problem. // FederationClientError is returned from FederationClient methods in the event of a problem.

View file

@ -93,7 +93,7 @@ func AddPublicRoutes(
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
func NewInternalAPI( func NewInternalAPI(
base *base.BaseDendrite, base *base.BaseDendrite,
federation *gomatrixserverlib.FederationClient, federation api.FederationClient,
rsAPI roomserverAPI.FederationRoomserverAPI, rsAPI roomserverAPI.FederationRoomserverAPI,
caches *caching.Caches, caches *caching.Caches,
keyRing *gomatrixserverlib.KeyRing, keyRing *gomatrixserverlib.KeyRing,

View file

@ -3,21 +3,103 @@ package federationapi_test
import ( import (
"context" "context"
"crypto/ed25519" "crypto/ed25519"
"encoding/json"
"strings" "strings"
"testing" "testing"
"time"
"github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/internal" "github.com/matrix-org/dendrite/federationapi/internal"
"github.com/matrix-org/dendrite/roomserver/api" rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/test/testrig"
"github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
type fedRoomserverAPI struct { type fedRoomserverAPI struct {
api.FederationRoomserverAPI rsapi.FederationRoomserverAPI
inputRoomEvents func(ctx context.Context, req *rsapi.InputRoomEventsRequest, res *rsapi.InputRoomEventsResponse)
}
// PerformJoin will call this function
func (f *fedRoomserverAPI) InputRoomEvents(ctx context.Context, req *rsapi.InputRoomEventsRequest, res *rsapi.InputRoomEventsResponse) {
if f.inputRoomEvents == nil {
return
}
f.inputRoomEvents(ctx, req, res)
}
type fedClient struct {
api.FederationClient
allowJoins []*test.Room
t *testing.T
}
func (f *fedClient) GetServerKeys(ctx context.Context, matrixServer gomatrixserverlib.ServerName) (gomatrixserverlib.ServerKeys, error) {
var keys gomatrixserverlib.ServerKeys
keys.ServerName = matrixServer
keys.ValidUntilTS = gomatrixserverlib.AsTimestamp(time.Now().Add(10 * time.Hour))
publicKey := test.PrivateKey.Public().(ed25519.PublicKey)
keys.VerifyKeys = map[gomatrixserverlib.KeyID]gomatrixserverlib.VerifyKey{
test.KeyID: {
Key: gomatrixserverlib.Base64Bytes(publicKey),
},
}
toSign, err := json.Marshal(keys.ServerKeyFields)
if err != nil {
return keys, err
}
keys.Raw, err = gomatrixserverlib.SignJSON(
string(matrixServer), test.KeyID, test.PrivateKey, toSign,
)
if err != nil {
return keys, err
}
return keys, nil
}
func (f *fedClient) MakeJoin(ctx context.Context, s gomatrixserverlib.ServerName, roomID, userID string, roomVersions []gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMakeJoin, err error) {
for _, r := range f.allowJoins {
if r.ID == roomID {
res.RoomVersion = r.Version
res.JoinEvent = gomatrixserverlib.EventBuilder{
Sender: userID,
RoomID: roomID,
Type: "m.room.member",
StateKey: &userID,
Content: gomatrixserverlib.RawJSON([]byte(`{"membership":"join"}`)),
PrevEvents: r.ForwardExtremities(),
}
var needed gomatrixserverlib.StateNeeded
needed, err = gomatrixserverlib.StateNeededForEventBuilder(&res.JoinEvent)
if err != nil {
f.t.Errorf("StateNeededForEventBuilder: %v", err)
return
}
res.JoinEvent.AuthEvents = r.MustGetAuthEventRefsForEvent(f.t, needed)
return
}
}
return
}
func (f *fedClient) SendJoin(ctx context.Context, s gomatrixserverlib.ServerName, event *gomatrixserverlib.Event) (res gomatrixserverlib.RespSendJoin, err error) {
for _, r := range f.allowJoins {
if r.ID == event.RoomID() {
r.InsertEvent(f.t, event.Headered(r.Version))
f.t.Logf("Join event: %v", event.EventID())
res.StateEvents = gomatrixserverlib.NewEventJSONsFromHeaderedEvents(r.CurrentState())
res.AuthEvents = gomatrixserverlib.NewEventJSONsFromHeaderedEvents(r.Events())
}
}
return
} }
// Regression test to make sure that /send_join is updating the destination hosts synchronously and // Regression test to make sure that /send_join is updating the destination hosts synchronously and
@ -29,17 +111,44 @@ func TestFederationAPIJoinThenKeyUpdate(t *testing.T) {
} }
func testFederationAPIJoinThenKeyUpdate(t *testing.T, dbType test.DBType) { func testFederationAPIJoinThenKeyUpdate(t *testing.T, dbType test.DBType) {
/* base, close := testrig.CreateBaseDendrite(t, dbType)
base, close := testrig.CreateBaseDendrite(t, dbType) defer close()
defer close() jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
user := test.NewUser() user := test.NewUser()
room := test.NewRoom(t, user) room := test.NewRoom(t, user)
joiningVia := gomatrixserverlib.ServerName("example.localhost")
rsapi := &fedRoomserverAPI{
inputRoomEvents: func(ctx context.Context, req *rsapi.InputRoomEventsRequest, res *rsapi.InputRoomEventsResponse) {
if req.Asynchronous {
t.Errorf("InputRoomEvents from PerformJoin MUST be synchronous")
}
},
}
fsapi := federationapi.NewInternalAPI(base, &fedClient{
allowJoins: []*test.Room{room},
t: t,
}, rsapi, base.Caches, nil, false)
var resp api.PerformJoinResponse
fsapi.PerformJoin(context.Background(), &api.PerformJoinRequest{
RoomID: room.ID,
UserID: user.ID,
ServerNames: []gomatrixserverlib.ServerName{joiningVia},
}, &resp)
if resp.JoinedVia != joiningVia {
t.Errorf("PerformJoin: joined via %v want %v", resp.JoinedVia, joiningVia)
}
if resp.LastError != nil {
t.Fatalf("PerformJoin: returned error: %+v", *resp.LastError)
}
// TODO:
// Inject a keyserver key change event and ensure we try to send it out
fsapi := federationapi.NewInternalAPI(base, mockClient, fedRoomserverAPI{}, base.Caches, nil, false)
*/
} }
// Tests that event IDs with '/' in them (escaped as %2F) are correctly passed to the right handler and don't 404. // Tests that event IDs with '/' in them (escaped as %2F) are correctly passed to the right handler and don't 404.

View file

@ -26,7 +26,7 @@ type FederationInternalAPI struct {
cfg *config.FederationAPI cfg *config.FederationAPI
statistics *statistics.Statistics statistics *statistics.Statistics
rsAPI roomserverAPI.FederationRoomserverAPI rsAPI roomserverAPI.FederationRoomserverAPI
federation *gomatrixserverlib.FederationClient federation api.FederationClient
keyRing *gomatrixserverlib.KeyRing keyRing *gomatrixserverlib.KeyRing
queues *queue.OutgoingQueues queues *queue.OutgoingQueues
joins sync.Map // joins currently in progress joins sync.Map // joins currently in progress
@ -35,7 +35,7 @@ type FederationInternalAPI struct {
func NewFederationInternalAPI( func NewFederationInternalAPI(
db storage.Database, cfg *config.FederationAPI, db storage.Database, cfg *config.FederationAPI,
rsAPI roomserverAPI.FederationRoomserverAPI, rsAPI roomserverAPI.FederationRoomserverAPI,
federation *gomatrixserverlib.FederationClient, federation api.FederationClient,
statistics *statistics.Statistics, statistics *statistics.Statistics,
caches *caching.Caches, caches *caching.Caches,
queues *queue.OutgoingQueues, queues *queue.OutgoingQueues,

View file

@ -666,7 +666,7 @@ func setDefaultRoomVersionFromJoinEvent(joinEvent gomatrixserverlib.EventBuilder
// FederatedAuthProvider is an auth chain provider which fetches events from the server provided // FederatedAuthProvider is an auth chain provider which fetches events from the server provided
func federatedAuthProvider( func federatedAuthProvider(
ctx context.Context, federation *gomatrixserverlib.FederationClient, ctx context.Context, federation api.FederationClient,
keyRing gomatrixserverlib.JSONVerifier, server gomatrixserverlib.ServerName, keyRing gomatrixserverlib.JSONVerifier, server gomatrixserverlib.ServerName,
) gomatrixserverlib.AuthChainProvider { ) gomatrixserverlib.AuthChainProvider {
// A list of events that we have retried, if they were not included in // A list of events that we have retried, if they were not included in

View file

@ -21,6 +21,7 @@ import (
"sync" "sync"
"time" "time"
fedapi "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/statistics" "github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/matrix-org/dendrite/federationapi/storage" "github.com/matrix-org/dendrite/federationapi/storage"
"github.com/matrix-org/dendrite/federationapi/storage/shared" "github.com/matrix-org/dendrite/federationapi/storage/shared"
@ -50,20 +51,20 @@ type destinationQueue struct {
process *process.ProcessContext process *process.ProcessContext
signing *SigningInfo signing *SigningInfo
rsAPI api.FederationRoomserverAPI rsAPI api.FederationRoomserverAPI
client *gomatrixserverlib.FederationClient // federation client client fedapi.FederationClient // federation client
origin gomatrixserverlib.ServerName // origin of requests origin gomatrixserverlib.ServerName // origin of requests
destination gomatrixserverlib.ServerName // destination of requests destination gomatrixserverlib.ServerName // destination of requests
running atomic.Bool // is the queue worker running? running atomic.Bool // is the queue worker running?
backingOff atomic.Bool // true if we're backing off backingOff atomic.Bool // true if we're backing off
overflowed atomic.Bool // the queues exceed maxPDUsInMemory/maxEDUsInMemory, so we should consult the database for more overflowed atomic.Bool // the queues exceed maxPDUsInMemory/maxEDUsInMemory, so we should consult the database for more
statistics *statistics.ServerStatistics // statistics about this remote server statistics *statistics.ServerStatistics // statistics about this remote server
transactionIDMutex sync.Mutex // protects transactionID transactionIDMutex sync.Mutex // protects transactionID
transactionID gomatrixserverlib.TransactionID // last transaction ID if retrying, or "" if last txn was successful transactionID gomatrixserverlib.TransactionID // last transaction ID if retrying, or "" if last txn was successful
notify chan struct{} // interrupts idle wait pending PDUs/EDUs notify chan struct{} // interrupts idle wait pending PDUs/EDUs
pendingPDUs []*queuedPDU // PDUs waiting to be sent pendingPDUs []*queuedPDU // PDUs waiting to be sent
pendingEDUs []*queuedEDU // EDUs waiting to be sent pendingEDUs []*queuedEDU // EDUs waiting to be sent
pendingMutex sync.RWMutex // protects pendingPDUs and pendingEDUs pendingMutex sync.RWMutex // protects pendingPDUs and pendingEDUs
interruptBackoff chan bool // interrupts backoff interruptBackoff chan bool // interrupts backoff
} }
// Send event adds the event to the pending queue for the destination. // Send event adds the event to the pending queue for the destination.

View file

@ -26,6 +26,7 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
fedapi "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/statistics" "github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/matrix-org/dendrite/federationapi/storage" "github.com/matrix-org/dendrite/federationapi/storage"
"github.com/matrix-org/dendrite/federationapi/storage/shared" "github.com/matrix-org/dendrite/federationapi/storage/shared"
@ -41,7 +42,7 @@ type OutgoingQueues struct {
disabled bool disabled bool
rsAPI api.FederationRoomserverAPI rsAPI api.FederationRoomserverAPI
origin gomatrixserverlib.ServerName origin gomatrixserverlib.ServerName
client *gomatrixserverlib.FederationClient client fedapi.FederationClient
statistics *statistics.Statistics statistics *statistics.Statistics
signing *SigningInfo signing *SigningInfo
queuesMutex sync.Mutex // protects the below queuesMutex sync.Mutex // protects the below
@ -85,7 +86,7 @@ func NewOutgoingQueues(
process *process.ProcessContext, process *process.ProcessContext,
disabled bool, disabled bool,
origin gomatrixserverlib.ServerName, origin gomatrixserverlib.ServerName,
client *gomatrixserverlib.FederationClient, client fedapi.FederationClient,
rsAPI api.FederationRoomserverAPI, rsAPI api.FederationRoomserverAPI,
statistics *statistics.Statistics, statistics *statistics.Statistics,
signing *SigningInfo, signing *SigningInfo,

View file

@ -30,7 +30,7 @@ import (
// RoomAliasToID converts the queried alias into a room ID and returns it // RoomAliasToID converts the queried alias into a room ID and returns it
func RoomAliasToID( func RoomAliasToID(
httpReq *http.Request, httpReq *http.Request,
federation *gomatrixserverlib.FederationClient, federation federationAPI.FederationClient,
cfg *config.FederationAPI, cfg *config.FederationAPI,
rsAPI roomserverAPI.FederationRoomserverAPI, rsAPI roomserverAPI.FederationRoomserverAPI,
senderAPI federationAPI.FederationInternalAPI, senderAPI federationAPI.FederationInternalAPI,

View file

@ -54,7 +54,7 @@ func Setup(
rsAPI roomserverAPI.FederationRoomserverAPI, rsAPI roomserverAPI.FederationRoomserverAPI,
fsAPI *fedInternal.FederationInternalAPI, fsAPI *fedInternal.FederationInternalAPI,
keys gomatrixserverlib.JSONVerifier, keys gomatrixserverlib.JSONVerifier,
federation *gomatrixserverlib.FederationClient, federation federationAPI.FederationClient,
userAPI userapi.FederationUserAPI, userAPI userapi.FederationUserAPI,
keyAPI keyserverAPI.FederationKeyAPI, keyAPI keyserverAPI.FederationKeyAPI,
mscCfg *config.MSCs, mscCfg *config.MSCs,

View file

@ -85,7 +85,7 @@ func Send(
rsAPI api.FederationRoomserverAPI, rsAPI api.FederationRoomserverAPI,
keyAPI keyapi.FederationKeyAPI, keyAPI keyapi.FederationKeyAPI,
keys gomatrixserverlib.JSONVerifier, keys gomatrixserverlib.JSONVerifier,
federation *gomatrixserverlib.FederationClient, federation federationAPI.FederationClient,
mu *internal.MutexByRoom, mu *internal.MutexByRoom,
servers federationAPI.ServersInRoomProvider, servers federationAPI.ServersInRoomProvider,
producer *producers.SyncAPIProducer, producer *producers.SyncAPIProducer,

View file

@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
@ -57,7 +58,7 @@ var (
func CreateInvitesFrom3PIDInvites( func CreateInvitesFrom3PIDInvites(
req *http.Request, rsAPI api.FederationRoomserverAPI, req *http.Request, rsAPI api.FederationRoomserverAPI,
cfg *config.FederationAPI, cfg *config.FederationAPI,
federation *gomatrixserverlib.FederationClient, federation federationAPI.FederationClient,
userAPI userapi.FederationUserAPI, userAPI userapi.FederationUserAPI,
) util.JSONResponse { ) util.JSONResponse {
var body invites var body invites
@ -107,7 +108,7 @@ func ExchangeThirdPartyInvite(
roomID string, roomID string,
rsAPI api.FederationRoomserverAPI, rsAPI api.FederationRoomserverAPI,
cfg *config.FederationAPI, cfg *config.FederationAPI,
federation *gomatrixserverlib.FederationClient, federation federationAPI.FederationClient,
) util.JSONResponse { ) util.JSONResponse {
var builder gomatrixserverlib.EventBuilder var builder gomatrixserverlib.EventBuilder
if err := json.Unmarshal(request.Content(), &builder); err != nil { if err := json.Unmarshal(request.Content(), &builder); err != nil {
@ -165,7 +166,8 @@ func ExchangeThirdPartyInvite(
// Ask the requesting server to sign the newly created event so we know it // Ask the requesting server to sign the newly created event so we know it
// acknowledged it // acknowledged it
signedEvent, err := federation.SendInvite(httpReq.Context(), request.Origin(), event) inviteReq, err := gomatrixserverlib.NewInviteV2Request(event.Headered(verRes.RoomVersion), nil)
signedEvent, err := federation.SendInviteV2(httpReq.Context(), request.Origin(), inviteReq)
if err != nil { if err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("federation.SendInvite failed") util.GetLogger(httpReq.Context()).WithError(err).Error("federation.SendInvite failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
@ -205,7 +207,7 @@ func ExchangeThirdPartyInvite(
func createInviteFrom3PIDInvite( func createInviteFrom3PIDInvite(
ctx context.Context, rsAPI api.FederationRoomserverAPI, ctx context.Context, rsAPI api.FederationRoomserverAPI,
cfg *config.FederationAPI, cfg *config.FederationAPI,
inv invite, federation *gomatrixserverlib.FederationClient, inv invite, federation federationAPI.FederationClient,
userAPI userapi.FederationUserAPI, userAPI userapi.FederationUserAPI,
) (*gomatrixserverlib.Event, error) { ) (*gomatrixserverlib.Event, error) {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: inv.RoomID} verReq := api.QueryRoomVersionForRoomRequest{RoomID: inv.RoomID}
@ -335,7 +337,7 @@ func buildMembershipEvent(
// them responded with an error. // them responded with an error.
func sendToRemoteServer( func sendToRemoteServer(
ctx context.Context, inv invite, ctx context.Context, inv invite,
federation *gomatrixserverlib.FederationClient, _ *config.FederationAPI, federation federationAPI.FederationClient, _ *config.FederationAPI,
builder gomatrixserverlib.EventBuilder, builder gomatrixserverlib.EventBuilder,
) (err error) { ) (err error) {
remoteServers := make([]gomatrixserverlib.ServerName, 2) remoteServers := make([]gomatrixserverlib.ServerName, 2)

View file

@ -84,7 +84,7 @@ type DeviceListUpdater struct {
db DeviceListUpdaterDatabase db DeviceListUpdaterDatabase
api DeviceListUpdaterAPI api DeviceListUpdaterAPI
producer KeyChangeProducer producer KeyChangeProducer
fedClient fedsenderapi.FederationClient fedClient fedsenderapi.KeyserverFederationAPI
workerChans []chan gomatrixserverlib.ServerName workerChans []chan gomatrixserverlib.ServerName
// When device lists are stale for a user, they get inserted into this map with a channel which `Update` will // When device lists are stale for a user, they get inserted into this map with a channel which `Update` will
@ -127,7 +127,7 @@ type KeyChangeProducer interface {
// NewDeviceListUpdater creates a new updater which fetches fresh device lists when they go stale. // NewDeviceListUpdater creates a new updater which fetches fresh device lists when they go stale.
func NewDeviceListUpdater( func NewDeviceListUpdater(
db DeviceListUpdaterDatabase, api DeviceListUpdaterAPI, producer KeyChangeProducer, db DeviceListUpdaterDatabase, api DeviceListUpdaterAPI, producer KeyChangeProducer,
fedClient fedsenderapi.FederationClient, numWorkers int, fedClient fedsenderapi.KeyserverFederationAPI, numWorkers int,
) *DeviceListUpdater { ) *DeviceListUpdater {
return &DeviceListUpdater{ return &DeviceListUpdater{
userIDToMutex: make(map[string]*sync.Mutex), userIDToMutex: make(map[string]*sync.Mutex),

View file

@ -37,7 +37,7 @@ import (
type KeyInternalAPI struct { type KeyInternalAPI struct {
DB storage.Database DB storage.Database
ThisServer gomatrixserverlib.ServerName ThisServer gomatrixserverlib.ServerName
FedClient fedsenderapi.FederationClient FedClient fedsenderapi.KeyserverFederationAPI
UserAPI userapi.KeyserverUserAPI UserAPI userapi.KeyserverUserAPI
Producer *producers.KeyChange Producer *producers.KeyChange
Updater *DeviceListUpdater Updater *DeviceListUpdater

View file

@ -37,7 +37,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) {
// NewInternalAPI returns a concerete implementation of the internal API. Callers // NewInternalAPI returns a concerete implementation of the internal API. Callers
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
func NewInternalAPI( func NewInternalAPI(
base *base.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, base *base.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.KeyserverFederationAPI,
) api.KeyInternalAPI { ) api.KeyInternalAPI {
js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)

View file

@ -52,6 +52,18 @@ func WithUnsigned(unsigned interface{}) eventModifier {
} }
} }
func WithKeyID(keyID gomatrixserverlib.KeyID) eventModifier {
return func(e *eventMods) {
e.keyID = keyID
}
}
func WithPrivateKey(pkey ed25519.PrivateKey) eventModifier {
return func(e *eventMods) {
e.privKey = pkey
}
}
// Reverse a list of events // Reverse a list of events
func Reversed(in []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent { func Reversed(in []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
out := make([]*gomatrixserverlib.HeaderedEvent, len(in)) out := make([]*gomatrixserverlib.HeaderedEvent, len(in))

View file

@ -36,8 +36,8 @@ var (
roomIDCounter = int64(0) roomIDCounter = int64(0)
testKeyID = gomatrixserverlib.KeyID("ed25519:test") KeyID = gomatrixserverlib.KeyID("ed25519:test")
testPrivateKey = ed25519.NewKeyFromSeed([]byte{ PrivateKey = ed25519.NewKeyFromSeed([]byte{
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32,
}) })
@ -49,8 +49,9 @@ type Room struct {
preset Preset preset Preset
creator *User creator *User
authEvents gomatrixserverlib.AuthEvents authEvents gomatrixserverlib.AuthEvents
events []*gomatrixserverlib.HeaderedEvent currentState map[string]*gomatrixserverlib.HeaderedEvent
events []*gomatrixserverlib.HeaderedEvent
} }
// Create a new test room. Automatically creates the initial create events. // Create a new test room. Automatically creates the initial create events.
@ -60,11 +61,12 @@ func NewRoom(t *testing.T, creator *User, modifiers ...roomModifier) *Room {
// set defaults then let roomModifiers override // set defaults then let roomModifiers override
r := &Room{ r := &Room{
ID: fmt.Sprintf("!%d:localhost", counter), ID: fmt.Sprintf("!%d:localhost", counter),
creator: creator, creator: creator,
authEvents: gomatrixserverlib.NewAuthEvents(nil), authEvents: gomatrixserverlib.NewAuthEvents(nil),
preset: PresetPublicChat, preset: PresetPublicChat,
Version: gomatrixserverlib.RoomVersionV9, Version: gomatrixserverlib.RoomVersionV9,
currentState: make(map[string]*gomatrixserverlib.HeaderedEvent),
} }
for _, m := range modifiers { for _, m := range modifiers {
m(t, r) m(t, r)
@ -73,6 +75,24 @@ func NewRoom(t *testing.T, creator *User, modifiers ...roomModifier) *Room {
return r return r
} }
func (r *Room) MustGetAuthEventRefsForEvent(t *testing.T, needed gomatrixserverlib.StateNeeded) []gomatrixserverlib.EventReference {
t.Helper()
a, err := needed.AuthEventReferences(&r.authEvents)
if err != nil {
t.Fatalf("MustGetAuthEvents: %v", err)
}
return a
}
func (r *Room) ForwardExtremities() []string {
if len(r.events) == 0 {
return nil
}
return []string{
r.events[len(r.events)-1].EventID(),
}
}
func (r *Room) insertCreateEvents(t *testing.T) { func (r *Room) insertCreateEvents(t *testing.T) {
t.Helper() t.Helper()
var joinRule gomatrixserverlib.JoinRuleContent var joinRule gomatrixserverlib.JoinRuleContent
@ -112,10 +132,10 @@ func (r *Room) CreateEvent(t *testing.T, creator *User, eventType string, conten
} }
if mod.privKey == nil { if mod.privKey == nil {
mod.privKey = testPrivateKey mod.privKey = PrivateKey
} }
if mod.keyID == "" { if mod.keyID == "" {
mod.keyID = testKeyID mod.keyID = KeyID
} }
if mod.originServerTS.IsZero() { if mod.originServerTS.IsZero() {
mod.originServerTS = time.Now() mod.originServerTS = time.Now()
@ -174,13 +194,14 @@ func (r *Room) CreateEvent(t *testing.T, creator *User, eventType string, conten
// Add a new event to this room DAG. Not thread-safe. // Add a new event to this room DAG. Not thread-safe.
func (r *Room) InsertEvent(t *testing.T, he *gomatrixserverlib.HeaderedEvent) { func (r *Room) InsertEvent(t *testing.T, he *gomatrixserverlib.HeaderedEvent) {
t.Helper() t.Helper()
// Add the event to the list of auth events // Add the event to the list of auth/state events
r.events = append(r.events, he) r.events = append(r.events, he)
if he.StateKey() != nil { if he.StateKey() != nil {
err := r.authEvents.AddEvent(he.Unwrap()) err := r.authEvents.AddEvent(he.Unwrap())
if err != nil { if err != nil {
t.Fatalf("InsertEvent: failed to add event to auth events: %s", err) t.Fatalf("InsertEvent: failed to add event to auth events: %s", err)
} }
r.currentState[he.Type()+" "+*he.StateKey()] = he
} }
} }
@ -188,6 +209,16 @@ func (r *Room) Events() []*gomatrixserverlib.HeaderedEvent {
return r.events return r.events
} }
func (r *Room) CurrentState() []*gomatrixserverlib.HeaderedEvent {
events := make([]*gomatrixserverlib.HeaderedEvent, len(r.currentState))
i := 0
for _, e := range r.currentState {
events[i] = e
i++
}
return events
}
func (r *Room) CreateAndInsert(t *testing.T, creator *User, eventType string, content interface{}, mods ...eventModifier) *gomatrixserverlib.HeaderedEvent { func (r *Room) CreateAndInsert(t *testing.T, creator *User, eventType string, content interface{}, mods ...eventModifier) *gomatrixserverlib.HeaderedEvent {
t.Helper() t.Helper()
he := r.CreateEvent(t, creator, eventType, content, mods...) he := r.CreateEvent(t, creator, eventType, content, mods...)