Merge branch 'master' into tmp-file-delete

This commit is contained in:
Prateek Sachan 2020-04-14 18:36:51 +05:30 committed by GitHub
commit 49a3ba51a3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
67 changed files with 887 additions and 379 deletions

View file

@ -72,7 +72,7 @@ Diagram:
| | | |
| | +---+ | |
| | +----------| S | | |
| | | Typing +---+ | |
| | | EDU +---+ | |
| |>=========================================>| Server |>=====================>| |
+------------+ | | +----------+
+---+ | |
@ -156,7 +156,7 @@ choke-point to implement ratelimiting and backoff correctly.
* It may be impossible to implement without folding it into the Room Server
forever coupling the components together.
## Typing Server
## EDU Server
* Reads new updates to typing from the logs written by the FS and CTS.
* Updates the current list of people typing in a room.
@ -179,7 +179,7 @@ choke-point to implement ratelimiting and backoff correctly.
* Reads new events and the current state of the rooms from logs written by the Room Server.
* Reads new receipts positions from the logs written by the Receipts Server.
* Reads changes to presence from the logs written by the Presence Server.
* Reads changes to typing from the logs written by the Typing Server.
* Reads changes to typing from the logs written by the EDU Server.
* Writes when a client starts and stops syncing to the logs.
## Client Search

View file

@ -20,6 +20,7 @@ package api
import (
"context"
"database/sql"
"errors"
"net/http"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
@ -97,15 +98,15 @@ type httpAppServiceQueryAPI struct {
// NewAppServiceQueryAPIHTTP creates a AppServiceQueryAPI implemented by talking
// to a HTTP POST API.
// If httpClient is nil then it uses http.DefaultClient
// If httpClient is nil an error is returned
func NewAppServiceQueryAPIHTTP(
appserviceURL string,
httpClient *http.Client,
) AppServiceQueryAPI {
) (AppServiceQueryAPI, error) {
if httpClient == nil {
httpClient = http.DefaultClient
return nil, errors.New("NewRoomserverAliasAPIHTTP: httpClient is <nil>")
}
return &httpAppServiceQueryAPI{appserviceURL, httpClient}
return &httpAppServiceQueryAPI{appserviceURL, httpClient}, nil
}
// RoomAliasExists implements AppServiceQueryAPI

View file

@ -23,9 +23,9 @@ import (
"github.com/matrix-org/dendrite/clientapi/routing"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/transactions"
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
typingServerAPI "github.com/matrix-org/dendrite/typingserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
@ -41,13 +41,13 @@ func SetupClientAPIComponent(
aliasAPI roomserverAPI.RoomserverAliasAPI,
inputAPI roomserverAPI.RoomserverInputAPI,
queryAPI roomserverAPI.RoomserverQueryAPI,
typingInputAPI typingServerAPI.TypingServerInputAPI,
eduInputAPI eduServerAPI.EDUServerInputAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
transactionsCache *transactions.Cache,
fedSenderAPI federationSenderAPI.FederationSenderQueryAPI,
) {
roomserverProducer := producers.NewRoomserverProducer(inputAPI, queryAPI)
typingProducer := producers.NewTypingServerProducer(typingInputAPI)
eduProducer := producers.NewEDUServerProducer(eduInputAPI)
userUpdateProducer := &producers.UserUpdateProducer{
Producer: base.KafkaProducer,
@ -69,6 +69,6 @@ func SetupClientAPIComponent(
routing.Setup(
base.APIMux, base.Cfg, roomserverProducer, queryAPI, aliasAPI, asAPI,
accountsDB, deviceDB, federation, *keyRing, userUpdateProducer,
syncProducer, typingProducer, transactionsCache, fedSenderAPI,
syncProducer, eduProducer, transactionsCache, fedSenderAPI,
)
}

View file

@ -16,32 +16,32 @@ import (
"context"
"time"
"github.com/matrix-org/dendrite/typingserver/api"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/gomatrixserverlib"
)
// TypingServerProducer produces events for the typing server to consume
type TypingServerProducer struct {
InputAPI api.TypingServerInputAPI
// EDUServerProducer produces events for the EDU server to consume
type EDUServerProducer struct {
InputAPI api.EDUServerInputAPI
}
// NewTypingServerProducer creates a new TypingServerProducer
func NewTypingServerProducer(inputAPI api.TypingServerInputAPI) *TypingServerProducer {
return &TypingServerProducer{
// NewEDUServerProducer creates a new EDUServerProducer
func NewEDUServerProducer(inputAPI api.EDUServerInputAPI) *EDUServerProducer {
return &EDUServerProducer{
InputAPI: inputAPI,
}
}
// Send typing event to typing server
func (p *TypingServerProducer) Send(
// SendTyping sends a typing event to EDU server
func (p *EDUServerProducer) SendTyping(
ctx context.Context, userID, roomID string,
typing bool, timeout int64,
typing bool, timeoutMS int64,
) error {
requestData := api.InputTypingEvent{
UserID: userID,
RoomID: roomID,
Typing: typing,
Timeout: timeout,
TimeoutMS: timeoutMS,
OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()),
}

View file

@ -104,18 +104,14 @@ func (c *RoomserverProducer) SendInputRoomEvents(
// This should only be needed for invite events that occur outside of a known room.
// If we are in the room then the event should be sent using the SendEvents method.
func (c *RoomserverProducer) SendInvite(
ctx context.Context, inviteEvent gomatrixserverlib.Event,
ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent,
inviteRoomState []gomatrixserverlib.InviteV2StrippedState,
) error {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: inviteEvent.RoomID()}
verRes := api.QueryRoomVersionForRoomResponse{}
err := c.QueryAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes)
if err != nil {
return err
}
request := api.InputRoomEventsRequest{
InputInviteEvents: []api.InputInviteEvent{{
Event: inviteEvent.Headered(verRes.RoomVersion),
Event: inviteEvent,
InviteRoomState: inviteRoomState,
RoomVersion: inviteEvent.RoomVersion,
}},
}
var response api.InputRoomEventsResponse

View file

@ -15,6 +15,7 @@
package routing
import (
"encoding/json"
"io/ioutil"
"net/http"
@ -80,12 +81,26 @@ func SaveAccountData(
defer req.Body.Close() // nolint: errcheck
if req.Body == http.NoBody {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.NotJSON("Content not JSON"),
}
}
body, err := ioutil.ReadAll(req.Body)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("ioutil.ReadAll failed")
return jsonerror.InternalServerError()
}
if !json.Valid(body) {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.BadJSON("Bad JSON content"),
}
}
if err := accountDB.SaveAccountData(
req.Context(), localpart, roomID, dataType, string(body),
); err != nil {

View file

@ -29,7 +29,7 @@ func GetCapabilities(
req *http.Request, queryAPI roomserverAPI.RoomserverQueryAPI,
) util.JSONResponse {
roomVersionsQueryReq := roomserverAPI.QueryRoomVersionCapabilitiesRequest{}
var roomVersionsQueryRes roomserverAPI.QueryRoomVersionCapabilitiesResponse
roomVersionsQueryRes := roomserverAPI.QueryRoomVersionCapabilitiesResponse{}
if err := queryAPI.QueryRoomVersionCapabilities(
req.Context(),
&roomVersionsQueryReq,

View file

@ -242,6 +242,9 @@ func (r joinRoomReq) joinRoomUsingServers(
queryRes := roomserverAPI.QueryLatestEventsAndStateResponse{}
event, err := common.BuildEvent(r.req.Context(), &eb, r.cfg, r.evTime, r.queryAPI, &queryRes)
if err == nil {
// If we have successfully built an event at this point then we can
// assert that the room is a local room, as BuildEvent was able to
// add prev_events etc successfully.
if _, err = r.producer.SendEvents(
r.req.Context(),
[]gomatrixserverlib.HeaderedEvent{
@ -260,6 +263,13 @@ func (r joinRoomReq) joinRoomUsingServers(
}{roomID},
}
}
// Otherwise, if we've reached here, then we haven't been able to populate
// prev_events etc for the room, therefore the room is probably federated.
// TODO: This needs to be re-thought, as in the case of an invite, the room
// will exist in the database in roomserver_rooms but won't have any state
// events, therefore this below check fails.
if err != common.ErrRoomNoExists {
util.GetLogger(r.req.Context()).WithError(err).Error("common.BuildEvent failed")
return jsonerror.InternalServerError()
@ -320,14 +330,14 @@ func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib
respMakeJoin, err := r.federation.MakeJoin(r.req.Context(), server, roomID, r.userID, supportedVersions)
if err != nil {
// TODO: Check if the user was not allowed to join the room.
return nil, err
return nil, fmt.Errorf("r.federation.MakeJoin: %w", err)
}
// Set all the fields to be what they should be, this should be a no-op
// but it's possible that the remote server returned us something "odd"
err = r.writeToBuilder(&respMakeJoin.JoinEvent, roomID)
if err != nil {
return nil, err
return nil, fmt.Errorf("r.writeToBuilder: %w", err)
}
if respMakeJoin.RoomVersion == "" {
@ -347,18 +357,16 @@ func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib
r.cfg.Matrix.PrivateKey, respMakeJoin.RoomVersion,
)
if err != nil {
util.GetLogger(r.req.Context()).WithError(err).Error("respMakeJoin.JoinEvent.Build failed")
res := jsonerror.InternalServerError()
return &res, nil
return nil, fmt.Errorf("respMakeJoin.JoinEvent.Build: %w", err)
}
respSendJoin, err := r.federation.SendJoin(r.req.Context(), server, event, respMakeJoin.RoomVersion)
if err != nil {
return nil, err
return nil, fmt.Errorf("r.federation.SendJoin: %w", err)
}
if err = respSendJoin.Check(r.req.Context(), r.keyRing, event); err != nil {
return nil, err
return nil, fmt.Errorf("respSendJoin: %w", err)
}
if err = r.producer.SendEventWithState(
@ -366,9 +374,7 @@ func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib
gomatrixserverlib.RespState(respSendJoin.RespState),
event.Headered(respMakeJoin.RoomVersion),
); err != nil {
util.GetLogger(r.req.Context()).WithError(err).Error("gomatrixserverlib.RespState failed")
res := jsonerror.InternalServerError()
return &res, nil
return nil, fmt.Errorf("r.producer.SendEventWithState: %w", err)
}
return &util.JSONResponse{

View file

@ -58,7 +58,7 @@ func Setup(
keyRing gomatrixserverlib.KeyRing,
userUpdateProducer *producers.UserUpdateProducer,
syncProducer *producers.SyncAPIProducer,
typingProducer *producers.TypingServerProducer,
eduProducer *producers.EDUServerProducer,
transactionsCache *transactions.Cache,
federationSender federationSenderAPI.FederationSenderQueryAPI,
) {
@ -235,7 +235,7 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, typingProducer)
return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduProducer)
}),
).Methods(http.MethodPut, http.MethodOptions)

View file

@ -35,7 +35,7 @@ type typingContentJSON struct {
func SendTyping(
req *http.Request, device *authtypes.Device, roomID string,
userID string, accountDB accounts.Database,
typingProducer *producers.TypingServerProducer,
eduProducer *producers.EDUServerProducer,
) util.JSONResponse {
if device.UserID != userID {
return util.JSONResponse{
@ -69,10 +69,10 @@ func SendTyping(
return *resErr
}
if err = typingProducer.Send(
if err = eduProducer.SendTyping(
req.Context(), userID, roomID, r.Typing, r.Timeout,
); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("typingProducer.Send failed")
util.GetLogger(req.Context()).WithError(err).Error("eduProducer.Send failed")
return jsonerror.InternalServerError()
}

View file

@ -19,8 +19,8 @@ import (
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/keydb"
"github.com/matrix-org/dendrite/common/transactions"
"github.com/matrix-org/dendrite/typingserver"
"github.com/matrix-org/dendrite/typingserver/cache"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
)
func main() {
@ -38,11 +38,11 @@ func main() {
asQuery := base.CreateHTTPAppServiceAPIs()
alias, input, query := base.CreateHTTPRoomserverAPIs()
fedSenderAPI := base.CreateHTTPFederationSenderAPIs()
typingInputAPI := typingserver.SetupTypingServerComponent(base, cache.NewTypingCache())
eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New())
clientapi.SetupClientAPIComponent(
base, deviceDB, accountDB, federation, &keyRing,
alias, input, query, typingInputAPI, asQuery, transactions.New(), fedSenderAPI,
alias, input, query, eduInputAPI, asQuery, transactions.New(), fedSenderAPI,
)
base.SetupAndServeHTTP(string(base.Cfg.Bind.ClientAPI), string(base.Cfg.Listen.ClientAPI))

View file

@ -16,22 +16,22 @@ import (
_ "net/http/pprof"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/typingserver"
"github.com/matrix-org/dendrite/typingserver/cache"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/sirupsen/logrus"
)
func main() {
cfg := basecomponent.ParseFlags()
base := basecomponent.NewBaseDendrite(cfg, "TypingServerAPI")
base := basecomponent.NewBaseDendrite(cfg, "EDUServerAPI")
defer func() {
if err := base.Close(); err != nil {
logrus.WithError(err).Warn("BaseDendrite close failed")
}
}()
typingserver.SetupTypingServerComponent(base, cache.NewTypingCache())
eduserver.SetupEDUServerComponent(base, cache.New())
base.SetupAndServeHTTP(string(base.Cfg.Bind.TypingServer), string(base.Cfg.Listen.TypingServer))
base.SetupAndServeHTTP(string(base.Cfg.Bind.EDUServer), string(base.Cfg.Listen.EDUServer))
}

View file

@ -15,8 +15,11 @@
package main
import (
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/keydb"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationapi"
)
@ -34,10 +37,12 @@ func main() {
alias, input, query := base.CreateHTTPRoomserverAPIs()
asQuery := base.CreateHTTPAppServiceAPIs()
eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New())
eduProducer := producers.NewEDUServerProducer(eduInputAPI)
federationapi.SetupFederationAPIComponent(
base, accountDB, deviceDB, federation, &keyRing,
alias, input, query, asQuery, federationSender,
alias, input, query, asQuery, federationSender, eduProducer,
)
base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationAPI), string(base.Cfg.Listen.FederationAPI))

View file

@ -20,18 +20,19 @@ import (
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/clientapi"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/keydb"
"github.com/matrix-org/dendrite/common/transactions"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/federationsender"
"github.com/matrix-org/dendrite/mediaapi"
"github.com/matrix-org/dendrite/publicroomsapi"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/syncapi"
"github.com/matrix-org/dendrite/typingserver"
"github.com/matrix-org/dendrite/typingserver/cache"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
@ -56,7 +57,7 @@ func main() {
keyRing := keydb.CreateKeyRing(federation.Client, keyDB)
alias, input, query := roomserver.SetupRoomServerComponent(base)
typingInputAPI := typingserver.SetupTypingServerComponent(base, cache.NewTypingCache())
eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New())
asQuery := appservice.SetupAppServiceAPIComponent(
base, accountDB, deviceDB, federation, alias, query, transactions.New(),
)
@ -65,9 +66,10 @@ func main() {
clientapi.SetupClientAPIComponent(
base, deviceDB, accountDB,
federation, &keyRing, alias, input, query,
typingInputAPI, asQuery, transactions.New(), fedSenderAPI,
eduInputAPI, asQuery, transactions.New(), fedSenderAPI,
)
federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI)
eduProducer := producers.NewEDUServerProducer(eduInputAPI)
federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer)
mediaapi.SetupMediaAPIComponent(base, deviceDB)
publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, query, federation, nil)
syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, federation, cfg)

View file

@ -23,18 +23,19 @@ import (
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/clientapi"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/common/transactions"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/federationsender"
"github.com/matrix-org/dendrite/mediaapi"
"github.com/matrix-org/dendrite/publicroomsapi"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/syncapi"
"github.com/matrix-org/dendrite/typingserver"
"github.com/matrix-org/dendrite/typingserver/cache"
"github.com/matrix-org/go-http-js-libp2p/go_http_js_libp2p"
"github.com/matrix-org/gomatrixserverlib"
@ -122,7 +123,7 @@ func main() {
p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node)
alias, input, query := roomserver.SetupRoomServerComponent(base)
typingInputAPI := typingserver.SetupTypingServerComponent(base, cache.NewTypingCache())
eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New())
asQuery := appservice.SetupAppServiceAPIComponent(
base, accountDB, deviceDB, federation, alias, query, transactions.New(),
)
@ -131,9 +132,10 @@ func main() {
clientapi.SetupClientAPIComponent(
base, deviceDB, accountDB,
federation, &keyRing, alias, input, query,
typingInputAPI, asQuery, transactions.New(), fedSenderAPI,
eduInputAPI, asQuery, transactions.New(), fedSenderAPI,
)
federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI)
eduProducer := producers.NewEDUServerProducer(eduInputAPI)
federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer)
mediaapi.SetupMediaAPIComponent(base, deviceDB)
publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, query, federation, p2pPublicRoomProvider)
syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, federation, cfg)

View file

@ -44,6 +44,8 @@ var (
// This needs to be high enough to account for the time it takes to create
// the postgres database tables which can take a while on travis.
timeoutString = defaulting(os.Getenv("TIMEOUT"), "60s")
// Timeout for http client
timeoutHTTPClient = defaulting(os.Getenv("TIMEOUT_HTTP"), "30s")
// The name of maintenance database to connect to in order to create the test database.
postgresDatabase = defaulting(os.Getenv("POSTGRES_DATABASE"), "postgres")
// The name of the test database to create.
@ -68,7 +70,10 @@ func defaulting(value, defaultValue string) string {
return value
}
var timeout time.Duration
var (
timeout time.Duration
timeoutHTTP time.Duration
)
func init() {
var err error
@ -76,6 +81,10 @@ func init() {
if err != nil {
panic(err)
}
timeoutHTTP, err = time.ParseDuration(timeoutHTTPClient)
if err != nil {
panic(err)
}
}
func createDatabase(database string) error {
@ -199,7 +208,10 @@ func writeToRoomServer(input []string, roomserverURL string) error {
return err
}
}
x := api.NewRoomserverInputAPIHTTP(roomserverURL, nil)
x, err := api.NewRoomserverInputAPIHTTP(roomserverURL, &http.Client{Timeout: timeoutHTTP})
if err != nil {
return err
}
return x.InputRoomEvents(context.Background(), &request, &response)
}
@ -258,7 +270,7 @@ func testRoomserver(input []string, wantOutput []string, checkQueries func(api.R
cmd.Args = []string{"dendrite-room-server", "--config", filepath.Join(dir, test.ConfigFile)}
gotOutput, err := runAndReadFromTopic(cmd, cfg.RoomServerURL()+"/metrics", doInput, outputTopic, len(wantOutput), func() {
queryAPI := api.NewRoomserverQueryAPIHTTP("http://"+string(cfg.Listen.RoomServer), nil)
queryAPI, _ := api.NewRoomserverQueryAPIHTTP("http://"+string(cfg.Listen.RoomServer), &http.Client{Timeout: timeoutHTTP})
checkQueries(queryAPI)
})
if err != nil {

View file

@ -19,6 +19,7 @@ import (
"io"
"net/http"
"net/url"
"time"
"golang.org/x/crypto/ed25519"
@ -35,9 +36,9 @@ import (
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/common/config"
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
typingServerAPI "github.com/matrix-org/dendrite/typingserver/api"
"github.com/sirupsen/logrus"
)
@ -52,6 +53,7 @@ type BaseDendrite struct {
// APIMux should be used to register new public matrix api endpoints
APIMux *mux.Router
httpClient *http.Client
Cfg *config.Dendrite
KafkaConsumer sarama.Consumer
KafkaProducer sarama.SyncProducer
@ -77,11 +79,14 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string) *BaseDendrite {
kafkaConsumer, kafkaProducer = setupKafka(cfg)
}
const defaultHTTPTimeout = 30 * time.Second
return &BaseDendrite{
componentName: componentName,
tracerCloser: closer,
Cfg: cfg,
APIMux: mux.NewRouter().UseEncodedPath(),
httpClient: &http.Client{Timeout: defaultHTTPTimeout},
KafkaConsumer: kafkaConsumer,
KafkaProducer: kafkaProducer,
}
@ -95,7 +100,11 @@ func (b *BaseDendrite) Close() error {
// CreateHTTPAppServiceAPIs returns the QueryAPI for hitting the appservice
// component over HTTP.
func (b *BaseDendrite) CreateHTTPAppServiceAPIs() appserviceAPI.AppServiceQueryAPI {
return appserviceAPI.NewAppServiceQueryAPIHTTP(b.Cfg.AppServiceURL(), nil)
a, err := appserviceAPI.NewAppServiceQueryAPIHTTP(b.Cfg.AppServiceURL(), b.httpClient)
if err != nil {
logrus.WithError(err).Panic("CreateHTTPAppServiceAPIs failed")
}
return a
}
// CreateHTTPRoomserverAPIs returns the AliasAPI, InputAPI and QueryAPI for hitting
@ -105,22 +114,40 @@ func (b *BaseDendrite) CreateHTTPRoomserverAPIs() (
roomserverAPI.RoomserverInputAPI,
roomserverAPI.RoomserverQueryAPI,
) {
alias := roomserverAPI.NewRoomserverAliasAPIHTTP(b.Cfg.RoomServerURL(), nil)
input := roomserverAPI.NewRoomserverInputAPIHTTP(b.Cfg.RoomServerURL(), nil)
query := roomserverAPI.NewRoomserverQueryAPIHTTP(b.Cfg.RoomServerURL(), nil)
alias, err := roomserverAPI.NewRoomserverAliasAPIHTTP(b.Cfg.RoomServerURL(), b.httpClient)
if err != nil {
logrus.WithError(err).Panic("NewRoomserverAliasAPIHTTP failed")
}
input, err := roomserverAPI.NewRoomserverInputAPIHTTP(b.Cfg.RoomServerURL(), b.httpClient)
if err != nil {
logrus.WithError(err).Panic("NewRoomserverInputAPIHTTP failed", b.httpClient)
}
query, err := roomserverAPI.NewRoomserverQueryAPIHTTP(b.Cfg.RoomServerURL(), nil)
if err != nil {
logrus.WithError(err).Panic("NewRoomserverQueryAPIHTTP failed", b.httpClient)
}
return alias, input, query
}
// CreateHTTPTypingServerAPIs returns typingInputAPI for hitting the typing
// CreateHTTPEDUServerAPIs returns eduInputAPI for hitting the EDU
// server over HTTP
func (b *BaseDendrite) CreateHTTPTypingServerAPIs() typingServerAPI.TypingServerInputAPI {
return typingServerAPI.NewTypingServerInputAPIHTTP(b.Cfg.TypingServerURL(), nil)
func (b *BaseDendrite) CreateHTTPEDUServerAPIs() eduServerAPI.EDUServerInputAPI {
e, err := eduServerAPI.NewEDUServerInputAPIHTTP(b.Cfg.EDUServerURL(), nil)
if err != nil {
logrus.WithError(err).Panic("NewEDUServerInputAPIHTTP failed", b.httpClient)
}
return e
}
// CreateHTTPFederationSenderAPIs returns FederationSenderQueryAPI for hitting
// the federation sender over HTTP
func (b *BaseDendrite) CreateHTTPFederationSenderAPIs() federationSenderAPI.FederationSenderQueryAPI {
return federationSenderAPI.NewFederationSenderQueryAPIHTTP(b.Cfg.FederationSenderURL(), nil)
f, err := federationSenderAPI.NewFederationSenderQueryAPIHTTP(b.Cfg.FederationSenderURL(), nil)
if err != nil {
logrus.WithError(err).Panic("NewFederationSenderQueryAPIHTTP failed", b.httpClient)
}
return f
}
// CreateDeviceDB creates a new instance of the device database. Should only be

View file

@ -134,7 +134,7 @@ type Dendrite struct {
OutputRoomEvent Topic `yaml:"output_room_event"`
// Topic for sending account data from client API to sync API
OutputClientData Topic `yaml:"output_client_data"`
// Topic for typingserver/api.OutputTypingEvent events.
// Topic for eduserver/api.OutputTypingEvent events.
OutputTypingEvent Topic `yaml:"output_typing_event"`
// Topic for user updates (profile, presence)
UserUpdates Topic `yaml:"user_updates"`
@ -206,7 +206,7 @@ type Dendrite struct {
RoomServer Address `yaml:"room_server"`
FederationSender Address `yaml:"federation_sender"`
PublicRoomsAPI Address `yaml:"public_rooms_api"`
TypingServer Address `yaml:"typing_server"`
EDUServer Address `yaml:"edu_server"`
} `yaml:"bind"`
// The addresses for talking to other microservices.
@ -219,7 +219,7 @@ type Dendrite struct {
RoomServer Address `yaml:"room_server"`
FederationSender Address `yaml:"federation_sender"`
PublicRoomsAPI Address `yaml:"public_rooms_api"`
TypingServer Address `yaml:"typing_server"`
EDUServer Address `yaml:"edu_server"`
} `yaml:"listen"`
// The config for tracing the dendrite servers.
@ -571,7 +571,7 @@ func (config *Dendrite) checkListen(configErrs *configErrors) {
checkNotEmpty(configErrs, "listen.federation_api", string(config.Listen.FederationAPI))
checkNotEmpty(configErrs, "listen.sync_api", string(config.Listen.SyncAPI))
checkNotEmpty(configErrs, "listen.room_server", string(config.Listen.RoomServer))
checkNotEmpty(configErrs, "listen.typing_server", string(config.Listen.TypingServer))
checkNotEmpty(configErrs, "listen.edu_server", string(config.Listen.EDUServer))
}
// checkLogging verifies the parameters logging.* are valid.
@ -669,7 +669,7 @@ func fingerprintPEM(data []byte) *gomatrixserverlib.TLSFingerprint {
// AppServiceURL returns a HTTP URL for where the appservice component is listening.
func (config *Dendrite) AppServiceURL() string {
// Hard code the roomserver to talk HTTP for now.
// Hard code the appservice server to talk HTTP for now.
// If we support HTTPS we need to think of a practical way to do certificate validation.
// People setting up servers shouldn't need to get a certificate valid for the public
// internet for an internal API.
@ -685,18 +685,18 @@ func (config *Dendrite) RoomServerURL() string {
return "http://" + string(config.Listen.RoomServer)
}
// TypingServerURL returns an HTTP URL for where the typing server is listening.
func (config *Dendrite) TypingServerURL() string {
// Hard code the typing server to talk HTTP for now.
// EDUServerURL returns an HTTP URL for where the EDU server is listening.
func (config *Dendrite) EDUServerURL() string {
// Hard code the EDU server to talk HTTP for now.
// If we support HTTPS we need to think of a practical way to do certificate validation.
// People setting up servers shouldn't need to get a certificate valid for the public
// internet for an internal API.
return "http://" + string(config.Listen.TypingServer)
return "http://" + string(config.Listen.EDUServer)
}
// FederationSenderURL returns an HTTP URL for where the federation sender is listening.
func (config *Dendrite) FederationSenderURL() string {
// Hard code the typing server to talk HTTP for now.
// Hard code the federation sender server to talk HTTP for now.
// If we support HTTPS we need to think of a practical way to do certificate validation.
// People setting up servers shouldn't need to get a certificate valid for the public
// internet for an internal API.

View file

@ -62,7 +62,7 @@ listen:
sync_api: "localhost:7773"
media_api: "localhost:7774"
appservice_api: "localhost:7777"
typing_server: "localhost:7778"
edu_server: "localhost:7778"
logging:
- type: "file"
level: "info"

View file

@ -17,6 +17,7 @@ package common
import (
"context"
"errors"
"fmt"
"time"
"github.com/matrix-org/dendrite/common/config"
@ -46,6 +47,7 @@ func BuildEvent(
err := AddPrevEventsToEvent(ctx, builder, queryAPI, queryRes)
if err != nil {
// This can pass through a ErrRoomNoExists to the caller
return nil, err
}
@ -68,7 +70,7 @@ func AddPrevEventsToEvent(
) error {
eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder)
if err != nil {
return err
return fmt.Errorf("gomatrixserverlib.StateNeededForEventBuilder: %w", err)
}
// Ask the roomserver for information about this room
@ -77,7 +79,7 @@ func AddPrevEventsToEvent(
StateToFetch: eventsNeeded.Tuples(),
}
if err = queryAPI.QueryLatestEventsAndState(ctx, &queryReq, queryRes); err != nil {
return err
return fmt.Errorf("queryAPI.QueryLatestEventsAndState: %w", err)
}
if !queryRes.RoomExists {
@ -86,7 +88,7 @@ func AddPrevEventsToEvent(
eventFormat, err := queryRes.RoomVersion.EventFormat()
if err != nil {
return err
return fmt.Errorf("queryRes.RoomVersion.EventFormat: %w", err)
}
builder.Depth = queryRes.Depth
@ -96,26 +98,26 @@ func AddPrevEventsToEvent(
for i := range queryRes.StateEvents {
err = authEvents.AddEvent(&queryRes.StateEvents[i].Event)
if err != nil {
return err
return fmt.Errorf("authEvents.AddEvent: %w", err)
}
}
refs, err := eventsNeeded.AuthEventReferences(&authEvents)
if err != nil {
return err
return fmt.Errorf("eventsNeeded.AuthEventReferences: %w", err)
}
truncAuth, truncPrev := truncateAuthAndPrevEvents(refs, queryRes.LatestEvents)
switch eventFormat {
case gomatrixserverlib.EventFormatV1:
builder.AuthEvents = refs
builder.PrevEvents = queryRes.LatestEvents
builder.AuthEvents = truncAuth
builder.PrevEvents = truncPrev
case gomatrixserverlib.EventFormatV2:
v2AuthRefs := []string{}
v2PrevRefs := []string{}
for _, ref := range refs {
v2AuthRefs, v2PrevRefs := []string{}, []string{}
for _, ref := range truncAuth {
v2AuthRefs = append(v2AuthRefs, ref.EventID)
}
for _, ref := range queryRes.LatestEvents {
for _, ref := range truncPrev {
v2PrevRefs = append(v2PrevRefs, ref.EventID)
}
builder.AuthEvents = v2AuthRefs
@ -124,3 +126,21 @@ func AddPrevEventsToEvent(
return nil
}
// truncateAuthAndPrevEvents limits the number of events we add into
// an event as prev_events or auth_events.
// NOTSPEC: The limits here feel a bit arbitrary but they are currently
// here because of https://github.com/matrix-org/matrix-doc/issues/2307
// and because Synapse will just drop events that don't comply.
func truncateAuthAndPrevEvents(auth, prev []gomatrixserverlib.EventReference) (
truncAuth, truncPrev []gomatrixserverlib.EventReference,
) {
truncAuth, truncPrev = auth, prev
if len(truncAuth) > 10 {
truncAuth = truncAuth[:10]
}
if len(truncPrev) > 20 {
truncPrev = truncPrev[:20]
}
return
}

View file

@ -106,7 +106,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
cfg.Listen.RoomServer = assignAddress()
cfg.Listen.SyncAPI = assignAddress()
cfg.Listen.PublicRoomsAPI = assignAddress()
cfg.Listen.TypingServer = assignAddress()
cfg.Listen.EDUServer = assignAddress()
// Bind to the same address as the listen address
// All microservices are run on the same host in testing
@ -117,7 +117,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
cfg.Bind.RoomServer = cfg.Listen.RoomServer
cfg.Bind.SyncAPI = cfg.Listen.SyncAPI
cfg.Bind.PublicRoomsAPI = cfg.Listen.PublicRoomsAPI
cfg.Bind.TypingServer = cfg.Listen.TypingServer
cfg.Bind.EDUServer = cfg.Listen.EDUServer
return &cfg, port, nil
}

View file

@ -85,7 +85,7 @@ kafka:
topics:
output_room_event: roomserverOutput
output_client_data: clientapiOutput
output_typing_event: typingServerOutput
output_typing_event: eduServerOutput
user_updates: userUpdates
# The postgres connection configs for connecting to the databases e.g a postgres:// URI
@ -114,7 +114,7 @@ listen:
public_rooms_api: "localhost:7775"
federation_sender: "localhost:7776"
appservice_api: "localhost:7777"
typing_server: "localhost:7778"
edu_server: "localhost:7778"
# The configuration for tracing the dendrite components.
tracing:

View file

@ -58,7 +58,7 @@ docker-compose up kafka zookeeper postgres
and the following dendrite components
```
docker-compose up client_api media_api sync_api room_server public_rooms_api typing_server
docker-compose up client_api media_api sync_api room_server public_rooms_api edu_server
docker-compose up client_api_proxy
```

View file

@ -85,7 +85,7 @@ kafka:
topics:
output_room_event: roomserverOutput
output_client_data: clientapiOutput
output_typing_event: typingServerOutput
output_typing_event: eduServerOutput
user_updates: userUpdates
@ -114,7 +114,7 @@ listen:
media_api: "media_api:7774"
public_rooms_api: "public_rooms_api:7775"
federation_sender: "federation_sender:7776"
typing_server: "typing_server:7777"
edu_server: "typing_server:7777"
# The configuration for tracing the dendrite components.
tracing:

View file

@ -103,10 +103,10 @@ services:
networks:
- internal
typing_server:
container_name: dendrite_typing_server
hostname: typing_server
entrypoint: ["bash", "./docker/services/typing-server.sh"]
edu_server:
container_name: dendrite_edu_server
hostname: edu_server
entrypoint: ["bash", "./docker/services/edu-server.sh"]
build: ./
volumes:
- ..:/build

View file

@ -0,0 +1,5 @@
#!/bin/bash
bash ./docker/build.sh
./bin/dendrite-edu-server --config=dendrite.yaml

View file

@ -1,5 +0,0 @@
#!/bin/bash
bash ./docker/build.sh
./bin/dendrite-typing-server --config=dendrite.yaml

View file

@ -10,9 +10,9 @@ passes.
## Finding out which tests to add
We recommend you run the tests locally by manually setting up SyTest or using a
SyTest docker image. After running the tests, a script will print the tests you
need to add to `sytest-whitelist`.
We recommend you run the tests locally by using the SyTest docker image or
manually setting up SyTest. After running the tests, a script will print the
tests you need to add to `sytest-whitelist`.
You should proceed after you see no build problems for dendrite after running:
@ -20,9 +20,32 @@ You should proceed after you see no build problems for dendrite after running:
./build.sh
```
### Using the SyTest Docker image
Use the following commands to pull the latest SyTest image and run the tests:
```sh
docker pull matrixdotorg/sytest-dendrite
docker run --rm -v /path/to/dendrite/:/src/ -v /path/to/log/output/:/logs/ matrixdotorg/sytest-dendrite
```
`/path/to/dendrite/` should be replaced with the actual path to your dendrite
source code. The test results TAP file and homeserver logging output will go to
`/path/to/log/output`. The output of the command should tell you if you need to
add any tests to `sytest-whitelist`.
When debugging, the following Docker `run` options may also be useful:
* `-v /path/to/sytest/:/sytest/`: Use your local SyTest repository at
`/path/to/sytest` instead of pulling from GitHub. This is useful when you want
to speed things up or make modifications to SyTest.
* `--entrypoint bash`: Prevent the container from automatically starting the
tests. When used, you need to manually run `/bootstrap.sh dendrite` inside
the container to start them.
### Manually Setting up SyTest
Make sure you have Perl v5+ installed, and get SyTest with:
If you don't want to use the Docker image, you can also run SyTest by hand. Make
sure you have Perl 5 or above, and get SyTest with:
(Note that this guide assumes your SyTest checkout is next to your
`dendrite` checkout.)
@ -37,12 +60,23 @@ Set up the database:
```sh
sudo -u postgres psql -c "CREATE USER dendrite PASSWORD 'itsasecret'"
sudo -u postgres psql -c "CREATE DATABASE sytest_template OWNER dendrite"
for i in dendrite0 dendrite1 sytest_template; do sudo -u postgres psql -c "CREATE DATABASE $i OWNER dendrite;"; done
mkdir -p "server-0"
cat > "server-0/database.yaml" << EOF
args:
user: dendrite
database: dendrite
password: itsasecret
database: dendrite0
host: 127.0.0.1
sslmode: disable
type: pg
EOF
mkdir -p "server-1"
cat > "server-1/database.yaml" << EOF
args:
user: dendrite
password: itsasecret
database: dendrite1
host: 127.0.0.1
sslmode: disable
type: pg
@ -52,29 +86,20 @@ EOF
Run the tests:
```sh
./run-tests.pl -I Dendrite::Monolith -d ../dendrite/bin -W ../dendrite/sytest-whitelist -O tap --all | tee results.tap
POSTGRES=1 ./run-tests.pl -I Dendrite::Monolith -d ../dendrite/bin -W ../dendrite/sytest-whitelist -O tap --all | tee results.tap
```
where `tee` lets you see the results while they're being piped to the file.
where `tee` lets you see the results while they're being piped to the file, and
`POSTGRES=1` enables testing with PostgeSQL. If the `POSTGRES` environment
variable is not set or is set to 0, SyTest will fall back to SQLite 3. For more
flags and options, see https://github.com/matrix-org/sytest#running.
Once the tests are complete, run the helper script to see if you need to add
any newly passing test names to `sytest-whitelist` in the project's root directory:
any newly passing test names to `sytest-whitelist` in the project's root
directory:
```sh
../dendrite/show-expected-fail-tests.sh results.tap ../dendrite/sytest-whitelist ../dendrite/sytest-blacklist
```
If the script prints nothing/exits with 0, then you're good to go.
### Using a SyTest Docker image
Ensure you have the latest image for SyTest, then run the tests:
```sh
docker pull matrixdotorg/sytest-dendrite
docker run --rm -v /path/to/dendrite/:/src/ matrixdotorg/sytest-dendrite
```
where `/path/to/dendrite/` should be replaced with the actual path to your
dendrite source code. The output should tell you if you need to add any tests to
`sytest-whitelist`.

View file

@ -15,6 +15,7 @@ package api
import (
"context"
"errors"
"net/http"
commonHTTP "github.com/matrix-org/dendrite/common/http"
@ -30,13 +31,13 @@ type InputTypingEvent struct {
RoomID string `json:"room_id"`
// Typing is true if the user is typing, false if they have stopped.
Typing bool `json:"typing"`
// Timeout is the interval for which the user should be marked as typing.
Timeout int64 `json:"timeout"`
// Timeout is the interval in milliseconds for which the user should be marked as typing.
TimeoutMS int64 `json:"timeout"`
// OriginServerTS when the server received the update.
OriginServerTS gomatrixserverlib.Timestamp `json:"origin_server_ts"`
}
// InputTypingEventRequest is a request to TypingServerInputAPI
// InputTypingEventRequest is a request to EDUServerInputAPI
type InputTypingEventRequest struct {
InputTypingEvent InputTypingEvent `json:"input_typing_event"`
}
@ -44,8 +45,8 @@ type InputTypingEventRequest struct {
// InputTypingEventResponse is a response to InputTypingEvents
type InputTypingEventResponse struct{}
// TypingServerInputAPI is used to write events to the typing server.
type TypingServerInputAPI interface {
// EDUServerInputAPI is used to write events to the typing server.
type EDUServerInputAPI interface {
InputTypingEvent(
ctx context.Context,
request *InputTypingEventRequest,
@ -53,24 +54,24 @@ type TypingServerInputAPI interface {
) error
}
// TypingServerInputTypingEventPath is the HTTP path for the InputTypingEvent API.
const TypingServerInputTypingEventPath = "/api/typingserver/input"
// EDUServerInputTypingEventPath is the HTTP path for the InputTypingEvent API.
const EDUServerInputTypingEventPath = "/api/eduserver/input"
// NewTypingServerInputAPIHTTP creates a TypingServerInputAPI implemented by talking to a HTTP POST API.
func NewTypingServerInputAPIHTTP(typingServerURL string, httpClient *http.Client) TypingServerInputAPI {
// NewEDUServerInputAPIHTTP creates a EDUServerInputAPI implemented by talking to a HTTP POST API.
func NewEDUServerInputAPIHTTP(eduServerURL string, httpClient *http.Client) (EDUServerInputAPI, error) {
if httpClient == nil {
httpClient = http.DefaultClient
return nil, errors.New("NewTypingServerInputAPIHTTP: httpClient is <nil>")
}
return &httpTypingServerInputAPI{typingServerURL, httpClient}
return &httpEDUServerInputAPI{eduServerURL, httpClient}, nil
}
type httpTypingServerInputAPI struct {
typingServerURL string
type httpEDUServerInputAPI struct {
eduServerURL string
httpClient *http.Client
}
// InputRoomEvents implements TypingServerInputAPI
func (h *httpTypingServerInputAPI) InputTypingEvent(
// InputRoomEvents implements EDUServerInputAPI
func (h *httpEDUServerInputAPI) InputTypingEvent(
ctx context.Context,
request *InputTypingEventRequest,
response *InputTypingEventResponse,
@ -78,6 +79,6 @@ func (h *httpTypingServerInputAPI) InputTypingEvent(
span, ctx := opentracing.StartSpanFromContext(ctx, "InputTypingEvent")
defer span.Finish()
apiURL := h.typingServerURL + TypingServerInputTypingEventPath
apiURL := h.eduServerURL + EDUServerInputTypingEventPath
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}

View file

@ -32,8 +32,8 @@ type roomData struct {
userSet userSet
}
// TypingCache maintains a list of users typing in each room.
type TypingCache struct {
// EDUCache maintains a list of users typing in each room.
type EDUCache struct {
sync.RWMutex
latestSyncPosition int64
data map[string]*roomData
@ -42,26 +42,26 @@ type TypingCache struct {
// Create a roomData with its sync position set to the latest sync position.
// Must only be called after locking the cache.
func (t *TypingCache) newRoomData() *roomData {
func (t *EDUCache) newRoomData() *roomData {
return &roomData{
syncPosition: t.latestSyncPosition,
userSet: make(userSet),
}
}
// NewTypingCache returns a new TypingCache initialised for use.
func NewTypingCache() *TypingCache {
return &TypingCache{data: make(map[string]*roomData)}
// New returns a new EDUCache initialised for use.
func New() *EDUCache {
return &EDUCache{data: make(map[string]*roomData)}
}
// SetTimeoutCallback sets a callback function that is called right after
// a user is removed from the typing user list due to timeout.
func (t *TypingCache) SetTimeoutCallback(fn TimeoutCallbackFn) {
func (t *EDUCache) SetTimeoutCallback(fn TimeoutCallbackFn) {
t.timeoutCallback = fn
}
// GetTypingUsers returns the list of users typing in a room.
func (t *TypingCache) GetTypingUsers(roomID string) []string {
func (t *EDUCache) GetTypingUsers(roomID string) []string {
users, _ := t.GetTypingUsersIfUpdatedAfter(roomID, 0)
// 0 should work above because the first position used will be 1.
return users
@ -70,7 +70,7 @@ func (t *TypingCache) GetTypingUsers(roomID string) []string {
// GetTypingUsersIfUpdatedAfter returns all users typing in this room with
// updated == true if the typing sync position of the room is after the given
// position. Otherwise, returns an empty slice with updated == false.
func (t *TypingCache) GetTypingUsersIfUpdatedAfter(
func (t *EDUCache) GetTypingUsersIfUpdatedAfter(
roomID string, position int64,
) (users []string, updated bool) {
t.RLock()
@ -93,7 +93,7 @@ func (t *TypingCache) GetTypingUsersIfUpdatedAfter(
// expire is the time when the user typing should time out.
// if expire is nil, defaultTypingTimeout is assumed.
// Returns the latest sync position for typing after update.
func (t *TypingCache) AddTypingUser(
func (t *EDUCache) AddTypingUser(
userID, roomID string, expire *time.Time,
) int64 {
expireTime := getExpireTime(expire)
@ -111,7 +111,7 @@ func (t *TypingCache) AddTypingUser(
// addUser with mutex lock & replace the previous timer.
// Returns the latest typing sync position after update.
func (t *TypingCache) addUser(
func (t *EDUCache) addUser(
userID, roomID string, expiryTimer *time.Timer,
) int64 {
t.Lock()
@ -143,7 +143,7 @@ func (t *TypingCache) addUser(
// RemoveUser with mutex lock & stop the timer.
// Returns the latest sync position for typing after update.
func (t *TypingCache) RemoveUser(userID, roomID string) int64 {
func (t *EDUCache) RemoveUser(userID, roomID string) int64 {
t.Lock()
defer t.Unlock()
@ -166,7 +166,7 @@ func (t *TypingCache) RemoveUser(userID, roomID string) int64 {
return t.latestSyncPosition
}
func (t *TypingCache) GetLatestSyncPosition() int64 {
func (t *EDUCache) GetLatestSyncPosition() int64 {
t.Lock()
defer t.Unlock()
return t.latestSyncPosition

View file

@ -19,10 +19,10 @@ import (
"github.com/matrix-org/dendrite/common/test"
)
func TestTypingCache(t *testing.T) {
tCache := NewTypingCache()
func TestEDUCache(t *testing.T) {
tCache := New()
if tCache == nil {
t.Fatal("NewTypingCache failed")
t.Fatal("New failed")
}
t.Run("AddTypingUser", func(t *testing.T) {
@ -38,7 +38,7 @@ func TestTypingCache(t *testing.T) {
})
}
func testAddTypingUser(t *testing.T, tCache *TypingCache) { // nolint: unparam
func testAddTypingUser(t *testing.T, tCache *EDUCache) { // nolint: unparam
present := time.Now()
tests := []struct {
userID string
@ -58,7 +58,7 @@ func testAddTypingUser(t *testing.T, tCache *TypingCache) { // nolint: unparam
}
}
func testGetTypingUsers(t *testing.T, tCache *TypingCache) {
func testGetTypingUsers(t *testing.T, tCache *EDUCache) {
tests := []struct {
roomID string
wantUsers []string
@ -75,7 +75,7 @@ func testGetTypingUsers(t *testing.T, tCache *TypingCache) {
}
}
func testRemoveUser(t *testing.T, tCache *TypingCache) {
func testRemoveUser(t *testing.T, tCache *EDUCache) {
tests := []struct {
roomID string
userIDs []string

View file

@ -10,27 +10,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package typingserver
package eduserver
import (
"net/http"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/typingserver/api"
"github.com/matrix-org/dendrite/typingserver/cache"
"github.com/matrix-org/dendrite/typingserver/input"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/eduserver/input"
)
// SetupTypingServerComponent sets up and registers HTTP handlers for the
// TypingServer component. Returns instances of the various roomserver APIs,
// SetupEDUServerComponent sets up and registers HTTP handlers for the
// EDUServer component. Returns instances of the various roomserver APIs,
// allowing other components running in the same process to hit the query the
// APIs directly instead of having to use HTTP.
func SetupTypingServerComponent(
func SetupEDUServerComponent(
base *basecomponent.BaseDendrite,
typingCache *cache.TypingCache,
) api.TypingServerInputAPI {
inputAPI := &input.TypingServerInputAPI{
Cache: typingCache,
eduCache *cache.EDUCache,
) api.EDUServerInputAPI {
inputAPI := &input.EDUServerInputAPI{
Cache: eduCache,
Producer: base.KafkaProducer,
OutputTypingEventTopic: string(base.Cfg.Kafka.Topics.OutputTypingEvent),
}

View file

@ -19,25 +19,25 @@ import (
"time"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/typingserver/api"
"github.com/matrix-org/dendrite/typingserver/cache"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"gopkg.in/Shopify/sarama.v1"
)
// TypingServerInputAPI implements api.TypingServerInputAPI
type TypingServerInputAPI struct {
// EDUServerInputAPI implements api.EDUServerInputAPI
type EDUServerInputAPI struct {
// Cache to store the current typing members in each room.
Cache *cache.TypingCache
Cache *cache.EDUCache
// The kafka topic to output new typing events to.
OutputTypingEventTopic string
// kafka producer
Producer sarama.SyncProducer
}
// InputTypingEvent implements api.TypingServerInputAPI
func (t *TypingServerInputAPI) InputTypingEvent(
// InputTypingEvent implements api.EDUServerInputAPI
func (t *EDUServerInputAPI) InputTypingEvent(
ctx context.Context,
request *api.InputTypingEventRequest,
response *api.InputTypingEventResponse,
@ -46,7 +46,7 @@ func (t *TypingServerInputAPI) InputTypingEvent(
if ite.Typing {
// user is typing, update our current state of users typing.
expireTime := ite.OriginServerTS.Time().Add(
time.Duration(ite.Timeout) * time.Millisecond,
time.Duration(ite.TimeoutMS) * time.Millisecond,
)
t.Cache.AddTypingUser(ite.UserID, ite.RoomID, &expireTime)
} else {
@ -56,7 +56,7 @@ func (t *TypingServerInputAPI) InputTypingEvent(
return t.sendEvent(ite)
}
func (t *TypingServerInputAPI) sendEvent(ite *api.InputTypingEvent) error {
func (t *EDUServerInputAPI) sendEvent(ite *api.InputTypingEvent) error {
ev := &api.TypingEvent{
Type: gomatrixserverlib.MTyping,
RoomID: ite.RoomID,
@ -69,7 +69,7 @@ func (t *TypingServerInputAPI) sendEvent(ite *api.InputTypingEvent) error {
if ev.Typing {
expireTime := ite.OriginServerTS.Time().Add(
time.Duration(ite.Timeout) * time.Millisecond,
time.Duration(ite.TimeoutMS) * time.Millisecond,
)
ote.ExpireTime = &expireTime
}
@ -89,9 +89,9 @@ func (t *TypingServerInputAPI) sendEvent(ite *api.InputTypingEvent) error {
return err
}
// SetupHTTP adds the TypingServerInputAPI handlers to the http.ServeMux.
func (t *TypingServerInputAPI) SetupHTTP(servMux *http.ServeMux) {
servMux.Handle(api.TypingServerInputTypingEventPath,
// SetupHTTP adds the EDUServerInputAPI handlers to the http.ServeMux.
func (t *EDUServerInputAPI) SetupHTTP(servMux *http.ServeMux) {
servMux.Handle(api.EDUServerInputTypingEventPath,
common.MakeInternalAPI("inputTypingEvents", func(req *http.Request) util.JSONResponse {
var request api.InputTypingEventRequest
var response api.InputTypingEventResponse

View file

@ -41,12 +41,13 @@ func SetupFederationAPIComponent(
queryAPI roomserverAPI.RoomserverQueryAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
federationSenderAPI federationSenderAPI.FederationSenderQueryAPI,
eduProducer *producers.EDUServerProducer,
) {
roomserverProducer := producers.NewRoomserverProducer(inputAPI, queryAPI)
routing.Setup(
base.APIMux, base.Cfg, queryAPI, aliasAPI, asAPI,
roomserverProducer, federationSenderAPI, *keyRing,
roomserverProducer, eduProducer, federationSenderAPI, *keyRing,
federation, accountsDB, deviceDB,
)
}

View file

@ -15,18 +15,17 @@
package routing
import (
"context"
"encoding/json"
"net/http"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
// Invite implements /_matrix/federation/v1/invite/{roomID}/{eventID}
// Invite implements /_matrix/federation/v2/invite/{roomID}/{eventID}
func Invite(
httpReq *http.Request,
request *gomatrixserverlib.FederationRequest,
@ -36,24 +35,14 @@ func Invite(
producer *producers.RoomserverProducer,
keys gomatrixserverlib.KeyRing,
) util.JSONResponse {
// Look up the room version for the room.
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
verRes := api.QueryRoomVersionForRoomResponse{}
if err := producer.QueryAPI.QueryRoomVersionForRoom(context.Background(), &verReq, &verRes); err != nil {
inviteReq := gomatrixserverlib.InviteV2Request{}
if err := json.Unmarshal(request.Content(), &inviteReq); err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.UnsupportedRoomVersion(err.Error()),
}
}
// Decode the event JSON from the request.
event, err := gomatrixserverlib.NewEventFromUntrustedJSON(request.Content(), verRes.RoomVersion)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.NotJSON("The request body could not be decoded into valid JSON. " + err.Error()),
JSON: jsonerror.NotJSON("The request body could not be decoded into an invite request. " + err.Error()),
}
}
event := inviteReq.Event()
// Check that the room ID is correct.
if event.RoomID() != roomID {
@ -71,14 +60,6 @@ func Invite(
}
}
// Check that the event is from the server sending the request.
if event.Origin() != request.Origin() {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("The invite must be sent by the server it originated on"),
}
}
// Check that the event is signed by the server sending the request.
redacted := event.Redact()
verifyRequests := []gomatrixserverlib.VerifyJSONRequest{{
@ -104,7 +85,11 @@ func Invite(
)
// Add the invite event to the roomserver.
if err = producer.SendInvite(httpReq.Context(), signedEvent); err != nil {
if err = producer.SendInvite(
httpReq.Context(),
signedEvent.Headered(inviteReq.RoomVersion()),
inviteReq.InviteRoomState(),
); err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendInvite failed")
return jsonerror.InternalServerError()
}

View file

@ -15,6 +15,7 @@
package routing
import (
"fmt"
"net/http"
"time"
@ -34,6 +35,7 @@ func MakeJoin(
cfg *config.Dendrite,
query api.RoomserverQueryAPI,
roomID, userID string,
remoteVersions []gomatrixserverlib.RoomVersion,
) util.JSONResponse {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
verRes := api.QueryRoomVersionForRoomResponse{}
@ -44,6 +46,27 @@ func MakeJoin(
}
}
// Check that the room that the remote side is trying to join is actually
// one of the room versions that they listed in their supported ?ver= in
// the make_join URL.
// https://matrix.org/docs/spec/server_server/r0.1.3#get-matrix-federation-v1-make-join-roomid-userid
remoteSupportsVersion := false
for _, v := range remoteVersions {
if v == verRes.RoomVersion {
remoteSupportsVersion = true
break
}
}
// If it isn't, stop trying to join the room.
if !remoteSupportsVersion {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.UnsupportedRoomVersion(
fmt.Sprintf("Joining server does not support room version %s", verRes.RoomVersion),
),
}
}
_, domain, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
return util.JSONResponse{
@ -140,7 +163,12 @@ func SendJoin(
if event.RoomID() != roomID {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.BadJSON("The room ID in the request path must match the room ID in the join event JSON"),
JSON: jsonerror.BadJSON(
fmt.Sprintf(
"The room ID in the request path (%q) must match the room ID in the join event JSON (%q)",
roomID, event.RoomID(),
),
),
}
}
@ -148,7 +176,12 @@ func SendJoin(
if event.EventID() != eventID {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.BadJSON("The event ID in the request path must match the event ID in the join event JSON"),
JSON: jsonerror.BadJSON(
fmt.Sprintf(
"The event ID in the request path (%q) must match the event ID in the join event JSON (%q)",
eventID, event.EventID(),
),
),
}
}
@ -186,6 +219,7 @@ func SendJoin(
PrevEventIDs: event.PrevEventIDs(),
AuthEventIDs: event.AuthEventIDs(),
RoomID: roomID,
ResolveState: true,
}, &stateAndAuthChainResponse)
if err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("query.QueryStateAndAuthChain failed")

View file

@ -48,6 +48,7 @@ func Setup(
aliasAPI roomserverAPI.RoomserverAliasAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
producer *producers.RoomserverProducer,
eduProducer *producers.EDUServerProducer,
federationSenderAPI federationSenderAPI.FederationSenderQueryAPI,
keys gomatrixserverlib.KeyRing,
federation *gomatrixserverlib.FederationClient,
@ -79,12 +80,12 @@ func Setup(
}
return Send(
httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]),
cfg, query, producer, keys, federation,
cfg, query, producer, eduProducer, keys, federation,
)
},
)).Methods(http.MethodPut, http.MethodOptions)
v1fedmux.Handle("/invite/{roomID}/{eventID}", common.MakeFedAPI(
v2fedmux.Handle("/invite/{roomID}/{eventID}", common.MakeFedAPI(
"federation_invite", cfg.Matrix.ServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
vars, err := common.URLDecodeMapValues(mux.Vars(httpReq))
@ -197,7 +198,7 @@ func Setup(
},
)).Methods(http.MethodGet)
v1fedmux.Handle("/make_join/{roomID}/{userID}", common.MakeFedAPI(
v1fedmux.Handle("/make_join/{roomID}/{eventID}", common.MakeFedAPI(
"federation_make_join", cfg.Matrix.ServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
vars, err := common.URLDecodeMapValues(mux.Vars(httpReq))
@ -205,14 +206,28 @@ func Setup(
return util.ErrorResponse(err)
}
roomID := vars["roomID"]
userID := vars["userID"]
eventID := vars["eventID"]
queryVars := httpReq.URL.Query()
remoteVersions := []gomatrixserverlib.RoomVersion{}
if vers, ok := queryVars["ver"]; ok {
// The remote side supplied a ?=ver so use that to build up the list
// of supported room versions
for _, v := range vers {
remoteVersions = append(remoteVersions, gomatrixserverlib.RoomVersion(v))
}
} else {
// The remote side didn't supply a ?ver= so just assume that they only
// support room version 1, as per the spec
// https://matrix.org/docs/spec/server_server/r0.1.3#get-matrix-federation-v1-make-join-roomid-userid
remoteVersions = append(remoteVersions, gomatrixserverlib.RoomVersionV1)
}
return MakeJoin(
httpReq, request, cfg, query, roomID, userID,
httpReq, request, cfg, query, roomID, eventID, remoteVersions,
)
},
)).Methods(http.MethodGet)
v2fedmux.Handle("/send_join/{roomID}/{userID}", common.MakeFedAPI(
v2fedmux.Handle("/send_join/{roomID}/{eventID}", common.MakeFedAPI(
"federation_send_join", cfg.Matrix.ServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
vars, err := common.URLDecodeMapValues(mux.Vars(httpReq))
@ -220,14 +235,14 @@ func Setup(
return util.ErrorResponse(err)
}
roomID := vars["roomID"]
userID := vars["userID"]
eventID := vars["eventID"]
return SendJoin(
httpReq, request, cfg, query, producer, keys, roomID, userID,
httpReq, request, cfg, query, producer, keys, roomID, eventID,
)
},
)).Methods(http.MethodPut)
v1fedmux.Handle("/make_leave/{roomID}/{userID}", common.MakeFedAPI(
v1fedmux.Handle("/make_leave/{roomID}/{eventID}", common.MakeFedAPI(
"federation_make_leave", cfg.Matrix.ServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
vars, err := common.URLDecodeMapValues(mux.Vars(httpReq))
@ -235,14 +250,14 @@ func Setup(
return util.ErrorResponse(err)
}
roomID := vars["roomID"]
userID := vars["userID"]
eventID := vars["eventID"]
return MakeLeave(
httpReq, request, cfg, query, roomID, userID,
httpReq, request, cfg, query, roomID, eventID,
)
},
)).Methods(http.MethodGet)
v2fedmux.Handle("/send_leave/{roomID}/{userID}", common.MakeFedAPI(
v2fedmux.Handle("/send_leave/{roomID}/{eventID}", common.MakeFedAPI(
"federation_send_leave", cfg.Matrix.ServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
vars, err := common.URLDecodeMapValues(mux.Vars(httpReq))
@ -250,9 +265,9 @@ func Setup(
return util.ErrorResponse(err)
}
roomID := vars["roomID"]
userID := vars["userID"]
eventID := vars["eventID"]
return SendLeave(
httpReq, request, cfg, producer, keys, roomID, userID,
httpReq, request, cfg, producer, keys, roomID, eventID,
)
},
)).Methods(http.MethodPut)

View file

@ -36,6 +36,7 @@ func Send(
cfg *config.Dendrite,
query api.RoomserverQueryAPI,
producer *producers.RoomserverProducer,
eduProducer *producers.EDUServerProducer,
keys gomatrixserverlib.KeyRing,
federation *gomatrixserverlib.FederationClient,
) util.JSONResponse {
@ -43,13 +44,14 @@ func Send(
context: httpReq.Context(),
query: query,
producer: producer,
eduProducer: eduProducer,
keys: keys,
federation: federation,
}
var txnEvents struct {
PDUs []json.RawMessage `json:"pdus"`
EDUs []json.RawMessage `json:"edus"`
EDUs []gomatrixserverlib.EDU `json:"edus"`
}
if err := json.Unmarshal(request.Content(), &txnEvents); err != nil {
@ -59,7 +61,9 @@ func Send(
}
}
// TODO: Really we should have a function to convert FederationRequest to txnReq
t.PDUs = txnEvents.PDUs
t.EDUs = txnEvents.EDUs
t.Origin = request.Origin()
t.TransactionID = txnID
t.Destination = cfg.Matrix.ServerName
@ -83,6 +87,7 @@ type txnReq struct {
context context.Context
query api.RoomserverQueryAPI
producer *producers.RoomserverProducer
eduProducer *producers.EDUServerProducer
keys gomatrixserverlib.KeyRing
federation *gomatrixserverlib.FederationClient
}
@ -152,7 +157,7 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
}
}
// TODO: Process the EDUs.
t.processEDUs(t.EDUs)
util.GetLogger(t.context).Infof("Processed %d PDUs from transaction %q", len(results), t.TransactionID)
return &gomatrixserverlib.RespSend{PDUs: results}, nil
}
@ -163,6 +168,29 @@ type unknownRoomError struct {
func (e unknownRoomError) Error() string { return fmt.Sprintf("unknown room %q", e.roomID) }
func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) {
for _, e := range edus {
switch e.Type {
case gomatrixserverlib.MTyping:
// https://matrix.org/docs/spec/server_server/latest#typing-notifications
var typingPayload struct {
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
Typing bool `json:"typing"`
}
if err := json.Unmarshal(e.Content, &typingPayload); err != nil {
util.GetLogger(t.context).WithError(err).Error("Failed to unmarshal typing event")
continue
}
if err := t.eduProducer.SendTyping(t.context, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil {
util.GetLogger(t.context).WithError(err).Error("Failed to send typing event to edu server")
}
default:
util.GetLogger(t.context).WithField("type", e.Type).Warn("unhandled edu")
}
}
}
func (t *txnReq) processEvent(e gomatrixserverlib.Event) error {
prevEventIDs := e.PrevEventIDs()

View file

@ -107,7 +107,6 @@ func getState(
return nil, &util.JSONResponse{Code: http.StatusNotFound, JSON: nil}
}
prevEventIDs := getIDsFromEventRef(event.PrevEvents())
authEventIDs := getIDsFromEventRef(event.AuthEvents())
var response api.QueryStateAndAuthChainResponse
@ -115,7 +114,7 @@ func getState(
ctx,
&api.QueryStateAndAuthChainRequest{
RoomID: roomID,
PrevEventIDs: prevEventIDs,
PrevEventIDs: []string{eventID},
AuthEventIDs: authEventIDs,
},
&response,

View file

@ -2,6 +2,7 @@ package api
import (
"context"
"errors"
"net/http"
commonHTTP "github.com/matrix-org/dendrite/common/http"
@ -58,12 +59,12 @@ const FederationSenderQueryJoinedHostsInRoomPath = "/api/federationsender/queryJ
const FederationSenderQueryJoinedHostServerNamesInRoomPath = "/api/federationsender/queryJoinedHostServerNamesInRoom"
// NewFederationSenderQueryAPIHTTP creates a FederationSenderQueryAPI implemented by talking to a HTTP POST API.
// If httpClient is nil then it uses the http.DefaultClient
func NewFederationSenderQueryAPIHTTP(federationSenderURL string, httpClient *http.Client) FederationSenderQueryAPI {
// If httpClient is nil an error is returned
func NewFederationSenderQueryAPIHTTP(federationSenderURL string, httpClient *http.Client) (FederationSenderQueryAPI, error) {
if httpClient == nil {
httpClient = http.DefaultClient
return nil, errors.New("NewFederationSenderQueryAPIHTTP: httpClient is <nil>")
}
return &httpFederationSenderQueryAPI{federationSenderURL, httpClient}
return &httpFederationSenderQueryAPI{federationSenderURL, httpClient}, nil
}
type httpFederationSenderQueryAPI struct {

View file

@ -18,15 +18,15 @@ import (
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/federationsender/queue"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/typingserver/api"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
"gopkg.in/Shopify/sarama.v1"
)
// OutputTypingEventConsumer consumes events that originate in typing server.
// OutputTypingEventConsumer consumes events that originate in EDU server.
type OutputTypingEventConsumer struct {
consumer *common.ContinualConsumer
db storage.Database
@ -34,7 +34,7 @@ type OutputTypingEventConsumer struct {
ServerName gomatrixserverlib.ServerName
}
// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. Call Start() to begin consuming from typing servers.
// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. Call Start() to begin consuming from EDU servers.
func NewOutputTypingEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
@ -57,19 +57,30 @@ func NewOutputTypingEventConsumer(
return c
}
// Start consuming from typing servers
// Start consuming from EDU servers
func (t *OutputTypingEventConsumer) Start() error {
return t.consumer.Start()
}
// onMessage is called for OutputTypingEvent received from the typing servers.
// onMessage is called for OutputTypingEvent received from the EDU servers.
// Parses the msg, creates a matrix federation EDU and sends it to joined hosts.
func (t *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Extract the typing event from msg.
var ote api.OutputTypingEvent
if err := json.Unmarshal(msg.Value, &ote); err != nil {
// Skip this msg but continue processing messages.
log.WithError(err).Errorf("typingserver output log: message parse failed")
log.WithError(err).Errorf("eduserver output log: message parse failed")
return nil
}
// only send typing events which originated from us
_, typingServerName, err := gomatrixserverlib.SplitID('@', ote.Event.UserID)
if err != nil {
log.WithError(err).WithField("user_id", ote.Event.UserID).Error("Failed to extract domain from typing sender")
return nil
}
if typingServerName != t.ServerName {
log.WithField("other_server", typingServerName).Info("Suppressing typing notif: originated elsewhere")
return nil
}

View file

@ -32,6 +32,7 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
cfg *config.Dendrite
roomServerConsumer *common.ContinualConsumer
db storage.Database
queues *queue.OutgoingQueues
@ -52,6 +53,7 @@ func NewOutputRoomEventConsumer(
PartitionStore: store,
}
s := &OutputRoomEventConsumer{
cfg: cfg,
roomServerConsumer: &consumer,
db: store,
queues: queues,
@ -79,27 +81,46 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil
}
if output.Type != api.OutputTypeNewRoomEvent {
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
return nil
}
switch output.Type {
case api.OutputTypeNewRoomEvent:
ev := &output.NewRoomEvent.Event
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_id": ev.RoomID(),
"send_as_server": output.NewRoomEvent.SendAsServer,
}).Info("received event from roomserver")
}).Info("received room event from roomserver")
if err := s.processMessage(*output.NewRoomEvent); err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event": string(ev.JSON()),
log.ErrorKey: err,
"add": output.NewRoomEvent.AddsStateEventIDs,
"del": output.NewRoomEvent.RemovesStateEventIDs,
}).Panicf("roomserver output log: write event failure")
log.ErrorKey: err,
}).Panicf("roomserver output log: write room event failure")
return nil
}
case api.OutputTypeNewInviteEvent:
ev := &output.NewInviteEvent.Event
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_id": ev.RoomID(),
"state_key": ev.StateKey(),
}).Info("received invite event from roomserver")
if err := s.processInvite(*output.NewInviteEvent); err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event": string(ev.JSON()),
log.ErrorKey: err,
}).Panicf("roomserver output log: write invite event failure")
return nil
}
default:
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
return nil
}
@ -159,6 +180,69 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
)
}
// processInvite handles an invite event for sending over federation.
func (s *OutputRoomEventConsumer) processInvite(oie api.OutputNewInviteEvent) error {
// Don't try to reflect and resend invites that didn't originate from us.
if s.cfg.Matrix.ServerName != oie.Event.Origin() {
return nil
}
// When sending a v2 invite, the inviting server should try and include
// a "stripped down" version of the room state. This is pretty much just
// enough information for the remote side to show something useful to the
// user, like the room name, aliases etc.
strippedState := []gomatrixserverlib.InviteV2StrippedState{}
stateWanted := []string{
gomatrixserverlib.MRoomName, gomatrixserverlib.MRoomCanonicalAlias,
gomatrixserverlib.MRoomAliases, gomatrixserverlib.MRoomJoinRules,
}
// For each of the state keys that we want to try and send, ask the
// roomserver if we have a state event for that room that matches the
// state key.
for _, wanted := range stateWanted {
queryReq := api.QueryLatestEventsAndStateRequest{
RoomID: oie.Event.RoomID(),
StateToFetch: []gomatrixserverlib.StateKeyTuple{
gomatrixserverlib.StateKeyTuple{
EventType: wanted,
StateKey: "",
},
},
}
// If this fails then we just move onto the next event - we don't
// actually know at this point whether the room even has that type
// of state.
queryRes := api.QueryLatestEventsAndStateResponse{}
if err := s.query.QueryLatestEventsAndState(context.TODO(), &queryReq, &queryRes); err != nil {
log.WithFields(log.Fields{
"room_id": queryReq.RoomID,
"event_type": wanted,
}).WithError(err).Info("couldn't find state to strip")
continue
}
// Append the stripped down copy of the state to our list.
for _, headeredEvent := range queryRes.StateEvents {
event := headeredEvent.Unwrap()
strippedState = append(strippedState, gomatrixserverlib.NewInviteV2StrippedState(&event))
log.WithFields(log.Fields{
"room_id": queryReq.RoomID,
"event_type": event.Type(),
}).Info("adding stripped state")
}
}
// Build the invite request with the info we've got.
inviteReq, err := gomatrixserverlib.NewInviteV2Request(&oie.Event, strippedState)
if err != nil {
return fmt.Errorf("gomatrixserverlib.NewInviteV2Request: %w", err)
}
// Send the event.
return s.queues.SendInvite(&inviteReq)
}
// joinedHostsAtEvent works out a list of matrix servers that were joined to
// the room at the event.
// It is important to use the state at the event for sending messages because:

View file

@ -24,6 +24,7 @@ import (
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
log "github.com/sirupsen/logrus"
"go.uber.org/atomic"
)
// destinationQueue is a queue of events for a single destination.
@ -34,14 +35,15 @@ type destinationQueue struct {
client *gomatrixserverlib.FederationClient
origin gomatrixserverlib.ServerName
destination gomatrixserverlib.ServerName
// The running mutex protects running, sentCounter, lastTransactionIDs and
running atomic.Bool
// The running mutex protects sentCounter, lastTransactionIDs and
// pendingEvents, pendingEDUs.
runningMutex sync.Mutex
running bool
sentCounter int
lastTransactionIDs []gomatrixserverlib.TransactionID
pendingEvents []*gomatrixserverlib.HeaderedEvent
pendingEDUs []*gomatrixserverlib.EDU
pendingInvites []*gomatrixserverlib.InviteV2Request
}
// Send event adds the event to the pending queue for the destination.
@ -51,29 +53,43 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.HeaderedEvent) {
oq.runningMutex.Lock()
defer oq.runningMutex.Unlock()
oq.pendingEvents = append(oq.pendingEvents, ev)
if !oq.running {
oq.running = true
if !oq.running.Load() {
go oq.backgroundSend()
}
}
// sendEDU adds the EDU event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to
// start sending event to that destination.
// start sending events to that destination.
func (oq *destinationQueue) sendEDU(e *gomatrixserverlib.EDU) {
oq.runningMutex.Lock()
defer oq.runningMutex.Unlock()
oq.pendingEDUs = append(oq.pendingEDUs, e)
if !oq.running {
oq.running = true
if !oq.running.Load() {
go oq.backgroundSend()
}
}
// sendInvite adds the invite event to the pending queue for the
// destination. If the queue is empty then it starts a background
// goroutine to start sending events to that destination.
func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.InviteV2Request) {
oq.runningMutex.Lock()
defer oq.runningMutex.Unlock()
oq.pendingInvites = append(oq.pendingInvites, ev)
if !oq.running.Load() {
go oq.backgroundSend()
}
}
// backgroundSend is the worker goroutine for sending events.
func (oq *destinationQueue) backgroundSend() {
oq.running.Store(true)
defer oq.running.Store(false)
for {
t := oq.next()
if t == nil {
transaction, invites := oq.nextTransaction(), oq.nextInvites()
if !transaction && !invites {
// If the queue is empty then stop processing for this destination.
// TODO: Remove this destination from the queue map.
return
@ -81,29 +97,18 @@ func (oq *destinationQueue) backgroundSend() {
// TODO: handle retries.
// TODO: blacklist uncooperative servers.
util.GetLogger(context.TODO()).Infof("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))
_, err := oq.client.SendTransaction(context.TODO(), *t)
if err != nil {
log.WithFields(log.Fields{
"destination": oq.destination,
log.ErrorKey: err,
}).Info("problem sending transaction")
}
}
}
// next creates a new transaction from the pending event queue
// and flushes the queue.
// Returns nil if the queue was empty.
func (oq *destinationQueue) next() *gomatrixserverlib.Transaction {
// nextTransaction creates a new transaction from the pending event
// queue and sends it. Returns true if a transaction was sent or
// false otherwise.
func (oq *destinationQueue) nextTransaction() bool {
oq.runningMutex.Lock()
defer oq.runningMutex.Unlock()
if len(oq.pendingEvents) == 0 && len(oq.pendingEDUs) == 0 {
oq.running = false
return nil
return false
}
t := gomatrixserverlib.Transaction{
@ -136,5 +141,46 @@ func (oq *destinationQueue) next() *gomatrixserverlib.Transaction {
oq.pendingEDUs = nil
oq.sentCounter += len(t.EDUs)
return &t
util.GetLogger(context.TODO()).Infof("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))
_, err := oq.client.SendTransaction(context.TODO(), t)
if err != nil {
log.WithFields(log.Fields{
"destination": oq.destination,
log.ErrorKey: err,
}).Info("problem sending transaction")
}
return true
}
// nextInvite takes pending invite events from the queue and sends
// them. Returns true if a transaction was sent or false otherwise.
func (oq *destinationQueue) nextInvites() bool {
oq.runningMutex.Lock()
defer oq.runningMutex.Unlock()
if len(oq.pendingInvites) == 0 {
return false
}
for _, inviteReq := range oq.pendingInvites {
ev := inviteReq.Event()
if _, err := oq.client.SendInviteV2(
context.TODO(),
oq.destination,
*inviteReq,
); err != nil {
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"state_key": ev.StateKey(),
"destination": oq.destination,
}).WithError(err).Error("failed to send invite")
}
}
oq.pendingInvites = nil
return true
}

View file

@ -80,6 +80,49 @@ func (oqs *OutgoingQueues) SendEvent(
return nil
}
// SendEvent sends an event to the destinations
func (oqs *OutgoingQueues) SendInvite(
inviteReq *gomatrixserverlib.InviteV2Request,
) error {
ev := inviteReq.Event()
stateKey := ev.StateKey()
if stateKey == nil {
log.WithFields(log.Fields{
"event_id": ev.EventID(),
}).Info("invite had no state key, dropping")
return nil
}
_, destination, err := gomatrixserverlib.SplitID('@', *stateKey)
if err != nil {
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"state_key": stateKey,
}).Info("failed to split destination from state key")
return nil
}
log.WithFields(log.Fields{
"event_id": ev.EventID(),
}).Info("Sending invite")
oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock()
oq := oqs.queues[destination]
if oq == nil {
oq = &destinationQueue{
origin: oqs.origin,
destination: destination,
client: oqs.client,
}
oqs.queues[destination] = oq
}
oq.sendInvite(inviteReq)
return nil
}
// SendEDU sends an EDU event to the destinations
func (oqs *OutgoingQueues) SendEDU(
e *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName,

8
go.mod
View file

@ -7,9 +7,9 @@ require (
github.com/libp2p/go-libp2p-core v0.5.0
github.com/matrix-org/dugong v0.0.0-20171220115018-ea0a4690a0d5
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f
github.com/matrix-org/go-sqlite3-js v0.0.0-20200304164012-aa524245b658
github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26
github.com/matrix-org/gomatrixserverlib v0.0.0-20200327155501-33fb4c7049dc
github.com/matrix-org/gomatrixserverlib v0.0.0-20200409140603-8b9a51fe9b89
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7
github.com/mattn/go-sqlite3 v2.0.2+incompatible
@ -23,9 +23,9 @@ require (
github.com/tidwall/pretty v1.0.1 // indirect
github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible
go.uber.org/atomic v1.6.0 // indirect
go.uber.org/atomic v1.6.0
golang.org/x/crypto v0.0.0-20200115085410-6d4e4cb37c7d
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b // indirect
golang.org/x/tools v0.0.0-20200402223321-bcf690261a44 // indirect
gopkg.in/Shopify/sarama.v1 v1.20.1
gopkg.in/h2non/bimg.v1 v1.0.18
gopkg.in/yaml.v2 v2.2.5

17
go.sum
View file

@ -122,14 +122,16 @@ github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f h1:5T
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f/go.mod h1:qK3LUW7RCLhFM7gC3pabj3EXT9A1DsCK33MHstUhhbk=
github.com/matrix-org/go-sqlite3-js v0.0.0-20200304164012-aa524245b658 h1:UlhTKClOgWnSB25Rv+BS/Vc1mRinjNUErfyGEVOBP04=
github.com/matrix-org/go-sqlite3-js v0.0.0-20200304164012-aa524245b658/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo=
github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10 h1:SnhC7/o87ueVwEWI3mUYtrs+s8VnYq3KZtpWsFQOLFE=
github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo=
github.com/matrix-org/gomatrix v0.0.0-20190130130140-385f072fe9af h1:piaIBNQGIHnni27xRB7VKkEwoWCgAmeuYf8pxAyG0bI=
github.com/matrix-org/gomatrix v0.0.0-20190130130140-385f072fe9af/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bhrnp3Ky1qgx/fzCtCALOoGYylh2tpS9K4=
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5 h1:kmRjpmFOenVpOaV/DRlo9p6z/IbOKlUC+hhKsAAh8Qg=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200327155501-33fb4c7049dc h1:qrRu4/AlulnldLiyGpYYm+ELIkrP51XCRlA3txWpN30=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200327155501-33fb4c7049dc/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200409140603-8b9a51fe9b89 h1:YAlUJK/Ty2ZrP/DL41CiR0Cp3pteshnyIS420KVs220=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200409140603-8b9a51fe9b89/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A=
github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5 h1:W7l5CP4V7wPyPb4tYE11dbmeAOwtFQBTW0rf4OonOS8=
@ -251,6 +253,7 @@ github.com/uber/jaeger-lib v1.5.0/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/Aaua
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.uber.org/atomic v1.3.0 h1:vs7fgriifsPbGdK3bNuMWapNn3qnZhCRXc19NRdq010=
go.uber.org/atomic v1.3.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
@ -274,6 +277,8 @@ golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvx
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.2.0 h1:KU7oHjnv3XNWfa5COkzUifxZmxp1TyI7ImMXqFxLwvQ=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -286,8 +291,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b h1:XfVGCX+0T4WOStkaOsJRllbsiImhB2jgVBGc9L0lPGc=
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b h1:0mm1VjtFUOIlE1SbDlwjYaDxZVDP2S5ou6y0gSgXHu8=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@ -322,7 +327,11 @@ golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c h1:IGkKhmfzcztjm6gYkykvu/NiS8kaqbCWAEWWAyf8J5U=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200402223321-bcf690261a44 h1:bMm0eoDiGkM5VfIyKjxDvoflW5GLp7+VCo+60n8F+TE=
golang.org/x/tools v0.0.0-20200402223321-bcf690261a44/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=

View file

@ -16,6 +16,7 @@ package api
import (
"context"
"errors"
"net/http"
commonHTTP "github.com/matrix-org/dendrite/common/http"
@ -139,12 +140,12 @@ const RoomserverGetCreatorIDForAliasPath = "/api/roomserver/GetCreatorIDForAlias
const RoomserverRemoveRoomAliasPath = "/api/roomserver/removeRoomAlias"
// NewRoomserverAliasAPIHTTP creates a RoomserverAliasAPI implemented by talking to a HTTP POST API.
// If httpClient is nil then it uses the http.DefaultClient
func NewRoomserverAliasAPIHTTP(roomserverURL string, httpClient *http.Client) RoomserverAliasAPI {
// If httpClient is nil an error is returned
func NewRoomserverAliasAPIHTTP(roomserverURL string, httpClient *http.Client) (RoomserverAliasAPI, error) {
if httpClient == nil {
httpClient = http.DefaultClient
return nil, errors.New("NewRoomserverAliasAPIHTTP: httpClient is <nil>")
}
return &httpRoomserverAliasAPI{roomserverURL, httpClient}
return &httpRoomserverAliasAPI{roomserverURL, httpClient}, nil
}
type httpRoomserverAliasAPI struct {

View file

@ -17,6 +17,7 @@ package api
import (
"context"
"errors"
"net/http"
commonHTTP "github.com/matrix-org/dendrite/common/http"
@ -85,7 +86,9 @@ type TransactionID struct {
// the usual context a matrix room event would have. We usually do not have
// access to the events needed to check the event auth rules for the invite.
type InputInviteEvent struct {
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
Event gomatrixserverlib.HeaderedEvent `json:"event"`
InviteRoomState []gomatrixserverlib.InviteV2StrippedState `json:"invite_room_state"`
}
// InputRoomEventsRequest is a request to InputRoomEvents
@ -112,12 +115,12 @@ type RoomserverInputAPI interface {
const RoomserverInputRoomEventsPath = "/api/roomserver/inputRoomEvents"
// NewRoomserverInputAPIHTTP creates a RoomserverInputAPI implemented by talking to a HTTP POST API.
// If httpClient is nil then it uses the http.DefaultClient
func NewRoomserverInputAPIHTTP(roomserverURL string, httpClient *http.Client) RoomserverInputAPI {
// If httpClient is nil an error is returned
func NewRoomserverInputAPIHTTP(roomserverURL string, httpClient *http.Client) (RoomserverInputAPI, error) {
if httpClient == nil {
httpClient = http.DefaultClient
return nil, errors.New("NewRoomserverInputAPIHTTP: httpClient is <nil>")
}
return &httpRoomserverInputAPI{roomserverURL, httpClient}
return &httpRoomserverInputAPI{roomserverURL, httpClient}, nil
}
type httpRoomserverInputAPI struct {

View file

@ -116,6 +116,8 @@ type OutputNewRoomEvent struct {
// Invite events can be received outside of an existing room so have to be
// tracked separately from the room events themselves.
type OutputNewInviteEvent struct {
// The room version of the invited room.
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
// The "m.room.member" invite event.
Event gomatrixserverlib.HeaderedEvent `json:"event"`
}

View file

@ -18,6 +18,7 @@ package api
import (
"context"
"errors"
"net/http"
commonHTTP "github.com/matrix-org/dendrite/common/http"
@ -202,6 +203,9 @@ type QueryStateAndAuthChainRequest struct {
PrevEventIDs []string `json:"prev_event_ids"`
// The list of auth events for the event. Used to calculate the auth chain
AuthEventIDs []string `json:"auth_event_ids"`
// Should state resolution be ran on the result events?
// TODO: check call sites and remove if we always want to do state res
ResolveState bool `json:"resolve_state"`
}
// QueryStateAndAuthChainResponse is a response to QueryStateAndAuthChain
@ -406,12 +410,12 @@ const RoomserverQueryRoomVersionCapabilitiesPath = "/api/roomserver/queryRoomVer
const RoomserverQueryRoomVersionForRoomPath = "/api/roomserver/queryRoomVersionForRoom"
// NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API.
// If httpClient is nil then it uses the http.DefaultClient
func NewRoomserverQueryAPIHTTP(roomserverURL string, httpClient *http.Client) RoomserverQueryAPI {
// If httpClient is nil an error is returned
func NewRoomserverQueryAPIHTTP(roomserverURL string, httpClient *http.Client) (RoomserverQueryAPI, error) {
if httpClient == nil {
httpClient = http.DefaultClient
return nil, errors.New("NewRoomserverQueryAPIHTTP: httpClient is <nil>")
}
return &httpRoomserverQueryAPI{roomserverURL, httpClient}
return &httpRoomserverQueryAPI{roomserverURL, httpClient}, nil
}
type httpRoomserverQueryAPI struct {

View file

@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/state/database"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
// A RoomEventDatabase has the storage APIs needed to store a room event.
@ -64,6 +65,7 @@ type RoomEventDatabase interface {
// Build a membership updater for the target user in a room.
MembershipUpdater(
ctx context.Context, roomID, targerUserID string,
roomVersion gomatrixserverlib.RoomVersion,
) (types.MembershipUpdater, error)
// Look up event ID by transaction's info.
// This is used to determine if the room event is processed/processing already.
@ -193,7 +195,14 @@ func processInviteEvent(
roomID := input.Event.RoomID()
targetUserID := *input.Event.StateKey()
updater, err := db.MembershipUpdater(ctx, roomID, targetUserID)
log.WithFields(log.Fields{
"event_id": input.Event.EventID(),
"room_id": roomID,
"room_version": input.RoomVersion,
"target_user_id": targetUserID,
}).Info("processing invite event")
updater, err := db.MembershipUpdater(ctx, roomID, targetUserID, input.RoomVersion)
if err != nil {
return err
}
@ -237,7 +246,12 @@ func processInviteEvent(
}
event := input.Event.Unwrap()
outputUpdates, err := updateToInviteMembership(updater, &event, nil)
if err = event.SetUnsignedField("invite_room_state", input.InviteRoomState); err != nil {
return err
}
outputUpdates, err := updateToInviteMembership(updater, &event, nil, input.Event.RoomVersion)
if err != nil {
return err
}

View file

@ -112,7 +112,7 @@ func updateMembership(
switch newMembership {
case gomatrixserverlib.Invite:
return updateToInviteMembership(mu, add, updates)
return updateToInviteMembership(mu, add, updates, updater.RoomVersion())
case gomatrixserverlib.Join:
return updateToJoinMembership(mu, add, updates)
case gomatrixserverlib.Leave, gomatrixserverlib.Ban:
@ -126,6 +126,7 @@ func updateMembership(
func updateToInviteMembership(
mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent,
roomVersion gomatrixserverlib.RoomVersion,
) ([]api.OutputEvent, error) {
// We may have already sent the invite to the user, either because we are
// reprocessing this event, or because the we received this invite from a
@ -136,7 +137,6 @@ func updateToInviteMembership(
return nil, err
}
if needsSending {
roomVersion := gomatrixserverlib.RoomVersionV1
// We notify the consumers using a special event even though we will
// notify them about the change in current state as part of the normal
// room event stream. This ensures that the consumers only have to
@ -144,6 +144,7 @@ func updateToInviteMembership(
// is invited, rather than having to combine multiple streams themselves.
onie := api.OutputNewInviteEvent{
Event: (*add).Headered(roomVersion),
RoomVersion: roomVersion,
}
updates = append(updates, api.OutputEvent{
Type: api.OutputTypeNewInviteEvent,

View file

@ -132,7 +132,7 @@ func (r *RoomserverQueryAPI) QueryLatestEventsAndState(
return err
}
// Look up the currrent state for the requested tuples.
// Look up the current state for the requested tuples.
stateEntries, err := roomState.LoadStateAtSnapshotForStringTuples(
ctx, currentStateSnapshotNID, request.StateToFetch,
)
@ -736,6 +736,14 @@ func (r *RoomserverQueryAPI) QueryStateAndAuthChain(
return err
}
if request.ResolveState {
if stateEvents, err = state.ResolveConflictsAdhoc(
roomVersion, stateEvents, authEvents,
); err != nil {
return err
}
}
for _, event := range stateEvents {
response.StateEvents = append(response.StateEvents, event.Headered(roomVersion))
}

View file

@ -18,7 +18,6 @@ package state
import (
"context"
"errors"
"fmt"
"sort"
"time"
@ -672,7 +671,7 @@ func (v StateResolution) calculateStateAfterManyEvents(
return
}
algorithm = "full_state_with_conflicts"
state = resolved
state = resolved[:util.SortAndUnique(stateEntrySorter(resolved))]
} else {
algorithm = "full_state_no_conflicts"
// 6) There weren't any conflicts
@ -681,6 +680,83 @@ func (v StateResolution) calculateStateAfterManyEvents(
return
}
// ResolveConflictsAdhoc is a helper function to assist the query API in
// performing state resolution when requested. This is a different code
// path to the rest of state.go because this assumes you already have
// gomatrixserverlib.Event objects and not just a bunch of NIDs like
// elsewhere in the state resolution.
// TODO: Some of this can possibly be deduplicated
func ResolveConflictsAdhoc(
version gomatrixserverlib.RoomVersion,
events []gomatrixserverlib.Event,
authEvents []gomatrixserverlib.Event,
) ([]gomatrixserverlib.Event, error) {
type stateKeyTuple struct {
Type string
StateKey string
}
// Prepare our data structures.
eventMap := make(map[stateKeyTuple][]gomatrixserverlib.Event)
var conflicted, notConflicted, resolved []gomatrixserverlib.Event
// Run through all of the events that we were given and sort them
// into a map, sorted by (event_type, state_key) tuple. This means
// that we can easily spot events that are "conflicted", e.g.
// there are duplicate values for the same tuple key.
for _, event := range events {
if event.StateKey() == nil {
// Ignore events that are not state events.
continue
}
// Append the events if there is already a conflicted list for
// this tuple key, create it if not.
tuple := stateKeyTuple{event.Type(), *event.StateKey()}
if _, ok := eventMap[tuple]; ok {
eventMap[tuple] = append(eventMap[tuple], event)
} else {
eventMap[tuple] = []gomatrixserverlib.Event{event}
}
}
// Split out the events in the map into conflicted and unconflicted
// buckets. The conflicted events will be ran through state res,
// whereas unconfliced events will always going to appear in the
// final resolved state.
for _, list := range eventMap {
if len(list) > 1 {
conflicted = append(conflicted, list...)
} else {
notConflicted = append(notConflicted, list...)
}
}
// Work out which state resolution algorithm we want to run for
// the room version.
stateResAlgo, err := version.StateResAlgorithm()
if err != nil {
return nil, err
}
switch stateResAlgo {
case gomatrixserverlib.StateResV1:
// Currently state res v1 doesn't handle unconflicted events
// for us, like state res v2 does, so we will need to add the
// unconflicted events into the state ourselves.
// TODO: Fix state res v1 so this is handled for the caller.
resolved = gomatrixserverlib.ResolveStateConflicts(conflicted, authEvents)
resolved = append(resolved, notConflicted...)
case gomatrixserverlib.StateResV2:
// TODO: auth difference here?
resolved = gomatrixserverlib.ResolveStateConflictsV2(conflicted, notConflicted, authEvents, authEvents)
default:
return nil, fmt.Errorf("unsupported state resolution algorithm %v", stateResAlgo)
}
// Return the final resolved state events, including both the
// resolved set of conflicted events, and the unconflicted events.
return resolved, nil
}
func (v StateResolution) resolveConflicts(
ctx context.Context, version gomatrixserverlib.RoomVersion,
notConflicted, conflicted []types.StateEntry,
@ -695,7 +771,7 @@ func (v StateResolution) resolveConflicts(
case gomatrixserverlib.StateResV2:
return v.resolveConflictsV2(ctx, notConflicted, conflicted)
}
return nil, errors.New("unsupported state resolution algorithm")
return nil, fmt.Errorf("unsupported state resolution algorithm %v", stateResAlgo)
}
// resolveConflicts resolves a list of conflicted state entries. It takes two lists.

View file

@ -41,7 +41,7 @@ type Database interface {
GetAliasesForRoomID(ctx context.Context, roomID string) ([]string, error)
GetCreatorIDForAlias(ctx context.Context, alias string) (string, error)
RemoveRoomAlias(ctx context.Context, alias string) error
MembershipUpdater(ctx context.Context, roomID, targetUserID string) (types.MembershipUpdater, error)
MembershipUpdater(ctx context.Context, roomID, targetUserID string, roomVersion gomatrixserverlib.RoomVersion) (types.MembershipUpdater, error)
GetMembership(ctx context.Context, roomNID types.RoomNID, requestSenderUserID string) (membershipEventNID types.EventNID, stillInRoom bool, err error)
GetMembershipEventNIDsForRoom(ctx context.Context, roomNID types.RoomNID, joinOnly bool) ([]types.EventNID, error)
EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error)

View file

@ -393,6 +393,12 @@ type roomRecentEventsUpdater struct {
currentStateSnapshotNID types.StateSnapshotNID
}
// RoomVersion implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) RoomVersion() (version gomatrixserverlib.RoomVersion) {
version, _ = u.d.GetRoomVersionForRoomNID(u.ctx, u.roomNID)
return
}
// LatestEvents implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) LatestEvents() []types.StateAtEventAndReference {
return u.latestEvents
@ -534,6 +540,7 @@ func (d *Database) StateEntriesForTuples(
// MembershipUpdater implements input.RoomEventDatabase
func (d *Database) MembershipUpdater(
ctx context.Context, roomID, targetUserID string,
roomVersion gomatrixserverlib.RoomVersion,
) (types.MembershipUpdater, error) {
txn, err := d.db.Begin()
if err != nil {
@ -546,8 +553,7 @@ func (d *Database) MembershipUpdater(
}
}()
// TODO: Room version here
roomNID, err := d.assignRoomNID(ctx, txn, roomID, "1")
roomNID, err := d.assignRoomNID(ctx, txn, roomID, roomVersion)
if err != nil {
return nil, err
}

View file

@ -486,6 +486,12 @@ type roomRecentEventsUpdater struct {
currentStateSnapshotNID types.StateSnapshotNID
}
// RoomVersion implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) RoomVersion() (version gomatrixserverlib.RoomVersion) {
version, _ = u.d.GetRoomVersionForRoomNID(u.ctx, u.roomNID)
return
}
// LatestEvents implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) LatestEvents() []types.StateAtEventAndReference {
return u.latestEvents
@ -657,6 +663,7 @@ func (d *Database) StateEntriesForTuples(
// MembershipUpdater implements input.RoomEventDatabase
func (d *Database) MembershipUpdater(
ctx context.Context, roomID, targetUserID string,
roomVersion gomatrixserverlib.RoomVersion,
) (updater types.MembershipUpdater, err error) {
var txn *sql.Tx
txn, err = d.db.Begin()
@ -682,8 +689,7 @@ func (d *Database) MembershipUpdater(
}
}()
// TODO: Room version here
roomNID, err := d.assignRoomNID(ctx, txn, roomID, "1")
roomNID, err := d.assignRoomNID(ctx, txn, roomID, roomVersion)
if err != nil {
return nil, err
}

View file

@ -140,6 +140,8 @@ type StateEntryList struct {
// (On postgresql this wraps a database transaction that holds a "FOR UPDATE"
// lock on the row in the rooms table holding the latest events for the room.)
type RoomRecentEventsUpdater interface {
// The room version of the room.
RoomVersion() gomatrixserverlib.RoomVersion
// The latest event IDs and state in the room.
LatestEvents() []StateAtEventAndReference
// The event ID of the latest event written to the output log in the room.

View file

@ -43,12 +43,12 @@ var roomVersions = map[gomatrixserverlib.RoomVersion]RoomVersionDescription{
Stable: true,
},
gomatrixserverlib.RoomVersionV3: RoomVersionDescription{
Supported: false,
Stable: false,
Supported: true,
Stable: true,
},
gomatrixserverlib.RoomVersionV4: RoomVersionDescription{
Supported: false,
Stable: false,
Supported: true,
Stable: true,
},
gomatrixserverlib.RoomVersionV5: RoomVersionDescription{
Supported: false,

View file

@ -60,7 +60,7 @@ while read -r test_name; do
# Ignore empty lines
[ "${test_name}" = "" ] && continue
grep "${test_name}" "${whitelist_file}" > /dev/null 2>&1
grep "^${test_name}" "${whitelist_file}" > /dev/null 2>&1
if [ "$?" != "0" ]; then
# Check if this test name is blacklisted
if printf '%s\n' "${blacklisted_tests[@]}" | grep -q -P "^${test_name}$"; then
@ -80,8 +80,8 @@ done <<< "${passed_but_expected_fail}"
# TODO: Check that the same test doesn't appear twice in the whitelist|blacklist
# Trim test output strings
tests_to_add=$(echo -e $tests_to_add | xargs)
already_in_whitelist=$(echo -e $already_in_whitelist | xargs)
tests_to_add=$(echo -e $tests_to_add | xargs -d '\n')
already_in_whitelist=$(echo -e $already_in_whitelist | xargs -d '\n')
# Format output with markdown for buildkite annotation rendering purposes
if [ -n "${tests_to_add}" ] && [ -n "${already_in_whitelist}" ]; then

View file

@ -19,15 +19,15 @@ import (
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/typingserver/api"
log "github.com/sirupsen/logrus"
sarama "gopkg.in/Shopify/sarama.v1"
)
// OutputTypingEventConsumer consumes events that originated in the typing server.
// OutputTypingEventConsumer consumes events that originated in the EDU server.
type OutputTypingEventConsumer struct {
typingConsumer *common.ContinualConsumer
db storage.Database
@ -35,7 +35,7 @@ type OutputTypingEventConsumer struct {
}
// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer.
// Call Start() to begin consuming from the typing server.
// Call Start() to begin consuming from the EDU server.
func NewOutputTypingEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
@ -60,7 +60,7 @@ func NewOutputTypingEventConsumer(
return s
}
// Start consuming from typing api
// Start consuming from EDU api
func (s *OutputTypingEventConsumer) Start() error {
s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
s.notifier.OnNewEvent(
@ -78,7 +78,7 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
var output api.OutputTypingEvent
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("typing server output log: message parse failure")
log.WithError(err).Errorf("EDU server output log: message parse failure")
return nil
}
@ -86,7 +86,7 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
"room_id": output.Event.RoomID,
"user_id": output.Event.UserID,
"typing": output.Event.Typing,
}).Debug("received data from typing server")
}).Debug("received data from EDU server")
var typingPos types.StreamPosition
typingEvent := output.Event

View file

@ -20,9 +20,9 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/typingserver/cache"
"github.com/matrix-org/gomatrixserverlib"
)

View file

@ -30,8 +30,8 @@ import (
// Import the postgres database driver.
_ "github.com/lib/pq"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/typingserver/cache"
"github.com/matrix-org/gomatrixserverlib"
)
@ -53,7 +53,7 @@ type SyncServerDatasource struct {
events outputRoomEventsStatements
roomstate currentRoomStateStatements
invites inviteEventsStatements
typingCache *cache.TypingCache
eduCache *cache.EDUCache
topology outputRoomEventsTopologyStatements
backwardExtremities backwardExtremitiesStatements
}
@ -86,7 +86,7 @@ func NewSyncServerDatasource(dbDataSourceName string) (*SyncServerDatasource, er
if err := d.backwardExtremities.prepare(d.db); err != nil {
return nil, err
}
d.typingCache = cache.NewTypingCache()
d.eduCache = cache.New()
return &d, nil
}
@ -395,7 +395,7 @@ func (d *SyncServerDatasource) syncPositionTx(
maxEventID = maxInviteID
}
sp.PDUPosition = types.StreamPosition(maxEventID)
sp.EDUTypingPosition = types.StreamPosition(d.typingCache.GetLatestSyncPosition())
sp.EDUTypingPosition = types.StreamPosition(d.eduCache.GetLatestSyncPosition())
return
}
@ -468,7 +468,7 @@ func (d *SyncServerDatasource) addTypingDeltaToResponse(
var ok bool
var err error
for _, roomID := range joinedRoomIDs {
if typingUsers, updated := d.typingCache.GetTypingUsersIfUpdatedAfter(
if typingUsers, updated := d.eduCache.GetTypingUsersIfUpdatedAfter(
roomID, int64(since.EDUTypingPosition),
); updated {
ev := gomatrixserverlib.ClientEvent{
@ -719,7 +719,7 @@ func (d *SyncServerDatasource) RetireInviteEvent(
}
func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) {
d.typingCache.SetTimeoutCallback(fn)
d.eduCache.SetTimeoutCallback(fn)
}
// AddTypingUser adds a typing user to the typing cache.
@ -727,7 +727,7 @@ func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallback
func (d *SyncServerDatasource) AddTypingUser(
userID, roomID string, expireTime *time.Time,
) types.StreamPosition {
return types.StreamPosition(d.typingCache.AddTypingUser(userID, roomID, expireTime))
return types.StreamPosition(d.eduCache.AddTypingUser(userID, roomID, expireTime))
}
// RemoveTypingUser removes a typing user from the typing cache.
@ -735,7 +735,7 @@ func (d *SyncServerDatasource) AddTypingUser(
func (d *SyncServerDatasource) RemoveTypingUser(
userID, roomID string,
) types.StreamPosition {
return types.StreamPosition(d.typingCache.RemoveUser(userID, roomID))
return types.StreamPosition(d.eduCache.RemoveUser(userID, roomID))
}
func (d *SyncServerDatasource) addInvitesToResponse(

View file

@ -33,8 +33,8 @@ import (
_ "github.com/mattn/go-sqlite3"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/typingserver/cache"
"github.com/matrix-org/gomatrixserverlib"
)
@ -57,7 +57,7 @@ type SyncServerDatasource struct {
events outputRoomEventsStatements
roomstate currentRoomStateStatements
invites inviteEventsStatements
typingCache *cache.TypingCache
eduCache *cache.EDUCache
topology outputRoomEventsTopologyStatements
backwardExtremities backwardExtremitiesStatements
}
@ -84,7 +84,7 @@ func NewSyncServerDatasource(dataSourceName string) (*SyncServerDatasource, erro
if err = d.prepare(); err != nil {
return nil, err
}
d.typingCache = cache.NewTypingCache()
d.eduCache = cache.New()
return &d, nil
}
@ -429,7 +429,7 @@ func (d *SyncServerDatasource) syncPositionTx(
maxEventID = maxInviteID
}
sp.PDUPosition = types.StreamPosition(maxEventID)
sp.EDUTypingPosition = types.StreamPosition(d.typingCache.GetLatestSyncPosition())
sp.EDUTypingPosition = types.StreamPosition(d.eduCache.GetLatestSyncPosition())
return
}
@ -502,7 +502,7 @@ func (d *SyncServerDatasource) addTypingDeltaToResponse(
var ok bool
var err error
for _, roomID := range joinedRoomIDs {
if typingUsers, updated := d.typingCache.GetTypingUsersIfUpdatedAfter(
if typingUsers, updated := d.eduCache.GetTypingUsersIfUpdatedAfter(
roomID, int64(since.EDUTypingPosition),
); updated {
ev := gomatrixserverlib.ClientEvent{
@ -766,7 +766,7 @@ func (d *SyncServerDatasource) RetireInviteEvent(
}
func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) {
d.typingCache.SetTimeoutCallback(fn)
d.eduCache.SetTimeoutCallback(fn)
}
// AddTypingUser adds a typing user to the typing cache.
@ -774,7 +774,7 @@ func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallback
func (d *SyncServerDatasource) AddTypingUser(
userID, roomID string, expireTime *time.Time,
) types.StreamPosition {
return types.StreamPosition(d.typingCache.AddTypingUser(userID, roomID, expireTime))
return types.StreamPosition(d.eduCache.AddTypingUser(userID, roomID, expireTime))
}
// RemoveTypingUser removes a typing user from the typing cache.
@ -782,7 +782,7 @@ func (d *SyncServerDatasource) AddTypingUser(
func (d *SyncServerDatasource) RemoveTypingUser(
userID, roomID string,
) types.StreamPosition {
return types.StreamPosition(d.typingCache.RemoveUser(userID, roomID))
return types.StreamPosition(d.eduCache.RemoveUser(userID, roomID))
}
func (d *SyncServerDatasource) addInvitesToResponse(

View file

@ -32,3 +32,6 @@ Alias creators can delete canonical alias with no ops
# Blacklisted because we need to implement v2 invite endpoints for room versions
# to be supported (currently fails with M_UNSUPPORTED_ROOM_VERSION)
Inbound federation rejects invites which are not signed by the sender
# Blacklisted because we don't support ignores yet
Ignore invite in incremental sync

View file

@ -215,7 +215,6 @@ Guest users can sync from default guest_access rooms if joined
Real non-joined users cannot room initalSync for non-world_readable rooms
Push rules come down in an initial /sync
Regular users can add and delete aliases in the default room configuration
Regular users can add and delete aliases when m.room.aliases is restricted
GET /r0/capabilities is not public
GET /joined_rooms lists newly-created room
/joined_rooms returns only joined rooms
@ -225,9 +224,24 @@ Remote user can backfill in a room with version 1
POST /createRoom creates a room with the given version
POST /createRoom rejects attempts to create rooms with numeric versions
POST /createRoom rejects attempts to create rooms with unknown versions
Regular users can add and delete aliases when m.room.aliases is restricted
User can create and send/receive messages in a room with version 2
local user can join room with version 2
remote user can join room with version 2
User can invite local user to room with version 2
Remote user can backfill in a room with version 2
Inbound federation accepts attempts to join v2 rooms from servers with support
Outbound federation can send invites via v2 API
User can create and send/receive messages in a room with version 3
local user can join room with version 3
Remote user can backfill in a room with version 3
User can create and send/receive messages in a room with version 4
local user can join room with version 4
remote user can join room with version 3
remote user can join room with version 4
Remote user can backfill in a room with version 4
# We don't support ignores yet, so ignore this for now - ha ha.
# Ignore invite in incremental sync
Outbound federation can send invites via v2 API
User can invite local user to room with version 3
User can invite local user to room with version 4