Merge branch 'matrix-org:main' into main

This commit is contained in:
Brian Meek 2022-08-31 10:43:39 -07:00 committed by GitHub
commit b6e13d7f94
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 174 additions and 101 deletions

View file

@ -68,14 +68,15 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers // Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error { func (s *OutputRoomEventConsumer) Start() error {
return jetstream.JetStreamConsumer( return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, s.ctx, s.jetstream, s.topic, s.durable, 1,
nats.DeliverAll(), nats.ManualAck(), s.onMessage, nats.DeliverAll(), nats.ManualAck(),
) )
} }
// onMessage is called when the appservice component receives a new event from // onMessage is called when the appservice component receives a new event from
// the room server output log. // the room server output log.
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called
// Parse out the event JSON // Parse out the event JSON
var output api.OutputEvent var output api.OutputEvent
if err := json.Unmarshal(msg.Data, &output); err != nil { if err := json.Unmarshal(msg.Data, &output); err != nil {

View file

@ -24,6 +24,7 @@ import (
"net" "net"
"net/http" "net/http"
"os" "os"
"strings"
"time" "time"
"github.com/gorilla/mux" "github.com/gorilla/mux"
@ -42,6 +43,7 @@ import (
"github.com/matrix-org/dendrite/setup" "github.com/matrix-org/dendrite/setup"
"github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/userapi" "github.com/matrix-org/dendrite/userapi"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -70,31 +72,84 @@ func main() {
var pk ed25519.PublicKey var pk ed25519.PublicKey
var sk ed25519.PrivateKey var sk ed25519.PrivateKey
keyfile := *instanceName + ".key" // iterate through the cli args and check if the config flag was set
if _, err := os.Stat(keyfile); os.IsNotExist(err) { configFlagSet := false
if pk, sk, err = ed25519.GenerateKey(nil); err != nil { for _, arg := range os.Args {
panic(err) if arg == "--config" || arg == "-config" {
configFlagSet = true
break
} }
if err = os.WriteFile(keyfile, sk, 0644); err != nil {
panic(err)
}
} else if err == nil {
if sk, err = os.ReadFile(keyfile); err != nil {
panic(err)
}
if len(sk) != ed25519.PrivateKeySize {
panic("the private key is not long enough")
}
pk = sk.Public().(ed25519.PublicKey)
} }
cfg := &config.Dendrite{}
// use custom config if config flag is set
if configFlagSet {
cfg = setup.ParseFlags(true)
sk = cfg.Global.PrivateKey
} else {
keyfile := *instanceName + ".pem"
if _, err := os.Stat(keyfile); os.IsNotExist(err) {
oldkeyfile := *instanceName + ".key"
if _, err = os.Stat(oldkeyfile); os.IsNotExist(err) {
if err = test.NewMatrixKey(keyfile); err != nil {
panic("failed to generate a new PEM key: " + err.Error())
}
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
panic("failed to load PEM key: " + err.Error())
}
} else {
if sk, err = os.ReadFile(oldkeyfile); err != nil {
panic("failed to read the old private key: " + err.Error())
}
if len(sk) != ed25519.PrivateKeySize {
panic("the private key is not long enough")
}
if err := test.SaveMatrixKey(keyfile, sk); err != nil {
panic("failed to convert the private key to PEM format: " + err.Error())
}
}
} else {
var err error
if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil {
panic("failed to load PEM key: " + err.Error())
}
}
cfg.Defaults(true)
cfg.Global.PrivateKey = sk
cfg.Global.JetStream.StoragePath = config.Path(fmt.Sprintf("%s/", *instanceName))
cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-account.db", *instanceName))
cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", *instanceName))
cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-syncapi.db", *instanceName))
cfg.RoomServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-roomserver.db", *instanceName))
cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-keyserver.db", *instanceName))
cfg.FederationAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationapi.db", *instanceName))
cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName))
cfg.MSCs.MSCs = []string{"msc2836", "msc2946"}
cfg.MSCs.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mscs.db", *instanceName))
cfg.ClientAPI.RegistrationDisabled = false
cfg.ClientAPI.OpenRegistrationWithoutVerificationEnabled = true
if err := cfg.Derive(); err != nil {
panic(err)
}
}
pk = sk.Public().(ed25519.PublicKey)
cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk))
cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID)
base := base.NewBaseDendrite(cfg, "Monolith")
defer base.Close() // nolint: errcheck
pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false) pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false)
pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"}) pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"})
pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter) pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter)
pManager := pineconeConnections.NewConnectionManager(pRouter, nil) pManager := pineconeConnections.NewConnectionManager(pRouter, nil)
pMulticast.Start() pMulticast.Start()
if instancePeer != nil && *instancePeer != "" { if instancePeer != nil && *instancePeer != "" {
pManager.AddPeer(*instancePeer) for _, peer := range strings.Split(*instancePeer, ",") {
pManager.AddPeer(strings.Trim(peer, " \t\r\n"))
}
} }
go func() { go func() {
@ -125,29 +180,6 @@ func main() {
} }
}() }()
cfg := &config.Dendrite{}
cfg.Defaults(true)
cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk))
cfg.Global.PrivateKey = sk
cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID)
cfg.Global.JetStream.StoragePath = config.Path(fmt.Sprintf("%s/", *instanceName))
cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-account.db", *instanceName))
cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", *instanceName))
cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-syncapi.db", *instanceName))
cfg.RoomServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-roomserver.db", *instanceName))
cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-keyserver.db", *instanceName))
cfg.FederationAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationapi.db", *instanceName))
cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName))
cfg.MSCs.MSCs = []string{"msc2836", "msc2946"}
cfg.ClientAPI.RegistrationDisabled = false
cfg.ClientAPI.OpenRegistrationWithoutVerificationEnabled = true
if err := cfg.Derive(); err != nil {
panic(err)
}
base := base.NewBaseDendrite(cfg, "Monolith")
defer base.Close() // nolint: errcheck
federation := conn.CreateFederationClient(base, pQUIC) federation := conn.CreateFederationClient(base, pQUIC)
serverKeyAPI := &signing.YggdrasilKeys{} serverKeyAPI := &signing.YggdrasilKeys{}

View file

@ -67,14 +67,15 @@ func NewKeyChangeConsumer(
// Start consuming from key servers // Start consuming from key servers
func (t *KeyChangeConsumer) Start() error { func (t *KeyChangeConsumer) Start() error {
return jetstream.JetStreamConsumer( return jetstream.JetStreamConsumer(
t.ctx, t.jetstream, t.topic, t.durable, t.onMessage, t.ctx, t.jetstream, t.topic, t.durable, 1,
nats.DeliverAll(), nats.ManualAck(), t.onMessage, nats.DeliverAll(), nats.ManualAck(),
) )
} }
// onMessage is called in response to a message received on the // onMessage is called in response to a message received on the
// key change events topic from the key server. // key change events topic from the key server.
func (t *KeyChangeConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { func (t *KeyChangeConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called
var m api.DeviceMessage var m api.DeviceMessage
if err := json.Unmarshal(msg.Data, &m); err != nil { if err := json.Unmarshal(msg.Data, &m); err != nil {
logrus.WithError(err).Errorf("failed to read device message from key change topic") logrus.WithError(err).Errorf("failed to read device message from key change topic")

View file

@ -69,14 +69,15 @@ func (t *OutputPresenceConsumer) Start() error {
return nil return nil
} }
return jetstream.JetStreamConsumer( return jetstream.JetStreamConsumer(
t.ctx, t.jetstream, t.topic, t.durable, t.onMessage, t.ctx, t.jetstream, t.topic, t.durable, 1, t.onMessage,
nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(), nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
) )
} }
// onMessage is called in response to a message received on the presence // onMessage is called in response to a message received on the presence
// events topic from the client api. // events topic from the client api.
func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called
// only send presence events which originated from us // only send presence events which originated from us
userID := msg.Header.Get(jetstream.UserID) userID := msg.Header.Get(jetstream.UserID)
_, serverName, err := gomatrixserverlib.SplitID('@', userID) _, serverName, err := gomatrixserverlib.SplitID('@', userID)

View file

@ -65,14 +65,15 @@ func NewOutputReceiptConsumer(
// Start consuming from the clientapi // Start consuming from the clientapi
func (t *OutputReceiptConsumer) Start() error { func (t *OutputReceiptConsumer) Start() error {
return jetstream.JetStreamConsumer( return jetstream.JetStreamConsumer(
t.ctx, t.jetstream, t.topic, t.durable, t.onMessage, t.ctx, t.jetstream, t.topic, t.durable, 1, t.onMessage,
nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(), nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
) )
} }
// onMessage is called in response to a message received on the receipt // onMessage is called in response to a message received on the receipt
// events topic from the client api. // events topic from the client api.
func (t *OutputReceiptConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { func (t *OutputReceiptConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called
receipt := syncTypes.OutputReceiptEvent{ receipt := syncTypes.OutputReceiptEvent{
UserID: msg.Header.Get(jetstream.UserID), UserID: msg.Header.Get(jetstream.UserID),
RoomID: msg.Header.Get(jetstream.RoomID), RoomID: msg.Header.Get(jetstream.RoomID),

View file

@ -68,8 +68,8 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers // Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error { func (s *OutputRoomEventConsumer) Start() error {
return jetstream.JetStreamConsumer( return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, s.ctx, s.jetstream, s.topic, s.durable, 1,
nats.DeliverAll(), nats.ManualAck(), s.onMessage, nats.DeliverAll(), nats.ManualAck(),
) )
} }
@ -77,7 +77,8 @@ func (s *OutputRoomEventConsumer) Start() error {
// It is unsafe to call this with messages for the same room in multiple gorountines // It is unsafe to call this with messages for the same room in multiple gorountines
// because updates it will likely fail with a types.EventIDMismatchError when it // because updates it will likely fail with a types.EventIDMismatchError when it
// realises that it cannot update the room state using the deltas. // realises that it cannot update the room state using the deltas.
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called
// Parse out the event JSON // Parse out the event JSON
var output api.OutputEvent var output api.OutputEvent
if err := json.Unmarshal(msg.Data, &output); err != nil { if err := json.Unmarshal(msg.Data, &output); err != nil {

View file

@ -63,14 +63,15 @@ func NewOutputSendToDeviceConsumer(
// Start consuming from the client api // Start consuming from the client api
func (t *OutputSendToDeviceConsumer) Start() error { func (t *OutputSendToDeviceConsumer) Start() error {
return jetstream.JetStreamConsumer( return jetstream.JetStreamConsumer(
t.ctx, t.jetstream, t.topic, t.durable, t.onMessage, t.ctx, t.jetstream, t.topic, t.durable, 1,
nats.DeliverAll(), nats.ManualAck(), t.onMessage, nats.DeliverAll(), nats.ManualAck(),
) )
} }
// onMessage is called in response to a message received on the // onMessage is called in response to a message received on the
// send-to-device events topic from the client api. // send-to-device events topic from the client api.
func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called
// only send send-to-device events which originated from us // only send send-to-device events which originated from us
sender := msg.Header.Get("sender") sender := msg.Header.Get("sender")
_, originServerName, err := gomatrixserverlib.SplitID('@', sender) _, originServerName, err := gomatrixserverlib.SplitID('@', sender)

View file

@ -62,14 +62,15 @@ func NewOutputTypingConsumer(
// Start consuming from the clientapi // Start consuming from the clientapi
func (t *OutputTypingConsumer) Start() error { func (t *OutputTypingConsumer) Start() error {
return jetstream.JetStreamConsumer( return jetstream.JetStreamConsumer(
t.ctx, t.jetstream, t.topic, t.durable, t.onMessage, t.ctx, t.jetstream, t.topic, t.durable, 1, t.onMessage,
nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(), nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
) )
} }
// onMessage is called in response to a message received on the typing // onMessage is called in response to a message received on the typing
// events topic from the client api. // events topic from the client api.
func (t *OutputTypingConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { func (t *OutputTypingConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called
// Extract the typing event from msg. // Extract the typing event from msg.
roomID := msg.Header.Get(jetstream.RoomID) roomID := msg.Header.Get(jetstream.RoomID)
userID := msg.Header.Get(jetstream.UserID) userID := msg.Header.Get(jetstream.UserID)

View file

@ -55,14 +55,15 @@ func NewDeviceListUpdateConsumer(
// Start consuming from key servers // Start consuming from key servers
func (t *DeviceListUpdateConsumer) Start() error { func (t *DeviceListUpdateConsumer) Start() error {
return jetstream.JetStreamConsumer( return jetstream.JetStreamConsumer(
t.ctx, t.jetstream, t.topic, t.durable, t.onMessage, t.ctx, t.jetstream, t.topic, t.durable, 1,
nats.DeliverAll(), nats.ManualAck(), t.onMessage, nats.DeliverAll(), nats.ManualAck(),
) )
} }
// onMessage is called in response to a message received on the // onMessage is called in response to a message received on the
// key change events topic from the key server. // key change events topic from the key server.
func (t *DeviceListUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { func (t *DeviceListUpdateConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called
var m gomatrixserverlib.DeviceListUpdateEvent var m gomatrixserverlib.DeviceListUpdateEvent
if err := json.Unmarshal(msg.Data, &m); err != nil { if err := json.Unmarshal(msg.Data, &m); err != nil {
logrus.WithError(err).Errorf("Failed to read from device list update input topic") logrus.WithError(err).Errorf("Failed to read from device list update input topic")

View file

@ -225,12 +225,7 @@ func loadConfig(
} }
privateKeyPath := absPath(basePath, c.Global.PrivateKeyPath) privateKeyPath := absPath(basePath, c.Global.PrivateKeyPath)
privateKeyData, err := readFile(privateKeyPath) if c.Global.KeyID, c.Global.PrivateKey, err = LoadMatrixKey(privateKeyPath, readFile); err != nil {
if err != nil {
return nil, err
}
if c.Global.KeyID, c.Global.PrivateKey, err = readKeyPEM(privateKeyPath, privateKeyData, true); err != nil {
return nil, err return nil, err
} }
@ -266,6 +261,14 @@ func loadConfig(
return &c, nil return &c, nil
} }
func LoadMatrixKey(privateKeyPath string, readFile func(string) ([]byte, error)) (gomatrixserverlib.KeyID, ed25519.PrivateKey, error) {
privateKeyData, err := readFile(privateKeyPath)
if err != nil {
return "", nil, err
}
return readKeyPEM(privateKeyPath, privateKeyData, true)
}
// Derive generates data that is derived from various values provided in // Derive generates data that is derived from various values provided in
// the config file. // the config file.
func (config *Dendrite) Derive() error { func (config *Dendrite) Derive() error {

View file

@ -9,9 +9,16 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
// JetStreamConsumer starts a durable consumer on the given subject with the
// given durable name. The function will be called when one or more messages
// is available, up to the maximum batch size specified. If the batch is set to
// 1 then messages will be delivered one at a time. If the function is called,
// the messages array is guaranteed to be at least 1 in size. Any provided NATS
// options will be passed through to the pull subscriber creation. The consumer
// will continue to run until the context expires, at which point it will stop.
func JetStreamConsumer( func JetStreamConsumer(
ctx context.Context, js nats.JetStreamContext, subj, durable string, ctx context.Context, js nats.JetStreamContext, subj, durable string, batch int,
f func(ctx context.Context, msg *nats.Msg) bool, f func(ctx context.Context, msgs []*nats.Msg) bool,
opts ...nats.SubOpt, opts ...nats.SubOpt,
) error { ) error {
defer func() { defer func() {
@ -27,6 +34,14 @@ func JetStreamConsumer(
} }
}() }()
// If the batch size is greater than 1, we will want to acknowledge all
// received messages in the batch. Below we will send an acknowledgement
// for the most recent message in the batch and AckAll will ensure that
// all messages that came before it are also acknowledged implicitly.
if batch > 1 {
opts = append(opts, nats.AckAll())
}
name := durable + "Pull" name := durable + "Pull"
sub, err := js.PullSubscribe(subj, name, opts...) sub, err := js.PullSubscribe(subj, name, opts...)
if err != nil { if err != nil {
@ -50,7 +65,7 @@ func JetStreamConsumer(
// enforce its own deadline (roughly 5 seconds by default). Therefore // enforce its own deadline (roughly 5 seconds by default). Therefore
// it is our responsibility to check whether our context expired or // it is our responsibility to check whether our context expired or
// not when a context error is returned. Footguns. Footguns everywhere. // not when a context error is returned. Footguns. Footguns everywhere.
msgs, err := sub.Fetch(1, nats.Context(ctx)) msgs, err := sub.Fetch(batch, nats.Context(ctx))
if err != nil { if err != nil {
if err == context.Canceled || err == context.DeadlineExceeded { if err == context.Canceled || err == context.DeadlineExceeded {
// Work out whether it was the JetStream context that expired // Work out whether it was the JetStream context that expired
@ -74,13 +89,13 @@ func JetStreamConsumer(
if len(msgs) < 1 { if len(msgs) < 1 {
continue continue
} }
msg := msgs[0] msg := msgs[len(msgs)-1] // most recent message, in case of AckAll
if err = msg.InProgress(nats.Context(ctx)); err != nil { if err = msg.InProgress(nats.Context(ctx)); err != nil {
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err)) logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err))
sentry.CaptureException(err) sentry.CaptureException(err)
continue continue
} }
if f(ctx, msg) { if f(ctx, msgs) {
if err = msg.AckSync(nats.Context(ctx)); err != nil { if err = msg.AckSync(nats.Context(ctx)); err != nil {
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err)) logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err))
sentry.CaptureException(err) sentry.CaptureException(err)

View file

@ -75,15 +75,16 @@ func NewOutputClientDataConsumer(
// Start consuming from room servers // Start consuming from room servers
func (s *OutputClientDataConsumer) Start() error { func (s *OutputClientDataConsumer) Start() error {
return jetstream.JetStreamConsumer( return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, s.ctx, s.jetstream, s.topic, s.durable, 1,
nats.DeliverAll(), nats.ManualAck(), s.onMessage, nats.DeliverAll(), nats.ManualAck(),
) )
} }
// onMessage is called when the sync server receives a new event from the client API server output log. // onMessage is called when the sync server receives a new event from the client API server output log.
// It is not safe for this function to be called from multiple goroutines, or else the // It is not safe for this function to be called from multiple goroutines, or else the
// sync stream position may race and be incorrectly calculated. // sync stream position may race and be incorrectly calculated.
func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called
// Parse out the event JSON // Parse out the event JSON
userID := msg.Header.Get(jetstream.UserID) userID := msg.Header.Get(jetstream.UserID)
var output eventutil.AccountData var output eventutil.AccountData

View file

@ -75,12 +75,13 @@ func NewOutputKeyChangeEventConsumer(
// Start consuming from the key server // Start consuming from the key server
func (s *OutputKeyChangeEventConsumer) Start() error { func (s *OutputKeyChangeEventConsumer) Start() error {
return jetstream.JetStreamConsumer( return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, s.ctx, s.jetstream, s.topic, s.durable, 1,
nats.DeliverAll(), nats.ManualAck(), s.onMessage, nats.DeliverAll(), nats.ManualAck(),
) )
} }
func (s *OutputKeyChangeEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { func (s *OutputKeyChangeEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called
var m api.DeviceMessage var m api.DeviceMessage
if err := json.Unmarshal(msg.Data, &m); err != nil { if err := json.Unmarshal(msg.Data, &m); err != nil {
logrus.WithError(err).Errorf("failed to read device message from key change topic") logrus.WithError(err).Errorf("failed to read device message from key change topic")

View file

@ -128,12 +128,13 @@ func (s *PresenceConsumer) Start() error {
return nil return nil
} }
return jetstream.JetStreamConsumer( return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.presenceTopic, s.durable, s.onMessage, s.ctx, s.jetstream, s.presenceTopic, s.durable, 1, s.onMessage,
nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(), nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
) )
} }
func (s *PresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { func (s *PresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called
userID := msg.Header.Get(jetstream.UserID) userID := msg.Header.Get(jetstream.UserID)
presence := msg.Header.Get("presence") presence := msg.Header.Get("presence")
timestamp := msg.Header.Get("last_active_ts") timestamp := msg.Header.Get("last_active_ts")

View file

@ -74,12 +74,13 @@ func NewOutputReceiptEventConsumer(
// Start consuming receipts events. // Start consuming receipts events.
func (s *OutputReceiptEventConsumer) Start() error { func (s *OutputReceiptEventConsumer) Start() error {
return jetstream.JetStreamConsumer( return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, s.ctx, s.jetstream, s.topic, s.durable, 1,
nats.DeliverAll(), nats.ManualAck(), s.onMessage, nats.DeliverAll(), nats.ManualAck(),
) )
} }
func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called
output := types.OutputReceiptEvent{ output := types.OutputReceiptEvent{
UserID: msg.Header.Get(jetstream.UserID), UserID: msg.Header.Get(jetstream.UserID),
RoomID: msg.Header.Get(jetstream.RoomID), RoomID: msg.Header.Get(jetstream.RoomID),

View file

@ -79,15 +79,16 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers // Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error { func (s *OutputRoomEventConsumer) Start() error {
return jetstream.JetStreamConsumer( return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, s.ctx, s.jetstream, s.topic, s.durable, 1,
nats.DeliverAll(), nats.ManualAck(), s.onMessage, nats.DeliverAll(), nats.ManualAck(),
) )
} }
// onMessage is called when the sync server receives a new event from the room server output log. // onMessage is called when the sync server receives a new event from the room server output log.
// It is not safe for this function to be called from multiple goroutines, or else the // It is not safe for this function to be called from multiple goroutines, or else the
// sync stream position may race and be incorrectly calculated. // sync stream position may race and be incorrectly calculated.
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called
// Parse out the event JSON // Parse out the event JSON
var err error var err error
var output api.OutputEvent var output api.OutputEvent

View file

@ -68,12 +68,13 @@ func NewOutputSendToDeviceEventConsumer(
// Start consuming send-to-device events. // Start consuming send-to-device events.
func (s *OutputSendToDeviceEventConsumer) Start() error { func (s *OutputSendToDeviceEventConsumer) Start() error {
return jetstream.JetStreamConsumer( return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, s.ctx, s.jetstream, s.topic, s.durable, 1,
nats.DeliverAll(), nats.ManualAck(), s.onMessage, nats.DeliverAll(), nats.ManualAck(),
) )
} }
func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called
userID := msg.Header.Get(jetstream.UserID) userID := msg.Header.Get(jetstream.UserID)
_, domain, err := gomatrixserverlib.SplitID('@', userID) _, domain, err := gomatrixserverlib.SplitID('@', userID)
if err != nil { if err != nil {

View file

@ -64,12 +64,13 @@ func NewOutputTypingEventConsumer(
// Start consuming typing events. // Start consuming typing events.
func (s *OutputTypingEventConsumer) Start() error { func (s *OutputTypingEventConsumer) Start() error {
return jetstream.JetStreamConsumer( return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, s.ctx, s.jetstream, s.topic, s.durable, 1,
nats.DeliverAll(), nats.ManualAck(), s.onMessage, nats.DeliverAll(), nats.ManualAck(),
) )
} }
func (s *OutputTypingEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { func (s *OutputTypingEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called
roomID := msg.Header.Get(jetstream.RoomID) roomID := msg.Header.Get(jetstream.RoomID)
userID := msg.Header.Get(jetstream.UserID) userID := msg.Header.Get(jetstream.UserID)
typing, err := strconv.ParseBool(msg.Header.Get("typing")) typing, err := strconv.ParseBool(msg.Header.Get("typing"))

View file

@ -67,8 +67,8 @@ func NewOutputNotificationDataConsumer(
// Start starts consumption. // Start starts consumption.
func (s *OutputNotificationDataConsumer) Start() error { func (s *OutputNotificationDataConsumer) Start() error {
return jetstream.JetStreamConsumer( return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, s.ctx, s.jetstream, s.topic, s.durable, 1,
nats.DeliverAll(), nats.ManualAck(), s.onMessage, nats.DeliverAll(), nats.ManualAck(),
) )
} }
@ -76,7 +76,8 @@ func (s *OutputNotificationDataConsumer) Start() error {
// the push server. It is not safe for this function to be called from // the push server. It is not safe for this function to be called from
// multiple goroutines, or else the sync stream position may race and // multiple goroutines, or else the sync stream position may race and
// be incorrectly calculated. // be incorrectly calculated.
func (s *OutputNotificationDataConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { func (s *OutputNotificationDataConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called
userID := string(msg.Header.Get(jetstream.UserID)) userID := string(msg.Header.Get(jetstream.UserID))
// Parse out the event JSON // Parse out the event JSON

View file

@ -15,6 +15,7 @@
package test package test
import ( import (
"crypto/ed25519"
"crypto/rand" "crypto/rand"
"crypto/rsa" "crypto/rsa"
"crypto/x509" "crypto/x509"
@ -44,6 +45,10 @@ func NewMatrixKey(matrixKeyPath string) (err error) {
if err != nil { if err != nil {
return err return err
} }
return SaveMatrixKey(matrixKeyPath, data[3:])
}
func SaveMatrixKey(matrixKeyPath string, data ed25519.PrivateKey) error {
keyOut, err := os.OpenFile(matrixKeyPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) keyOut, err := os.OpenFile(matrixKeyPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil { if err != nil {
return err return err
@ -62,7 +67,7 @@ func NewMatrixKey(matrixKeyPath string) (err error) {
Headers: map[string]string{ Headers: map[string]string{
"Key-ID": fmt.Sprintf("ed25519:%s", keyID[:6]), "Key-ID": fmt.Sprintf("ed25519:%s", keyID[:6]),
}, },
Bytes: data[3:], Bytes: data,
}) })
return err return err
} }

View file

@ -56,15 +56,16 @@ func NewOutputReadUpdateConsumer(
func (s *OutputReadUpdateConsumer) Start() error { func (s *OutputReadUpdateConsumer) Start() error {
if err := jetstream.JetStreamConsumer( if err := jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, s.ctx, s.jetstream, s.topic, s.durable, 1,
nats.DeliverAll(), nats.ManualAck(), s.onMessage, nats.DeliverAll(), nats.ManualAck(),
); err != nil { ); err != nil {
return err return err
} }
return nil return nil
} }
func (s *OutputReadUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { func (s *OutputReadUpdateConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called
var read types.ReadUpdate var read types.ReadUpdate
if err := json.Unmarshal(msg.Data, &read); err != nil { if err := json.Unmarshal(msg.Data, &read); err != nil {
log.WithError(err).Error("userapi clientapi consumer: message parse failure") log.WithError(err).Error("userapi clientapi consumer: message parse failure")

View file

@ -65,15 +65,16 @@ func NewOutputStreamEventConsumer(
func (s *OutputStreamEventConsumer) Start() error { func (s *OutputStreamEventConsumer) Start() error {
if err := jetstream.JetStreamConsumer( if err := jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, s.ctx, s.jetstream, s.topic, s.durable, 1,
nats.DeliverAll(), nats.ManualAck(), s.onMessage, nats.DeliverAll(), nats.ManualAck(),
); err != nil { ); err != nil {
return err return err
} }
return nil return nil
} }
func (s *OutputStreamEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { func (s *OutputStreamEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called
var output types.StreamedEvent var output types.StreamedEvent
output.Event = &gomatrixserverlib.HeaderedEvent{} output.Event = &gomatrixserverlib.HeaderedEvent{}
if err := json.Unmarshal(msg.Data, &output); err != nil { if err := json.Unmarshal(msg.Data, &output); err != nil {