Merge branch 'master' into neilalexander/yggembed

This commit is contained in:
Neil Alexander 2020-06-10 17:18:48 +01:00
commit abdd40b1ec
8 changed files with 58 additions and 146 deletions

View file

@ -99,9 +99,9 @@ func main() {
httpServer := &http.Server{
Addr: ":0",
TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
IdleTimeout: 60 * time.Second,
ReadTimeout: 5 * time.Second,
WriteTimeout: 5 * time.Second,
IdleTimeout: 15 * time.Second,
BaseContext: func(_ net.Listener) context.Context {
return context.Background()
},

View file

@ -25,8 +25,8 @@ import (
func (n *Node) yamuxConfig() *yamux.Config {
cfg := yamux.DefaultConfig()
cfg.EnableKeepAlive = true
cfg.KeepAliveInterval = time.Second * 30
cfg.EnableKeepAlive = false
cfg.ConnectionWriteTimeout = time.Second * 5
cfg.MaxMessageSize = 65535
cfg.ReadBufSize = 655350
return cfg
@ -40,6 +40,8 @@ func (n *Node) listenFromYgg() {
return
}
var session *yamux.Session
// If the remote address is lower than ours then we'll be the
// server. Otherwse we'll be the client.
if strings.Compare(conn.RemoteAddr().String(), n.DerivedServerName()) < 0 {
session, err = yamux.Server(conn, n.yamuxConfig())
} else {
@ -55,6 +57,11 @@ func (n *Node) listenFromYgg() {
func (n *Node) listenFromYggConn(session *yamux.Session) {
n.sessions.Store(session.RemoteAddr().String(), session)
defer n.sessions.Delete(session.RemoteAddr())
defer func() {
if err := session.Close(); err != nil {
n.log.Println("session.Close:", err)
}
}()
for {
st, err := session.AcceptStream()
@ -90,16 +97,18 @@ func (n *Node) Dial(network, address string) (net.Conn, error) {
func (n *Node) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
s, ok1 := n.sessions.Load(address)
session, ok2 := s.(*yamux.Session)
if !ok1 || !ok2 {
if !ok1 || !ok2 || (ok1 && ok2 && session.IsClosed()) {
conn, err := n.dialer.DialContext(ctx, network, address)
if err != nil {
n.log.Println("n.dialer.DialContext:", err)
return nil, err
}
if strings.Compare(address, n.DerivedServerName()) > 0 {
session, err = yamux.Client(conn, n.yamuxConfig())
} else {
// If the remote address is lower than ours then we will be the
// server. Otherwise we'll be the client.
if strings.Compare(conn.RemoteAddr().String(), n.DerivedServerName()) < 0 {
session, err = yamux.Server(conn, n.yamuxConfig())
} else {
session, err = yamux.Client(conn, n.yamuxConfig())
}
if err != nil {
return nil, err

View file

@ -20,7 +20,6 @@ import (
"github.com/matrix-org/dendrite/federationsender/consumers"
"github.com/matrix-org/dendrite/federationsender/internal"
"github.com/matrix-org/dendrite/federationsender/inthttp"
"github.com/matrix-org/dendrite/federationsender/producers"
"github.com/matrix-org/dendrite/federationsender/queue"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/federationsender/types"
@ -49,13 +48,13 @@ func NewInternalAPI(
logrus.WithError(err).Panic("failed to connect to federation sender db")
}
roomserverProducer := producers.NewRoomserverProducer(
rsAPI, base.Cfg.Matrix.ServerName, base.Cfg.Matrix.KeyID, base.Cfg.Matrix.PrivateKey,
)
statistics := &types.Statistics{}
queues := queue.NewOutgoingQueues(
base.Cfg.Matrix.ServerName, federation, roomserverProducer, statistics,
base.Cfg.Matrix.ServerName, federation, rsAPI, statistics, &queue.SigningInfo{
KeyID: base.Cfg.Matrix.KeyID,
PrivateKey: base.Cfg.Matrix.PrivateKey,
ServerName: base.Cfg.Matrix.ServerName,
},
)
rsConsumer := consumers.NewOutputRoomEventConsumer(
@ -73,5 +72,5 @@ func NewInternalAPI(
logrus.WithError(err).Panic("failed to start typing server consumer")
}
return internal.NewFederationSenderInternalAPI(federationSenderDB, base.Cfg, roomserverProducer, federation, keyRing, statistics, queues)
return internal.NewFederationSenderInternalAPI(federationSenderDB, base.Cfg, rsAPI, federation, keyRing, statistics, queues)
}

View file

@ -1,22 +1,20 @@
package internal
import (
"github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/federationsender/producers"
"github.com/matrix-org/dendrite/federationsender/queue"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
)
// FederationSenderInternalAPI is an implementation of api.FederationSenderInternalAPI
type FederationSenderInternalAPI struct {
api.FederationSenderInternalAPI
db storage.Database
cfg *config.Dendrite
statistics *types.Statistics
producer *producers.RoomserverProducer
rsAPI api.RoomserverInternalAPI
federation *gomatrixserverlib.FederationClient
keyRing *gomatrixserverlib.KeyRing
queues *queue.OutgoingQueues
@ -24,7 +22,7 @@ type FederationSenderInternalAPI struct {
func NewFederationSenderInternalAPI(
db storage.Database, cfg *config.Dendrite,
producer *producers.RoomserverProducer,
rsAPI api.RoomserverInternalAPI,
federation *gomatrixserverlib.FederationClient,
keyRing *gomatrixserverlib.KeyRing,
statistics *types.Statistics,
@ -33,7 +31,7 @@ func NewFederationSenderInternalAPI(
return &FederationSenderInternalAPI{
db: db,
cfg: cfg,
producer: producer,
rsAPI: rsAPI,
federation: federation,
keyRing: keyRing,
statistics: statistics,

View file

@ -7,6 +7,7 @@ import (
"github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/federationsender/internal/perform"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/version"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@ -175,10 +176,11 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer(
// If we successfully performed a send_join above then the other
// server now thinks we're a part of the room. Send the newly
// returned state to the roomserver to update our local view.
if err = r.producer.SendEventWithState(
ctx,
respSendJoin.ToRespState(),
event.Headered(respMakeJoin.RoomVersion),
respState := respSendJoin.ToRespState()
if err = roomserverAPI.SendEventWithState(
ctx, r.rsAPI,
&respState,
event.Headered(respMakeJoin.RoomVersion), nil,
); err != nil {
return fmt.Errorf("r.producer.SendEventWithState: %w", err)
}

View file

@ -1,108 +0,0 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package producers
import (
"context"
"crypto/ed25519"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
)
// RoomserverProducer produces events for the roomserver to consume.
type RoomserverProducer struct {
InputAPI api.RoomserverInternalAPI
serverName gomatrixserverlib.ServerName
keyID gomatrixserverlib.KeyID
privateKey ed25519.PrivateKey
}
// NewRoomserverProducer creates a new RoomserverProducer
func NewRoomserverProducer(
rsAPI api.RoomserverInternalAPI, serverName gomatrixserverlib.ServerName,
keyID gomatrixserverlib.KeyID, privateKey ed25519.PrivateKey,
) *RoomserverProducer {
return &RoomserverProducer{
InputAPI: rsAPI,
serverName: serverName,
keyID: keyID,
privateKey: privateKey,
}
}
// SendInviteResponse drops an invite response back into the roomserver so that users
// already in the room will be notified of the new invite. The invite response is signed
// by the remote side.
func (c *RoomserverProducer) SendInviteResponse(
ctx context.Context, res gomatrixserverlib.RespInviteV2, roomVersion gomatrixserverlib.RoomVersion,
) (string, error) {
ev := res.Event.Sign(string(c.serverName), c.keyID, c.privateKey).Headered(roomVersion)
ire := api.InputRoomEvent{
Kind: api.KindNew,
Event: ev,
AuthEventIDs: ev.AuthEventIDs(),
SendAsServer: string(c.serverName),
TransactionID: nil,
}
return c.SendInputRoomEvents(ctx, []api.InputRoomEvent{ire})
}
// SendEventWithState writes an event with KindNew to the roomserver input log
// with the state at the event as KindOutlier before it.
func (c *RoomserverProducer) SendEventWithState(
ctx context.Context, state gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent,
) error {
outliers, err := state.Events()
if err != nil {
return err
}
var ires []api.InputRoomEvent
for _, outlier := range outliers {
ires = append(ires, api.InputRoomEvent{
Kind: api.KindOutlier,
Event: outlier.Headered(event.RoomVersion),
AuthEventIDs: outlier.AuthEventIDs(),
})
}
stateEventIDs := make([]string, len(state.StateEvents))
for i := range state.StateEvents {
stateEventIDs[i] = state.StateEvents[i].EventID()
}
ires = append(ires, api.InputRoomEvent{
Kind: api.KindNew,
Event: event,
AuthEventIDs: event.AuthEventIDs(),
HasState: true,
StateEventIDs: stateEventIDs,
})
_, err = c.SendInputRoomEvents(ctx, ires)
return err
}
// SendInputRoomEvents writes the given input room events to the roomserver input API.
func (c *RoomserverProducer) SendInputRoomEvents(
ctx context.Context, ires []api.InputRoomEvent,
) (eventID string, err error) {
request := api.InputRoomEventsRequest{InputRoomEvents: ires}
var response api.InputRoomEventsResponse
err = c.InputAPI.InputRoomEvents(ctx, &request, &response)
eventID = response.EventID
return
}

View file

@ -20,8 +20,8 @@ import (
"fmt"
"time"
"github.com/matrix-org/dendrite/federationsender/producers"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
@ -34,7 +34,8 @@ import (
// ensures that only one request is in flight to a given destination
// at a time.
type destinationQueue struct {
rsProducer *producers.RoomserverProducer // roomserver producer
signing *SigningInfo
rsAPI api.RoomserverInternalAPI
client *gomatrixserverlib.FederationClient // federation client
origin gomatrixserverlib.ServerName // origin of requests
destination gomatrixserverlib.ServerName // destination of requests
@ -370,11 +371,9 @@ func (oq *destinationQueue) nextInvites(
return done, err
}
if _, err = oq.rsProducer.SendInviteResponse(
context.TODO(),
inviteRes,
roomVersion,
); err != nil {
invEv := inviteRes.Event.Sign(string(oq.signing.ServerName), oq.signing.KeyID, oq.signing.PrivateKey).Headered(roomVersion)
_, err = api.SendEvents(context.TODO(), oq.rsAPI, []gomatrixserverlib.HeaderedEvent{invEv}, oq.signing.ServerName, nil)
if err != nil {
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"state_key": ev.StateKey(),

View file

@ -15,11 +15,12 @@
package queue
import (
"crypto/ed25519"
"fmt"
"sync"
"github.com/matrix-org/dendrite/federationsender/producers"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
log "github.com/sirupsen/logrus"
@ -28,10 +29,11 @@ import (
// OutgoingQueues is a collection of queues for sending transactions to other
// matrix servers
type OutgoingQueues struct {
rsProducer *producers.RoomserverProducer
rsAPI api.RoomserverInternalAPI
origin gomatrixserverlib.ServerName
client *gomatrixserverlib.FederationClient
statistics *types.Statistics
signing *SigningInfo
queuesMutex sync.Mutex // protects the below
queues map[gomatrixserverlib.ServerName]*destinationQueue
}
@ -40,18 +42,28 @@ type OutgoingQueues struct {
func NewOutgoingQueues(
origin gomatrixserverlib.ServerName,
client *gomatrixserverlib.FederationClient,
rsProducer *producers.RoomserverProducer,
rsAPI api.RoomserverInternalAPI,
statistics *types.Statistics,
signing *SigningInfo,
) *OutgoingQueues {
return &OutgoingQueues{
rsProducer: rsProducer,
rsAPI: rsAPI,
origin: origin,
client: client,
statistics: statistics,
signing: signing,
queues: map[gomatrixserverlib.ServerName]*destinationQueue{},
}
}
// TODO: Move this somewhere useful for other components as we often need to ferry these 3 variables
// around together
type SigningInfo struct {
ServerName gomatrixserverlib.ServerName
KeyID gomatrixserverlib.KeyID
PrivateKey ed25519.PrivateKey
}
func (oqs *OutgoingQueues) getQueueIfExists(destination gomatrixserverlib.ServerName) *destinationQueue {
oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock()
@ -64,7 +76,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
oq := oqs.queues[destination]
if oq == nil {
oq = &destinationQueue{
rsProducer: oqs.rsProducer,
rsAPI: oqs.rsAPI,
origin: oqs.origin,
destination: destination,
client: oqs.client,
@ -73,6 +85,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
incomingEDUs: make(chan *gomatrixserverlib.EDU, 128),
incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128),
retryServerCh: make(chan bool),
signing: oqs.signing,
}
oqs.queues[destination] = oq
}