Move receipt sending to own JetStream producer

This commit is contained in:
Till Faelligen 2022-03-22 09:59:44 +01:00
parent 9572f5ed19
commit 8e3d4d573a
22 changed files with 143 additions and 149 deletions

View file

@ -55,7 +55,8 @@ func AddPublicRoutes(
syncProducer := &producers.SyncAPIProducer{ syncProducer := &producers.SyncAPIProducer{
JetStream: js, JetStream: js,
Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), TopicClientData: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
TopicReceiptEvent: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
} }
routing.Setup( routing.Setup(

View file

@ -15,24 +15,28 @@
package producers package producers
import ( import (
"context"
"encoding/json" "encoding/json"
"strconv"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
// SyncAPIProducer produces events for the sync API server to consume // SyncAPIProducer produces events for the sync API server to consume
type SyncAPIProducer struct { type SyncAPIProducer struct {
Topic string TopicClientData string
TopicReceiptEvent string
JetStream nats.JetStreamContext JetStream nats.JetStreamContext
} }
// SendData sends account data to the sync API server // SendData sends account data to the sync API server
func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string, readMarker *eventutil.ReadMarkerJSON) error { func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string, readMarker *eventutil.ReadMarkerJSON) error {
m := &nats.Msg{ m := &nats.Msg{
Subject: p.Topic, Subject: p.TopicClientData,
Header: nats.Header{}, Header: nats.Header{},
} }
m.Header.Set(jetstream.UserID, userID) m.Header.Set(jetstream.UserID, userID)
@ -52,8 +56,27 @@ func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string
"user_id": userID, "user_id": userID,
"room_id": roomID, "room_id": roomID,
"data_type": dataType, "data_type": dataType,
}).Tracef("Producing to topic '%s'", p.Topic) }).Tracef("Producing to topic '%s'", p.TopicClientData)
_, err = p.JetStream.PublishMsg(m) _, err = p.JetStream.PublishMsg(m)
return err return err
} }
func (p *SyncAPIProducer) SendReceipt(
ctx context.Context,
userID, roomID, eventID, receiptType string, timestamp gomatrixserverlib.Timestamp,
) error {
m := &nats.Msg{
Subject: p.TopicReceiptEvent,
Header: nats.Header{},
}
m.Header.Set(jetstream.UserID, userID)
m.Header.Set(jetstream.RoomID, roomID)
m.Header.Set(jetstream.EventID, eventID)
m.Header.Set("type", receiptType)
m.Header.Set("timestamp", strconv.Itoa(int(timestamp)))
log.WithFields(log.Fields{}).Tracef("Producing to topic '%s'", p.TopicReceiptEvent)
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
return err
}

View file

@ -23,7 +23,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/producers"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/api"
@ -146,7 +145,7 @@ type fullyReadEvent struct {
// SaveReadMarker implements POST /rooms/{roomId}/read_markers // SaveReadMarker implements POST /rooms/{roomId}/read_markers
func SaveReadMarker( func SaveReadMarker(
req *http.Request, req *http.Request,
userAPI api.UserInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, eduAPI eduserverAPI.EDUServerInputAPI, userAPI api.UserInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
syncProducer *producers.SyncAPIProducer, device *api.Device, roomID string, syncProducer *producers.SyncAPIProducer, device *api.Device, roomID string,
) util.JSONResponse { ) util.JSONResponse {
// Verify that the user is a member of this room // Verify that the user is a member of this room
@ -192,7 +191,7 @@ func SaveReadMarker(
// Handle the read receipt that may be included in the read marker // Handle the read receipt that may be included in the read marker
if r.Read != "" { if r.Read != "" {
return SetReceipt(req, eduAPI, device, roomID, "m.read", r.Read) return SetReceipt(req, syncProducer, device, roomID, "m.read", r.Read)
} }
return util.JSONResponse{ return util.JSONResponse{

View file

@ -19,21 +19,20 @@ import (
"net/http" "net/http"
"time" "time"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/eduserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
func SetReceipt(req *http.Request, eduAPI api.EDUServerInputAPI, device *userapi.Device, roomId, receiptType, eventId string) util.JSONResponse { func SetReceipt(req *http.Request, syncProducer *producers.SyncAPIProducer, device *userapi.Device, roomID, receiptType, eventID string) util.JSONResponse {
timestamp := gomatrixserverlib.AsTimestamp(time.Now()) timestamp := gomatrixserverlib.AsTimestamp(time.Now())
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"roomId": roomId, "roomID": roomID,
"receiptType": receiptType, "receiptType": receiptType,
"eventId": eventId, "eventID": eventID,
"userId": device.UserID, "userId": device.UserID,
"timestamp": timestamp, "timestamp": timestamp,
}).Debug("Setting receipt") }).Debug("Setting receipt")
@ -43,7 +42,7 @@ func SetReceipt(req *http.Request, eduAPI api.EDUServerInputAPI, device *userapi
return util.MessageResponse(400, fmt.Sprintf("receipt type must be m.read not '%s'", receiptType)) return util.MessageResponse(400, fmt.Sprintf("receipt type must be m.read not '%s'", receiptType))
} }
if err := api.SendReceipt(req.Context(), eduAPI, device.UserID, roomId, eventId, receiptType, timestamp); err != nil { if err := syncProducer.SendReceipt(req.Context(), device.UserID, roomID, eventID, receiptType, timestamp); err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }

View file

@ -942,7 +942,7 @@ func Setup(
if err != nil { if err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return SaveReadMarker(req, userAPI, rsAPI, eduAPI, syncProducer, device, vars["roomID"]) return SaveReadMarker(req, userAPI, rsAPI, syncProducer, device, vars["roomID"])
}), }),
).Methods(http.MethodPost, http.MethodOptions) ).Methods(http.MethodPost, http.MethodOptions)
@ -1297,7 +1297,7 @@ func Setup(
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return SetReceipt(req, eduAPI, device, vars["roomId"], vars["receiptType"], vars["eventId"]) return SetReceipt(req, syncProducer, device, vars["roomId"], vars["receiptType"], vars["eventId"])
}), }),
).Methods(http.MethodPost, http.MethodOptions) ).Methods(http.MethodPost, http.MethodOptions)
} }

View file

@ -29,7 +29,7 @@ func FederationAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) {
keyRing := fsAPI.KeyRing() keyRing := fsAPI.KeyRing()
federationapi.AddPublicRoutes( federationapi.AddPublicRoutes(
base.PublicFederationAPIMux, base.PublicKeyAPIMux, base.PublicWellKnownAPIMux, base.ProcessContext, base.PublicFederationAPIMux, base.PublicKeyAPIMux, base.PublicWellKnownAPIMux,
&base.Cfg.FederationAPI, userAPI, federation, keyRing, &base.Cfg.FederationAPI, userAPI, federation, keyRing,
rsAPI, fsAPI, base.EDUServerClient(), keyAPI, rsAPI, fsAPI, base.EDUServerClient(), keyAPI,
&base.Cfg.MSCs, nil, &base.Cfg.MSCs, nil,

View file

@ -59,22 +59,6 @@ type InputSendToDeviceEventRequest struct {
// InputSendToDeviceEventResponse is a response to InputSendToDeviceEventRequest // InputSendToDeviceEventResponse is a response to InputSendToDeviceEventRequest
type InputSendToDeviceEventResponse struct{} type InputSendToDeviceEventResponse struct{}
type InputReceiptEvent struct {
UserID string `json:"user_id"`
RoomID string `json:"room_id"`
EventID string `json:"event_id"`
Type string `json:"type"`
Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
}
// InputReceiptEventRequest is a request to EDUServerInputAPI
type InputReceiptEventRequest struct {
InputReceiptEvent InputReceiptEvent `json:"input_receipt_event"`
}
// InputReceiptEventResponse is a response to InputReceiptEventRequest
type InputReceiptEventResponse struct{}
type InputCrossSigningKeyUpdateRequest struct { type InputCrossSigningKeyUpdateRequest struct {
CrossSigningKeyUpdate `json:"signing_keys"` CrossSigningKeyUpdate `json:"signing_keys"`
} }
@ -94,10 +78,4 @@ type EDUServerInputAPI interface {
request *InputSendToDeviceEventRequest, request *InputSendToDeviceEventRequest,
response *InputSendToDeviceEventResponse, response *InputSendToDeviceEventResponse,
) error ) error
InputReceiptEvent(
ctx context.Context,
request *InputReceiptEventRequest,
response *InputReceiptEventResponse,
) error
} }

View file

@ -67,22 +67,3 @@ func SendToDevice(
response := InputSendToDeviceEventResponse{} response := InputSendToDeviceEventResponse{}
return eduAPI.InputSendToDeviceEvent(ctx, &request, &response) return eduAPI.InputSendToDeviceEvent(ctx, &request, &response)
} }
// SendReceipt sends a receipt event to EDU Server
func SendReceipt(
ctx context.Context,
eduAPI EDUServerInputAPI, userID, roomID, eventID, receiptType string,
timestamp gomatrixserverlib.Timestamp,
) error {
request := InputReceiptEventRequest{
InputReceiptEvent: InputReceiptEvent{
UserID: userID,
RoomID: roomID,
EventID: eventID,
Type: receiptType,
Timestamp: timestamp,
},
}
response := InputReceiptEventResponse{}
return eduAPI.InputReceiptEvent(ctx, &request, &response)
}

View file

@ -50,7 +50,6 @@ func NewInternalAPI(
JetStream: js, JetStream: js,
OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
OutputReceiptEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
ServerName: cfg.Matrix.ServerName, ServerName: cfg.Matrix.ServerName,
} }
} }

View file

@ -37,8 +37,6 @@ type EDUServerInputAPI struct {
OutputTypingEventTopic string OutputTypingEventTopic string
// The kafka topic to output new send to device events to. // The kafka topic to output new send to device events to.
OutputSendToDeviceEventTopic string OutputSendToDeviceEventTopic string
// The kafka topic to output new receipt events to
OutputReceiptEventTopic string
// kafka producer // kafka producer
JetStream nats.JetStreamContext JetStream nats.JetStreamContext
// Internal user query API // Internal user query API
@ -169,30 +167,3 @@ func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) e
return nil return nil
} }
// InputReceiptEvent implements api.EDUServerInputAPI
// TODO: Intelligently batch requests sent by the same user (e.g wait a few milliseconds before emitting output events)
func (t *EDUServerInputAPI) InputReceiptEvent(
ctx context.Context,
request *api.InputReceiptEventRequest,
response *api.InputReceiptEventResponse,
) error {
logrus.WithFields(logrus.Fields{}).Tracef("Producing to topic '%s'", t.OutputReceiptEventTopic)
output := &api.OutputReceiptEvent{
UserID: request.InputReceiptEvent.UserID,
RoomID: request.InputReceiptEvent.RoomID,
EventID: request.InputReceiptEvent.EventID,
Type: request.InputReceiptEvent.Type,
Timestamp: request.InputReceiptEvent.Timestamp,
}
js, err := json.Marshal(output)
if err != nil {
return err
}
_, err = t.JetStream.PublishMsg(&nats.Msg{
Subject: t.OutputReceiptEventTopic,
Data: js,
})
return err
}

View file

@ -14,7 +14,6 @@ import (
const ( const (
EDUServerInputTypingEventPath = "/eduserver/input" EDUServerInputTypingEventPath = "/eduserver/input"
EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice" EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice"
EDUServerInputReceiptEventPath = "/eduserver/receipt"
) )
// NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API. // NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API.
@ -55,16 +54,3 @@ func (h *httpEDUServerInputAPI) InputSendToDeviceEvent(
apiURL := h.eduServerURL + EDUServerInputSendToDeviceEventPath apiURL := h.eduServerURL + EDUServerInputSendToDeviceEventPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
} }
// InputSendToDeviceEvent implements EDUServerInputAPI
func (h *httpEDUServerInputAPI) InputReceiptEvent(
ctx context.Context,
request *api.InputReceiptEventRequest,
response *api.InputReceiptEventResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "InputReceiptEventPath")
defer span.Finish()
apiURL := h.eduServerURL + EDUServerInputReceiptEventPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}

View file

@ -38,17 +38,4 @@ func AddRoutes(t api.EDUServerInputAPI, internalAPIMux *mux.Router) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response} return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}), }),
) )
internalAPIMux.Handle(EDUServerInputReceiptEventPath,
httputil.MakeInternalAPI("inputReceiptEvent", func(req *http.Request) util.JSONResponse {
var request api.InputReceiptEventRequest
var response api.InputReceiptEventResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := t.InputReceiptEvent(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
} }

View file

@ -17,7 +17,9 @@ package consumers
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"strconv"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/federationapi/queue" "github.com/matrix-org/dendrite/federationapi/queue"
"github.com/matrix-org/dendrite/federationapi/storage" "github.com/matrix-org/dendrite/federationapi/storage"
@ -198,15 +200,25 @@ func (t *OutputEDUConsumer) onTypingEvent(ctx context.Context, msg *nats.Msg) bo
// onReceiptEvent is called in response to a message received on the receipt // onReceiptEvent is called in response to a message received on the receipt
// events topic from the EDU server. // events topic from the EDU server.
func (t *OutputEDUConsumer) onReceiptEvent(ctx context.Context, msg *nats.Msg) bool { func (t *OutputEDUConsumer) onReceiptEvent(ctx context.Context, msg *nats.Msg) bool {
// Extract the typing event from msg. receipt := api.OutputReceiptEvent{
var receipt api.OutputReceiptEvent UserID: msg.Header.Get(jetstream.UserID),
if err := json.Unmarshal(msg.Data, &receipt); err != nil { RoomID: msg.Header.Get(jetstream.RoomID),
// Skip this msg but continue processing messages. EventID: msg.Header.Get(jetstream.EventID),
log.WithError(err).Errorf("eduserver output log: message parse failed (expected receipt)") Type: msg.Header.Get("type"),
}
timestamp, err := strconv.Atoi(msg.Header.Get("timestamp"))
if err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("EDU server output log: message parse failure")
sentry.CaptureException(err)
return true return true
} }
receipt.Timestamp = gomatrixserverlib.Timestamp(timestamp)
// only send receipt events which originated from us // only send receipt events which originated from us
// TODO: We're consuming/producing on the same topic from the federation api, maybe add different topics?
_, receiptServerName, err := gomatrixserverlib.SplitID('@', receipt.UserID) _, receiptServerName, err := gomatrixserverlib.SplitID('@', receipt.UserID)
if err != nil { if err != nil {
log.WithError(err).WithField("user_id", receipt.UserID).Error("failed to extract domain from receipt sender") log.WithError(err).WithField("user_id", receipt.UserID).Error("failed to extract domain from receipt sender")

View file

@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/federationapi/consumers" "github.com/matrix-org/dendrite/federationapi/consumers"
"github.com/matrix-org/dendrite/federationapi/internal" "github.com/matrix-org/dendrite/federationapi/internal"
"github.com/matrix-org/dendrite/federationapi/inthttp" "github.com/matrix-org/dendrite/federationapi/inthttp"
"github.com/matrix-org/dendrite/federationapi/producers"
"github.com/matrix-org/dendrite/federationapi/queue" "github.com/matrix-org/dendrite/federationapi/queue"
"github.com/matrix-org/dendrite/federationapi/statistics" "github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/matrix-org/dendrite/federationapi/storage" "github.com/matrix-org/dendrite/federationapi/storage"
@ -31,6 +32,7 @@ import (
"github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -46,6 +48,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.FederationInternalAPI) {
// AddPublicRoutes sets up and registers HTTP handlers on the base API muxes for the FederationAPI component. // AddPublicRoutes sets up and registers HTTP handlers on the base API muxes for the FederationAPI component.
func AddPublicRoutes( func AddPublicRoutes(
process *process.ProcessContext,
fedRouter, keyRouter, wellKnownRouter *mux.Router, fedRouter, keyRouter, wellKnownRouter *mux.Router,
cfg *config.FederationAPI, cfg *config.FederationAPI,
userAPI userapi.UserInternalAPI, userAPI userapi.UserInternalAPI,
@ -58,11 +61,18 @@ func AddPublicRoutes(
mscCfg *config.MSCs, mscCfg *config.MSCs,
servers federationAPI.ServersInRoomProvider, servers federationAPI.ServersInRoomProvider,
) { ) {
js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream)
producer := &producers.SyncAPIProducer{
JetStream: js,
TopicReceiptEvent: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
}
routing.Setup( routing.Setup(
fedRouter, keyRouter, wellKnownRouter, cfg, rsAPI, fedRouter, keyRouter, wellKnownRouter, cfg, rsAPI,
eduAPI, federationAPI, keyRing, eduAPI, federationAPI, keyRing,
federation, userAPI, keyAPI, mscCfg, federation, userAPI, keyAPI, mscCfg,
servers, servers, producer,
) )
} }

View file

@ -30,7 +30,7 @@ func TestRoomsV3URLEscapeDoNot404(t *testing.T) {
fsAPI := base.FederationAPIHTTPClient() fsAPI := base.FederationAPIHTTPClient()
// TODO: This is pretty fragile, as if anything calls anything on these nils this test will break. // TODO: This is pretty fragile, as if anything calls anything on these nils this test will break.
// Unfortunately, it makes little sense to instantiate these dependencies when we just want to test routing. // Unfortunately, it makes little sense to instantiate these dependencies when we just want to test routing.
federationapi.AddPublicRoutes(base.PublicFederationAPIMux, base.PublicKeyAPIMux, base.PublicWellKnownAPIMux, &cfg.FederationAPI, nil, nil, keyRing, nil, fsAPI, nil, nil, &cfg.MSCs, nil) federationapi.AddPublicRoutes(base.ProcessContext, base.PublicFederationAPIMux, base.PublicKeyAPIMux, base.PublicWellKnownAPIMux, &cfg.FederationAPI, nil, nil, keyRing, nil, fsAPI, nil, nil, &cfg.MSCs, nil)
baseURL, cancel := test.ListenAndServe(t, base.PublicFederationAPIMux, true) baseURL, cancel := test.ListenAndServe(t, base.PublicFederationAPIMux, true)
defer cancel() defer cancel()
serverName := gomatrixserverlib.ServerName(strings.TrimPrefix(baseURL, "https://")) serverName := gomatrixserverlib.ServerName(strings.TrimPrefix(baseURL, "https://"))

View file

@ -0,0 +1,50 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package producers
import (
"context"
"strconv"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
// SyncAPIProducer produces events for the sync API server to consume
type SyncAPIProducer struct {
TopicReceiptEvent string
JetStream nats.JetStreamContext
}
func (p *SyncAPIProducer) SendReceipt(
ctx context.Context,
userID, roomID, eventID, receiptType string, timestamp gomatrixserverlib.Timestamp,
) error {
m := &nats.Msg{
Subject: p.TopicReceiptEvent,
Header: nats.Header{},
}
m.Header.Set(jetstream.UserID, userID)
m.Header.Set(jetstream.RoomID, roomID)
m.Header.Set(jetstream.EventID, eventID)
m.Header.Set("type", receiptType)
m.Header.Set("timestamp", strconv.Itoa(int(timestamp)))
log.WithFields(log.Fields{}).Tracef("Producing to topic '%s'", p.TopicReceiptEvent)
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
return err
}

View file

@ -21,6 +21,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationAPI "github.com/matrix-org/dendrite/federationapi/api" federationAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/producers"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/httputil"
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api" keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
@ -52,6 +53,7 @@ func Setup(
keyAPI keyserverAPI.KeyInternalAPI, keyAPI keyserverAPI.KeyInternalAPI,
mscCfg *config.MSCs, mscCfg *config.MSCs,
servers federationAPI.ServersInRoomProvider, servers federationAPI.ServersInRoomProvider,
producer *producers.SyncAPIProducer,
) { ) {
v2keysmux := keyMux.PathPrefix("/v2").Subrouter() v2keysmux := keyMux.PathPrefix("/v2").Subrouter()
v1fedmux := fedMux.PathPrefix("/v1").Subrouter() v1fedmux := fedMux.PathPrefix("/v1").Subrouter()
@ -116,7 +118,7 @@ func Setup(
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse { func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
return Send( return Send(
httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]), httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]),
cfg, rsAPI, eduAPI, keyAPI, keys, federation, mu, servers, cfg, rsAPI, eduAPI, keyAPI, keys, federation, mu, servers, producer,
) )
}, },
)).Methods(http.MethodPut, http.MethodOptions) )).Methods(http.MethodPut, http.MethodOptions)

View file

@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationAPI "github.com/matrix-org/dendrite/federationapi/api" federationAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/producers"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
keyapi "github.com/matrix-org/dendrite/keyserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
@ -93,6 +94,7 @@ func Send(
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
mu *internal.MutexByRoom, mu *internal.MutexByRoom,
servers federationAPI.ServersInRoomProvider, servers federationAPI.ServersInRoomProvider,
producer *producers.SyncAPIProducer,
) util.JSONResponse { ) util.JSONResponse {
// First we should check if this origin has already submitted this // First we should check if this origin has already submitted this
// txn ID to us. If they have and the txnIDs map contains an entry, // txn ID to us. If they have and the txnIDs map contains an entry,
@ -133,6 +135,7 @@ func Send(
servers: servers, servers: servers,
keyAPI: keyAPI, keyAPI: keyAPI,
roomsMu: mu, roomsMu: mu,
producer: producer,
} }
var txnEvents struct { var txnEvents struct {
@ -191,6 +194,7 @@ type txnReq struct {
federation txnFederationClient federation txnFederationClient
roomsMu *internal.MutexByRoom roomsMu *internal.MutexByRoom
servers federationAPI.ServersInRoomProvider servers federationAPI.ServersInRoomProvider
producer *producers.SyncAPIProducer
} }
// A subset of FederationClient functionality that txn requires. Useful for testing. // A subset of FederationClient functionality that txn requires. Useful for testing.
@ -430,17 +434,7 @@ func (t *txnReq) processReceiptEvent(ctx context.Context,
) error { ) error {
// store every event // store every event
for _, eventID := range eventIDs { for _, eventID := range eventIDs {
req := eduserverAPI.InputReceiptEventRequest{ if err := t.producer.SendReceipt(ctx, userID, roomID, eventID, receiptType, timestamp); err != nil {
InputReceiptEvent: eduserverAPI.InputReceiptEvent{
UserID: userID,
RoomID: roomID,
EventID: eventID,
Type: receiptType,
Timestamp: timestamp,
},
}
resp := eduserverAPI.InputReceiptEventResponse{}
if err := t.eduAPI.InputReceiptEvent(ctx, &req, &resp); err != nil {
return fmt.Errorf("unable to set receipt event: %w", err) return fmt.Errorf("unable to set receipt event: %w", err)
} }
} }

View file

@ -75,14 +75,6 @@ func (p *testEDUProducer) InputSendToDeviceEvent(
return nil return nil
} }
func (o *testEDUProducer) InputReceiptEvent(
ctx context.Context,
request *eduAPI.InputReceiptEventRequest,
response *eduAPI.InputReceiptEventResponse,
) error {
return nil
}
func (o *testEDUProducer) InputCrossSigningKeyUpdate( func (o *testEDUProducer) InputCrossSigningKeyUpdate(
ctx context.Context, ctx context.Context,
request *eduAPI.InputCrossSigningKeyUpdateRequest, request *eduAPI.InputCrossSigningKeyUpdateRequest,

View file

@ -9,6 +9,7 @@ import (
const ( const (
UserID = "user_id" UserID = "user_id"
RoomID = "room_id" RoomID = "room_id"
EventID = "event_id"
) )
var ( var (

View file

@ -64,7 +64,7 @@ func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ss
m.ExtPublicRoomsProvider, &m.Config.MSCs, m.ExtPublicRoomsProvider, &m.Config.MSCs,
) )
federationapi.AddPublicRoutes( federationapi.AddPublicRoutes(
ssMux, keyMux, wkMux, &m.Config.FederationAPI, m.UserAPI, m.FedClient, process, ssMux, keyMux, wkMux, &m.Config.FederationAPI, m.UserAPI, m.FedClient,
m.KeyRing, m.RoomserverAPI, m.FederationAPI, m.KeyRing, m.RoomserverAPI, m.FederationAPI,
m.EDUInternalAPI, m.KeyAPI, &m.Config.MSCs, nil, m.EDUInternalAPI, m.KeyAPI, &m.Config.MSCs, nil,
) )

View file

@ -17,8 +17,8 @@ package consumers
import ( import (
"context" "context"
"database/sql" "database/sql"
"encoding/json"
"fmt" "fmt"
"strconv"
"github.com/getsentry/sentry-go" "github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/eduserver/api"
@ -81,14 +81,23 @@ func (s *OutputReceiptEventConsumer) Start() error {
} }
func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
var output api.OutputReceiptEvent output := api.OutputReceiptEvent{
if err := json.Unmarshal(msg.Data, &output); err != nil { UserID: msg.Header.Get(jetstream.UserID),
RoomID: msg.Header.Get(jetstream.RoomID),
EventID: msg.Header.Get(jetstream.EventID),
Type: msg.Header.Get("type"),
}
timestamp, err := strconv.Atoi(msg.Header.Get("timestamp"))
if err != nil {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("EDU server output log: message parse failure") log.WithError(err).Errorf("EDU server output log: message parse failure")
sentry.CaptureException(err) sentry.CaptureException(err)
return true return true
} }
output.Timestamp = gomatrixserverlib.Timestamp(timestamp)
streamPos, err := s.db.StoreReceipt( streamPos, err := s.db.StoreReceipt(
s.ctx, s.ctx,
output.RoomID, output.RoomID,