Merge branch 'master' into piotr-kozimor/threepid-sessions-in-userapi

This commit is contained in:
PiotrKozimor 2022-01-17 15:04:46 +01:00 committed by GitHub
commit c2112eb536
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 113 additions and 26 deletions

View file

@ -34,6 +34,7 @@ import (
type OutputRoomEventConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable nats.SubOpt
topic string
asDB storage.Database
rsAPI api.RoomserverInternalAPI
@ -54,6 +55,7 @@ func NewOutputRoomEventConsumer(
return &OutputRoomEventConsumer{
ctx: process.Context(),
jetstream: js,
durable: cfg.Global.JetStream.Durable("AppserviceRoomserverConsumer"),
topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent),
asDB: appserviceDB,
rsAPI: rsAPI,
@ -64,7 +66,7 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage)
_, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
}

View file

@ -99,10 +99,6 @@ func main() {
}
keyRing := fsAPI.KeyRing()
// The underlying roomserver implementation needs to be able to call the fedsender.
// This is different to rsAPI which can be the http client which doesn't need this dependency
rsImpl.SetFederationAPI(fsAPI, keyRing)
keyImpl := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI)
keyAPI := keyImpl
if base.UseHTTPAPIs {

View file

@ -34,6 +34,7 @@ import (
type OutputEDUConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable nats.SubOpt
db storage.Database
queues *queue.OutgoingQueues
ServerName gomatrixserverlib.ServerName
@ -56,6 +57,7 @@ func NewOutputEDUConsumer(
queues: queues,
db: store,
ServerName: cfg.Matrix.ServerName,
durable: cfg.Matrix.JetStream.Durable("FederationAPIEDUServerConsumer"),
typingTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
sendToDeviceTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
receiptTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
@ -64,13 +66,13 @@ func NewOutputEDUConsumer(
// Start consuming from EDU servers
func (t *OutputEDUConsumer) Start() error {
if _, err := t.jetstream.Subscribe(t.typingTopic, t.onTypingEvent); err != nil {
if _, err := t.jetstream.Subscribe(t.typingTopic, t.onTypingEvent, t.durable); err != nil {
return err
}
if _, err := t.jetstream.Subscribe(t.sendToDeviceTopic, t.onSendToDeviceEvent); err != nil {
if _, err := t.jetstream.Subscribe(t.sendToDeviceTopic, t.onSendToDeviceEvent, t.durable); err != nil {
return err
}
if _, err := t.jetstream.Subscribe(t.receiptTopic, t.onReceiptEvent); err != nil {
if _, err := t.jetstream.Subscribe(t.receiptTopic, t.onReceiptEvent, t.durable); err != nil {
return err
}
return nil

View file

@ -37,6 +37,7 @@ type OutputRoomEventConsumer struct {
cfg *config.FederationAPI
rsAPI api.RoomserverInternalAPI
jetstream nats.JetStreamContext
durable nats.SubOpt
db storage.Database
queues *queue.OutgoingQueues
topic string
@ -58,13 +59,14 @@ func NewOutputRoomEventConsumer(
db: store,
queues: queues,
rsAPI: rsAPI,
durable: cfg.Matrix.JetStream.Durable("FederationAPIRoomServerConsumer"),
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent),
}
}
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage)
_, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
}

View file

@ -68,6 +68,13 @@ func TestMain(m *testing.M) {
panic("can't create cache: " + err.Error())
}
// Create a temporary directory for JetStream.
d, err := ioutil.TempDir("./", "jetstream*")
if err != nil {
panic(err)
}
defer os.RemoveAll(d)
// Draw up just enough Dendrite config for the server key
// API to work.
cfg := &config.Dendrite{}
@ -75,6 +82,8 @@ func TestMain(m *testing.M) {
cfg.Global.ServerName = gomatrixserverlib.ServerName(s.name)
cfg.Global.PrivateKey = testPriv
cfg.Global.JetStream.InMemory = true
cfg.Global.JetStream.TopicPrefix = string(s.name[:1])
cfg.Global.JetStream.StoragePath = config.Path(d)
cfg.Global.KeyID = serverKeyID
cfg.Global.KeyValidityPeriod = s.validity
cfg.FederationAPI.Database.ConnectionString = config.DataSource("file::memory:")

View file

@ -16,6 +16,7 @@ import (
"context"
"net/http"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@ -29,15 +30,42 @@ func GetEventAuth(
roomID string,
eventID string,
) util.JSONResponse {
// TODO: Optimisation: we shouldn't be querying all the room state
// that is in state.StateEvents - we just ignore it.
state, err := getState(ctx, request, rsAPI, roomID, eventID)
event, resErr := fetchEvent(ctx, rsAPI, eventID)
if resErr != nil {
return *resErr
}
if event.RoomID() != roomID {
return util.JSONResponse{Code: http.StatusNotFound, JSON: jsonerror.NotFound("event does not belong to this room")}
}
resErr = allowedToSeeEvent(ctx, request.Origin(), rsAPI, eventID)
if resErr != nil {
return *resErr
}
var response api.QueryStateAndAuthChainResponse
err := rsAPI.QueryStateAndAuthChain(
ctx,
&api.QueryStateAndAuthChainRequest{
RoomID: roomID,
PrevEventIDs: []string{eventID},
AuthEventIDs: event.AuthEventIDs(),
OnlyFetchAuthChain: true,
},
&response,
)
if err != nil {
return *err
return util.ErrorResponse(err)
}
if !response.RoomExists {
return util.JSONResponse{Code: http.StatusNotFound, JSON: nil}
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: gomatrixserverlib.RespEventAuth{AuthEvents: state.AuthEvents},
JSON: gomatrixserverlib.RespEventAuth{
AuthEvents: gomatrixserverlib.UnwrapEventHeaders(response.AuthChainEvents),
},
}
}

View file

@ -226,6 +226,10 @@ type QueryStateAndAuthChainRequest struct {
PrevEventIDs []string `json:"prev_event_ids"`
// The list of auth events for the event. Used to calculate the auth chain
AuthEventIDs []string `json:"auth_event_ids"`
// If true, the auth chain events for the auth event IDs given will be fetched only. Prev event IDs are ignored.
// If false, state and auth chain events for the prev event IDs and entire current state will be included.
// TODO: not a great API shape. It serves 2 main uses: false=>response for send_join, true=>response for /event_auth
OnlyFetchAuthChain bool `json:"only_fetch_auth_chain"`
// Should state resolution be ran on the result events?
// TODO: check call sites and remove if we always want to do state res
ResolveState bool `json:"resolve_state"`

View file

@ -67,14 +67,12 @@ func NewRoomserverAPI(
InputRoomEventTopic: inputRoomEventTopic,
OutputRoomEventTopic: outputRoomEventTopic,
JetStream: consumer,
Durable: cfg.Matrix.JetStream.Durable("RoomserverInputConsumer"),
ServerName: cfg.Matrix.ServerName,
ACLs: serverACLs,
},
// perform-er structs get initialised when we have a federation sender to use
}
if err := a.Inputer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start roomserver input API")
}
return a
}
@ -140,6 +138,10 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA
r.Forgetter = &perform.Forgetter{
DB: r.DB,
}
if err := r.Inputer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start roomserver input API")
}
}
func (r *RoomserverInternalAPI) SetAppserviceAPI(asAPI asAPI.AppServiceQueryAPI) {

View file

@ -43,6 +43,7 @@ var keyContentFields = map[string]string{
type Inputer struct {
DB storage.Database
JetStream nats.JetStreamContext
Durable nats.SubOpt
ServerName gomatrixserverlib.ServerName
ACLs *acls.ServerACLs
InputRoomEventTopic string
@ -59,14 +60,15 @@ func (r *Inputer) Start() error {
// later, possibly with an error response to the inputter if synchronous.
func(msg *nats.Msg) {
roomID := msg.Header.Get("room_id")
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
var inputRoomEvent api.InputRoomEvent
if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
_ = msg.Term()
return
}
inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{})
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
inbox.(*phony.Inbox).Act(nil, func() {
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
if err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil {
sentry.CaptureException(err)
} else {
@ -84,6 +86,8 @@ func (r *Inputer) Start() error {
// or nak them within a certain amount of time. This stops that from
// happening, so we don't end up doing a lot of unnecessary duplicate work.
nats.MaxDeliver(0),
// Use a durable named consumer.
r.Durable,
)
return err
}
@ -111,15 +115,17 @@ func (r *Inputer) InputRoomEvents(
if _, err = r.JetStream.PublishMsg(msg); err != nil {
return
}
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
}
} else {
responses := make(chan error, len(request.InputRoomEvents))
defer close(responses)
for _, e := range request.InputRoomEvents {
inputRoomEvent := e
inbox, _ := r.workers.LoadOrStore(inputRoomEvent.Event.RoomID(), &phony.Inbox{})
roomID := inputRoomEvent.Event.RoomID()
inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{})
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
inbox.(*phony.Inbox).Act(nil, func() {
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
err := r.processRoomEvent(context.TODO(), &inputRoomEvent)
if err != nil {
sentry.CaptureException(err)

View file

@ -457,6 +457,21 @@ func (r *Queryer) QueryStateAndAuthChain(
response.RoomExists = true
response.RoomVersion = info.RoomVersion
// handle this entirely separately to the other case so we don't have to pull out
// the entire current state of the room
// TODO: this probably means it should be a different query operation...
if request.OnlyFetchAuthChain {
var authEvents []*gomatrixserverlib.Event
authEvents, err = GetAuthChain(ctx, r.DB.EventsFromIDs, request.AuthEventIDs)
if err != nil {
return err
}
for _, event := range authEvents {
response.AuthChainEvents = append(response.AuthChainEvents, event.Headered(info.RoomVersion))
}
return nil
}
var stateEvents []*gomatrixserverlib.Event
stateEvents, err = r.loadStateAtEventIDs(ctx, *info, request.PrevEventIDs)
if err != nil {

View file

@ -2,6 +2,8 @@ package config
import (
"fmt"
"github.com/nats-io/nats.go"
)
type JetStream struct {
@ -23,6 +25,10 @@ func (c *JetStream) TopicFor(name string) string {
return fmt.Sprintf("%s%s", c.TopicPrefix, name)
}
func (c *JetStream) Durable(name string) nats.SubOpt {
return nats.Durable(c.TopicFor(name))
}
func (c *JetStream) Defaults(generate bool) {
c.Addresses = []string{}
c.TopicPrefix = "Dendrite"

View file

@ -75,13 +75,18 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (nats.JetStreamContex
}
if info == nil {
stream.Subjects = []string{name}
// If we're trying to keep everything in memory (e.g. unit tests)
// then overwrite the storage policy.
if cfg.InMemory {
stream.Storage = nats.MemoryStorage
}
if _, err = s.AddStream(stream); err != nil {
// Namespace the streams without modifying the original streams
// array, otherwise we end up with namespaces on namespaces.
namespaced := *stream
namespaced.Name = name
if _, err = s.AddStream(&namespaced); err != nil {
logrus.WithError(err).WithField("stream", name).Fatal("Unable to add stream")
}
}

View file

@ -34,6 +34,7 @@ import (
type OutputClientDataConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable nats.SubOpt
topic string
db storage.Database
stream types.StreamProvider
@ -53,6 +54,7 @@ func NewOutputClientDataConsumer(
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"),
db: store,
notifier: notifier,
stream: stream,
@ -61,7 +63,7 @@ func NewOutputClientDataConsumer(
// Start consuming from room servers
func (s *OutputClientDataConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage)
_, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
}

View file

@ -34,6 +34,7 @@ import (
type OutputReceiptEventConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable nats.SubOpt
topic string
db storage.Database
stream types.StreamProvider
@ -54,6 +55,7 @@ func NewOutputReceiptEventConsumer(
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerReceiptConsumer"),
db: store,
notifier: notifier,
stream: stream,
@ -62,7 +64,7 @@ func NewOutputReceiptEventConsumer(
// Start consuming from EDU api
func (s *OutputReceiptEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage)
_, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
}

View file

@ -36,6 +36,7 @@ import (
type OutputSendToDeviceEventConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable nats.SubOpt
topic string
db storage.Database
serverName gomatrixserverlib.ServerName // our server name
@ -57,6 +58,7 @@ func NewOutputSendToDeviceEventConsumer(
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerSendToDeviceConsumer"),
db: store,
serverName: cfg.Matrix.ServerName,
notifier: notifier,
@ -66,7 +68,7 @@ func NewOutputSendToDeviceEventConsumer(
// Start consuming from EDU api
func (s *OutputSendToDeviceEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage)
_, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
}

View file

@ -35,6 +35,7 @@ import (
type OutputTypingEventConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable nats.SubOpt
topic string
eduCache *cache.EDUCache
stream types.StreamProvider
@ -56,6 +57,7 @@ func NewOutputTypingEventConsumer(
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerTypingConsumer"),
eduCache: eduCache,
notifier: notifier,
stream: stream,
@ -64,7 +66,7 @@ func NewOutputTypingEventConsumer(
// Start consuming from EDU api
func (s *OutputTypingEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage)
_, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
}

View file

@ -38,6 +38,7 @@ type OutputRoomEventConsumer struct {
cfg *config.SyncAPI
rsAPI api.RoomserverInternalAPI
jetstream nats.JetStreamContext
durable nats.SubOpt
topic string
db storage.Database
pduStream types.StreamProvider
@ -61,6 +62,7 @@ func NewOutputRoomEventConsumer(
cfg: cfg,
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIRoomServerConsumer"),
db: store,
notifier: notifier,
pduStream: pduStream,
@ -71,7 +73,7 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage)
_, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
}