Move structs, cleanup EDU Server traces

This commit is contained in:
Till Faelligen 2022-03-23 09:24:56 +01:00
parent b41b9f8599
commit 9abf0a75c9
30 changed files with 112 additions and 205 deletions

View file

@ -19,9 +19,9 @@ import (
"encoding/json" "encoding/json"
"strconv" "strconv"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
@ -128,7 +128,7 @@ func (p *SyncAPIProducer) SendToDevice(
"type": eventType, "type": eventType,
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent) }).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
for _, device := range devices { for _, device := range devices {
ote := &api.OutputSendToDeviceEvent{ ote := &types.OutputSendToDeviceEvent{
UserID: userID, UserID: userID,
DeviceID: device, DeviceID: device,
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{

View file

@ -25,7 +25,7 @@ import (
) )
// SendToDevice handles PUT /_matrix/client/r0/sendToDevice/{eventType}/{txnId} // SendToDevice handles PUT /_matrix/client/r0/sendToDevice/{eventType}/{txnId}
// sends the device events to the EDU Server // sends the device events to the syncapi & federationsender
func SendToDevice( func SendToDevice(
req *http.Request, device *userapi.Device, req *http.Request, device *userapi.Device,
syncProducer *producers.SyncAPIProducer, syncProducer *producers.SyncAPIProducer,

View file

@ -59,7 +59,6 @@ func main() {
// itself. // itself.
cfg.AppServiceAPI.InternalAPI.Connect = httpAPIAddr cfg.AppServiceAPI.InternalAPI.Connect = httpAPIAddr
cfg.ClientAPI.InternalAPI.Connect = httpAPIAddr cfg.ClientAPI.InternalAPI.Connect = httpAPIAddr
cfg.EDUServer.InternalAPI.Connect = httpAPIAddr
cfg.FederationAPI.InternalAPI.Connect = httpAPIAddr cfg.FederationAPI.InternalAPI.Connect = httpAPIAddr
cfg.KeyServer.InternalAPI.Connect = httpAPIAddr cfg.KeyServer.InternalAPI.Connect = httpAPIAddr
cfg.MediaAPI.InternalAPI.Connect = httpAPIAddr cfg.MediaAPI.InternalAPI.Connect = httpAPIAddr

View file

@ -187,12 +187,6 @@ client_api:
threshold: 5 threshold: 5
cooloff_ms: 500 cooloff_ms: 500
# Configuration for the EDU server.
edu_server:
internal_api:
listen: http://localhost:7778 # Only used in polylith deployments
connect: http://localhost:7778 # Only used in polylith deployments
# Configuration for the Federation API. # Configuration for the Federation API.
federation_api: federation_api:
internal_api: internal_api:

View file

@ -1,44 +0,0 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api
import (
"github.com/matrix-org/gomatrixserverlib"
)
// OutputSendToDeviceEvent is an entry in the send-to-device output kafka log.
// This contains the full event content, along with the user ID and device ID
// to which it is destined.
type OutputSendToDeviceEvent struct {
UserID string `json:"user_id"`
DeviceID string `json:"device_id"`
gomatrixserverlib.SendToDeviceEvent
}
// OutputReceiptEvent is an entry in the receipt output kafka log
type OutputReceiptEvent struct {
UserID string `json:"user_id"`
RoomID string `json:"room_id"`
EventID string `json:"event_id"`
Type string `json:"type"`
Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
}
// OutputCrossSigningKeyUpdate is an entry in the signing key update output kafka log
type OutputCrossSigningKeyUpdate struct {
CrossSigningKeyUpdate `json:"signing_keys"`
}

View file

@ -1,44 +0,0 @@
// Copyright 2021 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api
import "github.com/matrix-org/gomatrixserverlib"
const (
MSigningKeyUpdate = "m.signing_key_update"
)
type FederationReceiptMRead struct {
User map[string]FederationReceiptData `json:"m.read"`
}
type FederationReceiptData struct {
Data ReceiptTS `json:"data"`
EventIDs []string `json:"event_ids"`
}
type ReceiptMRead struct {
User map[string]ReceiptTS `json:"m.read"`
}
type ReceiptTS struct {
TS gomatrixserverlib.Timestamp `json:"ts"`
}
type CrossSigningKeyUpdate struct {
MasterKey *gomatrixserverlib.CrossSigningKey `json:"master_key,omitempty"`
SelfSigningKey *gomatrixserverlib.CrossSigningKey `json:"self_signing_key,omitempty"`
UserID string `json:"user_id"`
}

View file

@ -20,12 +20,13 @@ import (
"strconv" "strconv"
"github.com/getsentry/sentry-go" "github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/federationapi/queue" "github.com/matrix-org/dendrite/federationapi/queue"
"github.com/matrix-org/dendrite/federationapi/storage" "github.com/matrix-org/dendrite/federationapi/storage"
fedTypes "github.com/matrix-org/dendrite/federationapi/types"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
syncTypes "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
@ -45,7 +46,7 @@ type OutputEDUConsumer struct {
receiptTopic string receiptTopic string
} }
// NewOutputEDUConsumer creates a new OutputEDUConsumer. Call Start() to begin consuming from EDU servers. // NewOutputEDUConsumer creates a new OutputEDUConsumer. Call Start() to begin consuming EDUs.
func NewOutputEDUConsumer( func NewOutputEDUConsumer(
process *process.ProcessContext, process *process.ProcessContext,
cfg *config.FederationAPI, cfg *config.FederationAPI,
@ -104,9 +105,9 @@ func (t *OutputEDUConsumer) onSendToDeviceEvent(ctx context.Context, msg *nats.M
return true return true
} }
// Extract the send-to-device event from msg. // Extract the send-to-device event from msg.
var ote api.OutputSendToDeviceEvent var ote syncTypes.OutputSendToDeviceEvent
if err := json.Unmarshal(msg.Data, &ote); err != nil { if err := json.Unmarshal(msg.Data, &ote); err != nil {
log.WithError(err).Errorf("eduserver output log: message parse failed (expected send-to-device)") log.WithError(err).Errorf("output log: message parse failed (expected send-to-device)")
return true return true
} }
@ -154,7 +155,7 @@ func (t *OutputEDUConsumer) onTypingEvent(ctx context.Context, msg *nats.Msg) bo
userID := msg.Header.Get(jetstream.UserID) userID := msg.Header.Get(jetstream.UserID)
typing, err := strconv.ParseBool(msg.Header.Get("typing")) typing, err := strconv.ParseBool(msg.Header.Get("typing"))
if err != nil { if err != nil {
log.WithError(err).Errorf("EDU server output log: typing parse failure") log.WithError(err).Errorf("EDU output log: typing parse failure")
return true return true
} }
@ -189,7 +190,6 @@ func (t *OutputEDUConsumer) onTypingEvent(ctx context.Context, msg *nats.Msg) bo
log.WithError(err).Error("failed to marshal EDU JSON") log.WithError(err).Error("failed to marshal EDU JSON")
return true return true
} }
log.Debugf("sending edu: %+v", edu)
if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil { if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil {
log.WithError(err).Error("failed to send EDU") log.WithError(err).Error("failed to send EDU")
return false return false
@ -201,7 +201,7 @@ func (t *OutputEDUConsumer) onTypingEvent(ctx context.Context, msg *nats.Msg) bo
// onReceiptEvent is called in response to a message received on the receipt // onReceiptEvent is called in response to a message received on the receipt
// events topic from the EDU server. // events topic from the EDU server.
func (t *OutputEDUConsumer) onReceiptEvent(ctx context.Context, msg *nats.Msg) bool { func (t *OutputEDUConsumer) onReceiptEvent(ctx context.Context, msg *nats.Msg) bool {
receipt := api.OutputReceiptEvent{ receipt := syncTypes.OutputReceiptEvent{
UserID: msg.Header.Get(jetstream.UserID), UserID: msg.Header.Get(jetstream.UserID),
RoomID: msg.Header.Get(jetstream.RoomID), RoomID: msg.Header.Get(jetstream.RoomID),
EventID: msg.Header.Get(jetstream.EventID), EventID: msg.Header.Get(jetstream.EventID),
@ -211,7 +211,7 @@ func (t *OutputEDUConsumer) onReceiptEvent(ctx context.Context, msg *nats.Msg) b
timestamp, err := strconv.Atoi(msg.Header.Get("timestamp")) timestamp, err := strconv.Atoi(msg.Header.Get("timestamp"))
if err != nil { if err != nil {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("EDU server output log: message parse failure") log.WithError(err).Errorf("EDU output log: message parse failure")
sentry.CaptureException(err) sentry.CaptureException(err)
return true return true
} }
@ -240,11 +240,11 @@ func (t *OutputEDUConsumer) onReceiptEvent(ctx context.Context, msg *nats.Msg) b
names[i] = joined[i].ServerName names[i] = joined[i].ServerName
} }
content := map[string]api.FederationReceiptMRead{} content := map[string]fedTypes.FederationReceiptMRead{}
content[receipt.RoomID] = api.FederationReceiptMRead{ content[receipt.RoomID] = fedTypes.FederationReceiptMRead{
User: map[string]api.FederationReceiptData{ User: map[string]fedTypes.FederationReceiptData{
receipt.UserID: { receipt.UserID: {
Data: api.ReceiptTS{ Data: fedTypes.ReceiptTS{
TS: receipt.Timestamp, TS: receipt.Timestamp,
}, },
EventIDs: []string{receipt.EventID}, EventIDs: []string{receipt.EventID},

View file

@ -18,9 +18,9 @@ import (
"context" "context"
"encoding/json" "encoding/json"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/federationapi/queue" "github.com/matrix-org/dendrite/federationapi/queue"
"github.com/matrix-org/dendrite/federationapi/storage" "github.com/matrix-org/dendrite/federationapi/storage"
"github.com/matrix-org/dendrite/federationapi/types"
"github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
@ -190,7 +190,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
// Pack the EDU and marshal it // Pack the EDU and marshal it
edu := &gomatrixserverlib.EDU{ edu := &gomatrixserverlib.EDU{
Type: eduserverAPI.MSigningKeyUpdate, Type: types.MSigningKeyUpdate,
Origin: string(t.serverName), Origin: string(t.serverName),
} }
if edu.Content, err = json.Marshal(output); err != nil { if edu.Content, err = json.Marshal(output); err != nil {

View file

@ -30,7 +30,7 @@ func TestRoomsV3URLEscapeDoNot404(t *testing.T) {
fsAPI := base.FederationAPIHTTPClient() fsAPI := base.FederationAPIHTTPClient()
// TODO: This is pretty fragile, as if anything calls anything on these nils this test will break. // TODO: This is pretty fragile, as if anything calls anything on these nils this test will break.
// Unfortunately, it makes little sense to instantiate these dependencies when we just want to test routing. // Unfortunately, it makes little sense to instantiate these dependencies when we just want to test routing.
federationapi.AddPublicRoutes(base.ProcessContext, base.PublicFederationAPIMux, base.PublicKeyAPIMux, base.PublicWellKnownAPIMux, &cfg.FederationAPI, nil, nil, keyRing, nil, fsAPI, nil, nil, &cfg.MSCs, nil) federationapi.AddPublicRoutes(base.ProcessContext, base.PublicFederationAPIMux, base.PublicKeyAPIMux, base.PublicWellKnownAPIMux, &cfg.FederationAPI, nil, nil, keyRing, nil, fsAPI, nil, &cfg.MSCs, nil)
baseURL, cancel := test.ListenAndServe(t, base.PublicFederationAPIMux, true) baseURL, cancel := test.ListenAndServe(t, base.PublicFederationAPIMux, true)
defer cancel() defer cancel()
serverName := gomatrixserverlib.ServerName(strings.TrimPrefix(baseURL, "https://")) serverName := gomatrixserverlib.ServerName(strings.TrimPrefix(baseURL, "https://"))

View file

@ -19,8 +19,8 @@ import (
"encoding/json" "encoding/json"
"strconv" "strconv"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
@ -97,7 +97,7 @@ func (p *SyncAPIProducer) SendToDevice(
"type": eventType, "type": eventType,
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent) }).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
for _, device := range devices { for _, device := range devices {
ote := &api.OutputSendToDeviceEvent{ ote := &types.OutputSendToDeviceEvent{
UserID: userID, UserID: userID,
DeviceID: device, DeviceID: device,
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{

View file

@ -23,9 +23,9 @@ import (
"time" "time"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationAPI "github.com/matrix-org/dendrite/federationapi/api" federationAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/producers" "github.com/matrix-org/dendrite/federationapi/producers"
"github.com/matrix-org/dendrite/federationapi/types"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
keyapi "github.com/matrix-org/dendrite/keyserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
@ -331,7 +331,7 @@ func (t *txnReq) processEDUs(ctx context.Context) {
continue continue
} }
if err := t.producer.SendTyping(ctx, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil { if err := t.producer.SendTyping(ctx, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil {
util.GetLogger(ctx).WithError(err).Error("Failed to send typing event to edu server") util.GetLogger(ctx).WithError(err).Error("Failed to send typing event to JetStream")
} }
case gomatrixserverlib.MDirectToDevice: case gomatrixserverlib.MDirectToDevice:
// https://matrix.org/docs/spec/server_server/r0.1.3#m-direct-to-device-schema // https://matrix.org/docs/spec/server_server/r0.1.3#m-direct-to-device-schema
@ -348,7 +348,7 @@ func (t *txnReq) processEDUs(ctx context.Context) {
"sender": directPayload.Sender, "sender": directPayload.Sender,
"user_id": userID, "user_id": userID,
"device_id": deviceID, "device_id": deviceID,
}).Error("Failed to send send-to-device event to edu server") }).Error("Failed to send send-to-device event to JetStream")
} }
} }
} }
@ -356,7 +356,7 @@ func (t *txnReq) processEDUs(ctx context.Context) {
t.processDeviceListUpdate(ctx, e) t.processDeviceListUpdate(ctx, e)
case gomatrixserverlib.MReceipt: case gomatrixserverlib.MReceipt:
// https://matrix.org/docs/spec/server_server/r0.1.4#receipts // https://matrix.org/docs/spec/server_server/r0.1.4#receipts
payload := map[string]eduserverAPI.FederationReceiptMRead{} payload := map[string]types.FederationReceiptMRead{}
if err := json.Unmarshal(e.Content, &payload); err != nil { if err := json.Unmarshal(e.Content, &payload); err != nil {
util.GetLogger(ctx).WithError(err).Debug("Failed to unmarshal receipt event") util.GetLogger(ctx).WithError(err).Debug("Failed to unmarshal receipt event")
@ -380,12 +380,12 @@ func (t *txnReq) processEDUs(ctx context.Context) {
"user_id": userID, "user_id": userID,
"room_id": roomID, "room_id": roomID,
"events": mread.EventIDs, "events": mread.EventIDs,
}).Error("Failed to send receipt event to edu server") }).Error("Failed to send receipt event to JetStream")
continue continue
} }
} }
} }
case eduserverAPI.MSigningKeyUpdate: case types.MSigningKeyUpdate:
if err := t.processSigningKeyUpdate(ctx, e); err != nil { if err := t.processSigningKeyUpdate(ctx, e); err != nil {
logrus.WithError(err).Errorf("Failed to process signing key update") logrus.WithError(err).Errorf("Failed to process signing key update")
} }
@ -396,7 +396,7 @@ func (t *txnReq) processEDUs(ctx context.Context) {
} }
func (t *txnReq) processSigningKeyUpdate(ctx context.Context, e gomatrixserverlib.EDU) error { func (t *txnReq) processSigningKeyUpdate(ctx context.Context, e gomatrixserverlib.EDU) error {
var updatePayload eduserverAPI.CrossSigningKeyUpdate var updatePayload keyapi.CrossSigningKeyUpdate
if err := json.Unmarshal(e.Content, &updatePayload); err != nil { if err := json.Unmarshal(e.Content, &updatePayload); err != nil {
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{ util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
"user_id": updatePayload.UserID, "user_id": updatePayload.UserID,
@ -423,7 +423,7 @@ func (t *txnReq) processSigningKeyUpdate(ctx context.Context, e gomatrixserverli
return nil return nil
} }
// processReceiptEvent sends receipt events to the edu server // processReceiptEvent sends receipt events to JetStream
func (t *txnReq) processReceiptEvent(ctx context.Context, func (t *txnReq) processReceiptEvent(ctx context.Context,
userID, roomID, receiptType string, userID, roomID, receiptType string,
timestamp gomatrixserverlib.Timestamp, timestamp gomatrixserverlib.Timestamp,

View file

@ -18,6 +18,8 @@ import (
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
const MSigningKeyUpdate = "m.signing_key_update" // TODO: move to gomatrixserverlib
// A JoinedHost is a server that is joined to a matrix room. // A JoinedHost is a server that is joined to a matrix room.
type JoinedHost struct { type JoinedHost struct {
// The MemberEventID of a m.room.member join event. // The MemberEventID of a m.room.member join event.
@ -51,3 +53,16 @@ type InboundPeek struct {
RenewedTimestamp int64 RenewedTimestamp int64
RenewalInterval int64 RenewalInterval int64
} }
type FederationReceiptMRead struct {
User map[string]FederationReceiptData `json:"m.read"`
}
type FederationReceiptData struct {
Data ReceiptTS `json:"data"`
EventIDs []string `json:"event_ids"`
}
type ReceiptTS struct {
TS gomatrixserverlib.Timestamp `json:"ts"`
}

View file

@ -19,8 +19,6 @@ package caching
import ( import (
"sync" "sync"
"time" "time"
"github.com/sirupsen/logrus"
) )
const defaultTypingTimeout = 10 * time.Second const defaultTypingTimeout = 10 * time.Second
@ -102,7 +100,6 @@ func (t *EDUCache) GetTypingUsersIfUpdatedAfter(
func (t *EDUCache) AddTypingUser( func (t *EDUCache) AddTypingUser(
userID, roomID string, expire *time.Time, userID, roomID string, expire *time.Time,
) int64 { ) int64 {
logrus.Debugf("Adding user to room: %s %s", userID, roomID)
expireTime := getExpireTime(expire) expireTime := getExpireTime(expire)
if until := time.Until(expireTime); until > 0 { if until := time.Until(expireTime); until > 0 {
timer := time.AfterFunc(until, func() { timer := time.AfterFunc(until, func() {

View file

@ -97,7 +97,6 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(database) cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(database)
cfg.AppServiceAPI.InternalAPI.Listen = assignAddress() cfg.AppServiceAPI.InternalAPI.Listen = assignAddress()
cfg.EDUServer.InternalAPI.Listen = assignAddress()
cfg.FederationAPI.InternalAPI.Listen = assignAddress() cfg.FederationAPI.InternalAPI.Listen = assignAddress()
cfg.KeyServer.InternalAPI.Listen = assignAddress() cfg.KeyServer.InternalAPI.Listen = assignAddress()
cfg.MediaAPI.InternalAPI.Listen = assignAddress() cfg.MediaAPI.InternalAPI.Listen = assignAddress()
@ -106,7 +105,6 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
cfg.UserAPI.InternalAPI.Listen = assignAddress() cfg.UserAPI.InternalAPI.Listen = assignAddress()
cfg.AppServiceAPI.InternalAPI.Connect = cfg.AppServiceAPI.InternalAPI.Listen cfg.AppServiceAPI.InternalAPI.Connect = cfg.AppServiceAPI.InternalAPI.Listen
cfg.EDUServer.InternalAPI.Connect = cfg.EDUServer.InternalAPI.Listen
cfg.FederationAPI.InternalAPI.Connect = cfg.FederationAPI.InternalAPI.Listen cfg.FederationAPI.InternalAPI.Connect = cfg.FederationAPI.InternalAPI.Listen
cfg.KeyServer.InternalAPI.Connect = cfg.KeyServer.InternalAPI.Listen cfg.KeyServer.InternalAPI.Connect = cfg.KeyServer.InternalAPI.Listen
cfg.MediaAPI.InternalAPI.Connect = cfg.MediaAPI.InternalAPI.Listen cfg.MediaAPI.InternalAPI.Connect = cfg.MediaAPI.InternalAPI.Listen

View file

@ -21,7 +21,6 @@ import (
"strings" "strings"
"time" "time"
eduapi "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/keyserver/types" "github.com/matrix-org/dendrite/keyserver/types"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -66,14 +65,25 @@ const (
// DeviceMessage represents the message produced into Kafka by the key server. // DeviceMessage represents the message produced into Kafka by the key server.
type DeviceMessage struct { type DeviceMessage struct {
Type DeviceMessageType `json:"Type,omitempty"` Type DeviceMessageType `json:"Type,omitempty"`
*DeviceKeys `json:"DeviceKeys,omitempty"` *DeviceKeys `json:"DeviceKeys,omitempty"`
*eduapi.OutputCrossSigningKeyUpdate `json:"CrossSigningKeyUpdate,omitempty"` *OutputCrossSigningKeyUpdate `json:"CrossSigningKeyUpdate,omitempty"`
// A monotonically increasing number which represents device changes for this user. // A monotonically increasing number which represents device changes for this user.
StreamID int64 StreamID int64
DeviceChangeID int64 DeviceChangeID int64
} }
// OutputCrossSigningKeyUpdate is an entry in the signing key update output kafka log
type OutputCrossSigningKeyUpdate struct {
CrossSigningKeyUpdate `json:"signing_keys"`
}
type CrossSigningKeyUpdate struct {
MasterKey *gomatrixserverlib.CrossSigningKey `json:"master_key,omitempty"`
SelfSigningKey *gomatrixserverlib.CrossSigningKey `json:"self_signing_key,omitempty"`
UserID string `json:"user_id"`
}
// DeviceKeysEqual returns true if the device keys updates contain the // DeviceKeysEqual returns true if the device keys updates contain the
// same display name and key JSON. This will return false if either of // same display name and key JSON. This will return false if either of
// the updates is not a device keys update, or if the user ID/device ID // the updates is not a device keys update, or if the user ID/device ID

View file

@ -22,7 +22,6 @@ import (
"fmt" "fmt"
"strings" "strings"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/types" "github.com/matrix-org/dendrite/keyserver/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -246,7 +245,7 @@ func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.P
} }
// Finally, generate a notification that we updated the keys. // Finally, generate a notification that we updated the keys.
update := eduserverAPI.CrossSigningKeyUpdate{ update := api.CrossSigningKeyUpdate{
UserID: req.UserID, UserID: req.UserID,
} }
if mk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeMaster]; ok { if mk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeMaster]; ok {
@ -337,7 +336,7 @@ func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req
for userID := range req.Signatures { for userID := range req.Signatures {
masterKey := queryRes.MasterKeys[userID] masterKey := queryRes.MasterKeys[userID]
selfSigningKey := queryRes.SelfSigningKeys[userID] selfSigningKey := queryRes.SelfSigningKeys[userID]
update := eduserverAPI.CrossSigningKeyUpdate{ update := api.CrossSigningKeyUpdate{
UserID: userID, UserID: userID,
MasterKey: &masterKey, MasterKey: &masterKey,
SelfSigningKey: &selfSigningKey, SelfSigningKey: &selfSigningKey,

View file

@ -18,7 +18,6 @@ import (
"context" "context"
"encoding/json" "encoding/json"
eduapi "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/storage" "github.com/matrix-org/dendrite/keyserver/storage"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
@ -70,10 +69,10 @@ func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceMessage) error {
return nil return nil
} }
func (p *KeyChange) ProduceSigningKeyUpdate(key eduapi.CrossSigningKeyUpdate) error { func (p *KeyChange) ProduceSigningKeyUpdate(key api.CrossSigningKeyUpdate) error {
output := &api.DeviceMessage{ output := &api.DeviceMessage{
Type: api.TypeCrossSigningUpdate, Type: api.TypeCrossSigningUpdate,
OutputCrossSigningKeyUpdate: &eduapi.OutputCrossSigningKeyUpdate{ OutputCrossSigningKeyUpdate: &api.OutputCrossSigningKeyUpdate{
CrossSigningKeyUpdate: key, CrossSigningKeyUpdate: key,
}, },
} }

View file

@ -56,7 +56,6 @@ type Dendrite struct {
Global Global `yaml:"global"` Global Global `yaml:"global"`
AppServiceAPI AppServiceAPI `yaml:"app_service_api"` AppServiceAPI AppServiceAPI `yaml:"app_service_api"`
ClientAPI ClientAPI `yaml:"client_api"` ClientAPI ClientAPI `yaml:"client_api"`
EDUServer EDUServer `yaml:"edu_server"`
FederationAPI FederationAPI `yaml:"federation_api"` FederationAPI FederationAPI `yaml:"federation_api"`
KeyServer KeyServer `yaml:"key_server"` KeyServer KeyServer `yaml:"key_server"`
MediaAPI MediaAPI `yaml:"media_api"` MediaAPI MediaAPI `yaml:"media_api"`
@ -296,7 +295,6 @@ func (c *Dendrite) Defaults(generate bool) {
c.Global.Defaults(generate) c.Global.Defaults(generate)
c.ClientAPI.Defaults(generate) c.ClientAPI.Defaults(generate)
c.EDUServer.Defaults(generate)
c.FederationAPI.Defaults(generate) c.FederationAPI.Defaults(generate)
c.KeyServer.Defaults(generate) c.KeyServer.Defaults(generate)
c.MediaAPI.Defaults(generate) c.MediaAPI.Defaults(generate)
@ -314,8 +312,7 @@ func (c *Dendrite) Verify(configErrs *ConfigErrors, isMonolith bool) {
Verify(configErrs *ConfigErrors, isMonolith bool) Verify(configErrs *ConfigErrors, isMonolith bool)
} }
for _, c := range []verifiable{ for _, c := range []verifiable{
&c.Global, &c.ClientAPI, &c.Global, &c.ClientAPI, &c.FederationAPI,
&c.EDUServer, &c.FederationAPI,
&c.KeyServer, &c.MediaAPI, &c.RoomServer, &c.KeyServer, &c.MediaAPI, &c.RoomServer,
&c.SyncAPI, &c.UserAPI, &c.SyncAPI, &c.UserAPI,
&c.AppServiceAPI, &c.MSCs, &c.AppServiceAPI, &c.MSCs,
@ -327,7 +324,6 @@ func (c *Dendrite) Verify(configErrs *ConfigErrors, isMonolith bool) {
func (c *Dendrite) Wiring() { func (c *Dendrite) Wiring() {
c.Global.JetStream.Matrix = &c.Global c.Global.JetStream.Matrix = &c.Global
c.ClientAPI.Matrix = &c.Global c.ClientAPI.Matrix = &c.Global
c.EDUServer.Matrix = &c.Global
c.FederationAPI.Matrix = &c.Global c.FederationAPI.Matrix = &c.Global
c.KeyServer.Matrix = &c.Global c.KeyServer.Matrix = &c.Global
c.MediaAPI.Matrix = &c.Global c.MediaAPI.Matrix = &c.Global
@ -519,15 +515,6 @@ func (config *Dendrite) UserAPIURL() string {
return string(config.UserAPI.InternalAPI.Connect) return string(config.UserAPI.InternalAPI.Connect)
} }
// EDUServerURL returns an HTTP URL for where the EDU server is listening.
func (config *Dendrite) EDUServerURL() string {
// Hard code the EDU server to talk HTTP for now.
// If we support HTTPS we need to think of a practical way to do certificate validation.
// People setting up servers shouldn't need to get a certificate valid for the public
// internet for an internal API.
return string(config.EDUServer.InternalAPI.Connect)
}
// KeyServerURL returns an HTTP URL for where the key server is listening. // KeyServerURL returns an HTTP URL for where the key server is listening.
func (config *Dendrite) KeyServerURL() string { func (config *Dendrite) KeyServerURL() string {
// Hard code the key server to talk HTTP for now. // Hard code the key server to talk HTTP for now.

View file

@ -1,17 +0,0 @@
package config
type EDUServer struct {
Matrix *Global `yaml:"-"`
InternalAPI InternalAPIOptions `yaml:"internal_api"`
}
func (c *EDUServer) Defaults(generate bool) {
c.InternalAPI.Listen = "http://localhost:7778"
c.InternalAPI.Connect = "http://localhost:7778"
}
func (c *EDUServer) Verify(configErrs *ConfigErrors, isMonolith bool) {
checkURL(configErrs, "edu_server.internal_api.listen", string(c.InternalAPI.Listen))
checkURL(configErrs, "edu_server.internal_api.connect", string(c.InternalAPI.Connect))
}

View file

@ -101,10 +101,6 @@ current_state_server:
max_open_conns: 100 max_open_conns: 100
max_idle_conns: 2 max_idle_conns: 2
conn_max_lifetime: -1 conn_max_lifetime: -1
edu_server:
internal_api:
listen: http://localhost:7778
connect: http://localhost:7778
federation_api: federation_api:
internal_api: internal_api:
listen: http://localhost:7772 listen: http://localhost:7772

View file

@ -74,12 +74,12 @@ func (s *OutputTypingEventConsumer) onMessage(ctx context.Context, msg *nats.Msg
userID := msg.Header.Get(jetstream.UserID) userID := msg.Header.Get(jetstream.UserID)
typing, err := strconv.ParseBool(msg.Header.Get("typing")) typing, err := strconv.ParseBool(msg.Header.Get("typing"))
if err != nil { if err != nil {
log.WithError(err).Errorf("EDU server output log: typing parse failure") log.WithError(err).Errorf("output log: typing parse failure")
return true return true
} }
timeout, err := strconv.Atoi(msg.Header.Get("timeout_ms")) timeout, err := strconv.Atoi(msg.Header.Get("timeout_ms"))
if err != nil { if err != nil {
log.WithError(err).Errorf("EDU server output log: timeout parse failure") log.WithError(err).Errorf("output log: timeout_ms parse failure")
return true return true
} }

View file

@ -21,7 +21,6 @@ import (
"strconv" "strconv"
"github.com/getsentry/sentry-go" "github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
@ -81,7 +80,7 @@ func (s *OutputReceiptEventConsumer) Start() error {
} }
func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
output := api.OutputReceiptEvent{ output := types.OutputReceiptEvent{
UserID: msg.Header.Get(jetstream.UserID), UserID: msg.Header.Get(jetstream.UserID),
RoomID: msg.Header.Get(jetstream.RoomID), RoomID: msg.Header.Get(jetstream.RoomID),
EventID: msg.Header.Get(jetstream.EventID), EventID: msg.Header.Get(jetstream.EventID),
@ -91,7 +90,7 @@ func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Ms
timestamp, err := strconv.Atoi(msg.Header.Get("timestamp")) timestamp, err := strconv.Atoi(msg.Header.Get("timestamp"))
if err != nil { if err != nil {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("EDU server output log: message parse failure") log.WithError(err).Errorf("output log: message parse failure")
sentry.CaptureException(err) sentry.CaptureException(err)
return true return true
} }
@ -126,7 +125,7 @@ func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Ms
return true return true
} }
func (s *OutputReceiptEventConsumer) sendReadUpdate(ctx context.Context, output api.OutputReceiptEvent) error { func (s *OutputReceiptEventConsumer) sendReadUpdate(ctx context.Context, output types.OutputReceiptEvent) error {
if output.Type != "m.read" { if output.Type != "m.read" {
return nil return nil
} }

View file

@ -19,7 +19,6 @@ import (
"encoding/json" "encoding/json"
"github.com/getsentry/sentry-go" "github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
@ -85,10 +84,10 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msg *na
return true return true
} }
var output api.OutputSendToDeviceEvent var output types.OutputSendToDeviceEvent
if err := json.Unmarshal(msg.Data, &output); err != nil { if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("EDU server output log: message parse failure") log.WithError(err).Errorf("output log: message parse failure")
sentry.CaptureException(err) sentry.CaptureException(err)
return true return true
} }
@ -98,7 +97,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msg *na
"user_id": output.UserID, "user_id": output.UserID,
"device_id": output.DeviceID, "device_id": output.DeviceID,
"event_type": output.Type, "event_type": output.Type,
}).Info("sync API received send-to-device event from EDU server") }).Debugf("sync API received send-to-device event from the clientapi/federationsender")
streamPos, err := s.db.StoreNewSendForDeviceMessage( streamPos, err := s.db.StoreNewSendForDeviceMessage(
s.ctx, output.UserID, output.DeviceID, output.SendToDeviceEvent, s.ctx, output.UserID, output.DeviceID, output.SendToDeviceEvent,

View file

@ -17,7 +17,6 @@ package storage
import ( import (
"context" "context"
eduAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
@ -46,7 +45,7 @@ type Database interface {
InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error) InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error)
PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error) PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error)
RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error) RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []types.OutputReceiptEvent, error)
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. // AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error)
@ -136,7 +135,7 @@ type Database interface {
// StoreReceipt stores new receipt events // StoreReceipt stores new receipt events
StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error)
// GetRoomReceipts gets all receipts for a given roomID // GetRoomReceipts gets all receipts for a given roomID
GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error) GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]types.OutputReceiptEvent, error)
// UpsertRoomUnreadNotificationCounts updates the notification statistics about a (user, room) key. // UpsertRoomUnreadNotificationCounts updates the notification statistics about a (user, room) key.
UpsertRoomUnreadNotificationCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (types.StreamPosition, error) UpsertRoomUnreadNotificationCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (types.StreamPosition, error)

View file

@ -21,7 +21,6 @@ import (
"github.com/lib/pq" "github.com/lib/pq"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/storage/tables"
@ -95,16 +94,16 @@ func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, room
return return
} }
func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []api.OutputReceiptEvent, error) { func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []types.OutputReceiptEvent, error) {
var lastPos types.StreamPosition var lastPos types.StreamPosition
rows, err := r.selectRoomReceipts.QueryContext(ctx, pq.Array(roomIDs), streamPos) rows, err := r.selectRoomReceipts.QueryContext(ctx, pq.Array(roomIDs), streamPos)
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("unable to query room receipts: %w", err) return 0, nil, fmt.Errorf("unable to query room receipts: %w", err)
} }
defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomReceiptsAfter: rows.close() failed") defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomReceiptsAfter: rows.close() failed")
var res []api.OutputReceiptEvent var res []types.OutputReceiptEvent
for rows.Next() { for rows.Next() {
r := api.OutputReceiptEvent{} r := types.OutputReceiptEvent{}
var id types.StreamPosition var id types.StreamPosition
err = rows.Scan(&id, &r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp) err = rows.Scan(&id, &r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp)
if err != nil { if err != nil {

View file

@ -20,7 +20,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
eduAPI "github.com/matrix-org/dendrite/eduserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
@ -135,7 +134,7 @@ func (d *Database) PeeksInRange(ctx context.Context, userID, deviceID string, r
return d.Peeks.SelectPeeksInRange(ctx, nil, userID, deviceID, r) return d.Peeks.SelectPeeksInRange(ctx, nil, userID, deviceID, r)
} }
func (d *Database) RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error) { func (d *Database) RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []types.OutputReceiptEvent, error) {
return d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos) return d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos)
} }
@ -972,7 +971,7 @@ func (d *Database) StoreReceipt(ctx context.Context, roomId, receiptType, userId
return return
} }
func (d *Database) GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error) { func (d *Database) GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]types.OutputReceiptEvent, error) {
_, receipts, err := d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos) _, receipts, err := d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos)
return receipts, err return receipts, err
} }

View file

@ -20,7 +20,6 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/storage/tables"
@ -99,7 +98,7 @@ func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, room
} }
// SelectRoomReceiptsAfter select all receipts for a given room after a specific timestamp // SelectRoomReceiptsAfter select all receipts for a given room after a specific timestamp
func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []api.OutputReceiptEvent, error) { func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []types.OutputReceiptEvent, error) {
selectSQL := strings.Replace(selectRoomReceipts, "($2)", sqlutil.QueryVariadicOffset(len(roomIDs), 1), 1) selectSQL := strings.Replace(selectRoomReceipts, "($2)", sqlutil.QueryVariadicOffset(len(roomIDs), 1), 1)
var lastPos types.StreamPosition var lastPos types.StreamPosition
params := make([]interface{}, len(roomIDs)+1) params := make([]interface{}, len(roomIDs)+1)
@ -112,9 +111,9 @@ func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs
return 0, nil, fmt.Errorf("unable to query room receipts: %w", err) return 0, nil, fmt.Errorf("unable to query room receipts: %w", err)
} }
defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomReceiptsAfter: rows.close() failed") defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomReceiptsAfter: rows.close() failed")
var res []api.OutputReceiptEvent var res []types.OutputReceiptEvent
for rows.Next() { for rows.Next() {
r := api.OutputReceiptEvent{} r := types.OutputReceiptEvent{}
var id types.StreamPosition var id types.StreamPosition
err = rows.Scan(&id, &r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp) err = rows.Scan(&id, &r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp)
if err != nil { if err != nil {

View file

@ -18,7 +18,6 @@ import (
"context" "context"
"database/sql" "database/sql"
eduAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
@ -168,7 +167,7 @@ type Filter interface {
type Receipts interface { type Receipts interface {
UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error)
SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []types.OutputReceiptEvent, error)
SelectMaxReceiptID(ctx context.Context, txn *sql.Tx) (id int64, err error) SelectMaxReceiptID(ctx context.Context, txn *sql.Tx) (id int64, err error)
} }

View file

@ -4,7 +4,6 @@ import (
"context" "context"
"encoding/json" "encoding/json"
eduAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -53,7 +52,7 @@ func (p *ReceiptStreamProvider) IncrementalSync(
} }
// Group receipts by room, so we can create one ClientEvent for every room // Group receipts by room, so we can create one ClientEvent for every room
receiptsByRoom := make(map[string][]eduAPI.OutputReceiptEvent) receiptsByRoom := make(map[string][]types.OutputReceiptEvent)
for _, receipt := range receipts { for _, receipt := range receipts {
receiptsByRoom[receipt.RoomID] = append(receiptsByRoom[receipt.RoomID], receipt) receiptsByRoom[receipt.RoomID] = append(receiptsByRoom[receipt.RoomID], receipt)
} }
@ -68,15 +67,15 @@ func (p *ReceiptStreamProvider) IncrementalSync(
Type: gomatrixserverlib.MReceipt, Type: gomatrixserverlib.MReceipt,
RoomID: roomID, RoomID: roomID,
} }
content := make(map[string]eduAPI.ReceiptMRead) content := make(map[string]ReceiptMRead)
for _, receipt := range receipts { for _, receipt := range receipts {
read, ok := content[receipt.EventID] read, ok := content[receipt.EventID]
if !ok { if !ok {
read = eduAPI.ReceiptMRead{ read = ReceiptMRead{
User: make(map[string]eduAPI.ReceiptTS), User: make(map[string]ReceiptTS),
} }
} }
read.User[receipt.UserID] = eduAPI.ReceiptTS{TS: receipt.Timestamp} read.User[receipt.UserID] = ReceiptTS{TS: receipt.Timestamp}
content[receipt.EventID] = read content[receipt.EventID] = read
} }
ev.Content, err = json.Marshal(content) ev.Content, err = json.Marshal(content)
@ -91,3 +90,11 @@ func (p *ReceiptStreamProvider) IncrementalSync(
return lastPos return lastPos
} }
type ReceiptMRead struct {
User map[string]ReceiptTS `json:"m.read"`
}
type ReceiptTS struct {
TS gomatrixserverlib.Timestamp `json:"ts"`
}

View file

@ -487,3 +487,21 @@ type StreamedEvent struct {
Event *gomatrixserverlib.HeaderedEvent `json:"event"` Event *gomatrixserverlib.HeaderedEvent `json:"event"`
StreamPosition StreamPosition `json:"stream_position"` StreamPosition StreamPosition `json:"stream_position"`
} }
// OutputReceiptEvent is an entry in the receipt output kafka log
type OutputReceiptEvent struct {
UserID string `json:"user_id"`
RoomID string `json:"room_id"`
EventID string `json:"event_id"`
Type string `json:"type"`
Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
}
// OutputSendToDeviceEvent is an entry in the send-to-device output kafka log.
// This contains the full event content, along with the user ID and device ID
// to which it is destined.
type OutputSendToDeviceEvent struct {
UserID string `json:"user_id"`
DeviceID string `json:"device_id"`
gomatrixserverlib.SendToDeviceEvent
}