diff --git a/WIRING.md b/WIRING.md index bddb1614c..8ec5b0432 100644 --- a/WIRING.md +++ b/WIRING.md @@ -72,7 +72,7 @@ Diagram: | | | | | | +---+ | | | | +----------| S | | | - | | | Typing +---+ | | + | | | EDU +---+ | | | |>=========================================>| Server |>=====================>| | +------------+ | | +----------+ +---+ | | @@ -156,7 +156,7 @@ choke-point to implement ratelimiting and backoff correctly. * It may be impossible to implement without folding it into the Room Server forever coupling the components together. -## Typing Server +## EDU Server * Reads new updates to typing from the logs written by the FS and CTS. * Updates the current list of people typing in a room. @@ -179,7 +179,7 @@ choke-point to implement ratelimiting and backoff correctly. * Reads new events and the current state of the rooms from logs written by the Room Server. * Reads new receipts positions from the logs written by the Receipts Server. * Reads changes to presence from the logs written by the Presence Server. - * Reads changes to typing from the logs written by the Typing Server. + * Reads changes to typing from the logs written by the EDU Server. * Writes when a client starts and stops syncing to the logs. ## Client Search diff --git a/appservice/api/query.go b/appservice/api/query.go index 7e61d6233..afd5c5d76 100644 --- a/appservice/api/query.go +++ b/appservice/api/query.go @@ -20,6 +20,7 @@ package api import ( "context" "database/sql" + "errors" "net/http" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" @@ -97,15 +98,15 @@ type httpAppServiceQueryAPI struct { // NewAppServiceQueryAPIHTTP creates a AppServiceQueryAPI implemented by talking // to a HTTP POST API. -// If httpClient is nil then it uses http.DefaultClient +// If httpClient is nil an error is returned func NewAppServiceQueryAPIHTTP( appserviceURL string, httpClient *http.Client, -) AppServiceQueryAPI { +) (AppServiceQueryAPI, error) { if httpClient == nil { - httpClient = http.DefaultClient + return nil, errors.New("NewRoomserverAliasAPIHTTP: httpClient is ") } - return &httpAppServiceQueryAPI{appserviceURL, httpClient} + return &httpAppServiceQueryAPI{appserviceURL, httpClient}, nil } // RoomAliasExists implements AppServiceQueryAPI diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index e608b69f3..1339f7c8c 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -23,9 +23,9 @@ import ( "github.com/matrix-org/dendrite/clientapi/routing" "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/transactions" + eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" - typingServerAPI "github.com/matrix-org/dendrite/typingserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" ) @@ -41,13 +41,13 @@ func SetupClientAPIComponent( aliasAPI roomserverAPI.RoomserverAliasAPI, inputAPI roomserverAPI.RoomserverInputAPI, queryAPI roomserverAPI.RoomserverQueryAPI, - typingInputAPI typingServerAPI.TypingServerInputAPI, + eduInputAPI eduServerAPI.EDUServerInputAPI, asAPI appserviceAPI.AppServiceQueryAPI, transactionsCache *transactions.Cache, fedSenderAPI federationSenderAPI.FederationSenderQueryAPI, ) { roomserverProducer := producers.NewRoomserverProducer(inputAPI, queryAPI) - typingProducer := producers.NewTypingServerProducer(typingInputAPI) + eduProducer := producers.NewEDUServerProducer(eduInputAPI) userUpdateProducer := &producers.UserUpdateProducer{ Producer: base.KafkaProducer, @@ -69,6 +69,6 @@ func SetupClientAPIComponent( routing.Setup( base.APIMux, base.Cfg, roomserverProducer, queryAPI, aliasAPI, asAPI, accountsDB, deviceDB, federation, *keyRing, userUpdateProducer, - syncProducer, typingProducer, transactionsCache, fedSenderAPI, + syncProducer, eduProducer, transactionsCache, fedSenderAPI, ) } diff --git a/clientapi/producers/typingserver.go b/clientapi/producers/eduserver.go similarity index 68% rename from clientapi/producers/typingserver.go rename to clientapi/producers/eduserver.go index f4d0bcba7..30c40fb7f 100644 --- a/clientapi/producers/typingserver.go +++ b/clientapi/producers/eduserver.go @@ -16,32 +16,32 @@ import ( "context" "time" - "github.com/matrix-org/dendrite/typingserver/api" + "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/gomatrixserverlib" ) -// TypingServerProducer produces events for the typing server to consume -type TypingServerProducer struct { - InputAPI api.TypingServerInputAPI +// EDUServerProducer produces events for the EDU server to consume +type EDUServerProducer struct { + InputAPI api.EDUServerInputAPI } -// NewTypingServerProducer creates a new TypingServerProducer -func NewTypingServerProducer(inputAPI api.TypingServerInputAPI) *TypingServerProducer { - return &TypingServerProducer{ +// NewEDUServerProducer creates a new EDUServerProducer +func NewEDUServerProducer(inputAPI api.EDUServerInputAPI) *EDUServerProducer { + return &EDUServerProducer{ InputAPI: inputAPI, } } -// Send typing event to typing server -func (p *TypingServerProducer) Send( +// SendTyping sends a typing event to EDU server +func (p *EDUServerProducer) SendTyping( ctx context.Context, userID, roomID string, - typing bool, timeout int64, + typing bool, timeoutMS int64, ) error { requestData := api.InputTypingEvent{ UserID: userID, RoomID: roomID, Typing: typing, - Timeout: timeout, + TimeoutMS: timeoutMS, OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()), } diff --git a/clientapi/producers/roomserver.go b/clientapi/producers/roomserver.go index 06af54404..391ea07bf 100644 --- a/clientapi/producers/roomserver.go +++ b/clientapi/producers/roomserver.go @@ -104,18 +104,14 @@ func (c *RoomserverProducer) SendInputRoomEvents( // This should only be needed for invite events that occur outside of a known room. // If we are in the room then the event should be sent using the SendEvents method. func (c *RoomserverProducer) SendInvite( - ctx context.Context, inviteEvent gomatrixserverlib.Event, + ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent, + inviteRoomState []gomatrixserverlib.InviteV2StrippedState, ) error { - verReq := api.QueryRoomVersionForRoomRequest{RoomID: inviteEvent.RoomID()} - verRes := api.QueryRoomVersionForRoomResponse{} - err := c.QueryAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes) - if err != nil { - return err - } - request := api.InputRoomEventsRequest{ InputInviteEvents: []api.InputInviteEvent{{ - Event: inviteEvent.Headered(verRes.RoomVersion), + Event: inviteEvent, + InviteRoomState: inviteRoomState, + RoomVersion: inviteEvent.RoomVersion, }}, } var response api.InputRoomEventsResponse diff --git a/clientapi/routing/account_data.go b/clientapi/routing/account_data.go index 24db41f5f..a5d53c326 100644 --- a/clientapi/routing/account_data.go +++ b/clientapi/routing/account_data.go @@ -15,6 +15,7 @@ package routing import ( + "encoding/json" "io/ioutil" "net/http" @@ -80,12 +81,26 @@ func SaveAccountData( defer req.Body.Close() // nolint: errcheck + if req.Body == http.NoBody { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.NotJSON("Content not JSON"), + } + } + body, err := ioutil.ReadAll(req.Body) if err != nil { util.GetLogger(req.Context()).WithError(err).Error("ioutil.ReadAll failed") return jsonerror.InternalServerError() } + if !json.Valid(body) { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.BadJSON("Bad JSON content"), + } + } + if err := accountDB.SaveAccountData( req.Context(), localpart, roomID, dataType, string(body), ); err != nil { diff --git a/clientapi/routing/capabilities.go b/clientapi/routing/capabilities.go index 0c583055e..1792c6308 100644 --- a/clientapi/routing/capabilities.go +++ b/clientapi/routing/capabilities.go @@ -29,7 +29,7 @@ func GetCapabilities( req *http.Request, queryAPI roomserverAPI.RoomserverQueryAPI, ) util.JSONResponse { roomVersionsQueryReq := roomserverAPI.QueryRoomVersionCapabilitiesRequest{} - var roomVersionsQueryRes roomserverAPI.QueryRoomVersionCapabilitiesResponse + roomVersionsQueryRes := roomserverAPI.QueryRoomVersionCapabilitiesResponse{} if err := queryAPI.QueryRoomVersionCapabilities( req.Context(), &roomVersionsQueryReq, diff --git a/clientapi/routing/joinroom.go b/clientapi/routing/joinroom.go index 745b4eecc..0f1a9ba4d 100644 --- a/clientapi/routing/joinroom.go +++ b/clientapi/routing/joinroom.go @@ -242,6 +242,9 @@ func (r joinRoomReq) joinRoomUsingServers( queryRes := roomserverAPI.QueryLatestEventsAndStateResponse{} event, err := common.BuildEvent(r.req.Context(), &eb, r.cfg, r.evTime, r.queryAPI, &queryRes) if err == nil { + // If we have successfully built an event at this point then we can + // assert that the room is a local room, as BuildEvent was able to + // add prev_events etc successfully. if _, err = r.producer.SendEvents( r.req.Context(), []gomatrixserverlib.HeaderedEvent{ @@ -260,6 +263,13 @@ func (r joinRoomReq) joinRoomUsingServers( }{roomID}, } } + + // Otherwise, if we've reached here, then we haven't been able to populate + // prev_events etc for the room, therefore the room is probably federated. + + // TODO: This needs to be re-thought, as in the case of an invite, the room + // will exist in the database in roomserver_rooms but won't have any state + // events, therefore this below check fails. if err != common.ErrRoomNoExists { util.GetLogger(r.req.Context()).WithError(err).Error("common.BuildEvent failed") return jsonerror.InternalServerError() @@ -320,14 +330,14 @@ func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib respMakeJoin, err := r.federation.MakeJoin(r.req.Context(), server, roomID, r.userID, supportedVersions) if err != nil { // TODO: Check if the user was not allowed to join the room. - return nil, err + return nil, fmt.Errorf("r.federation.MakeJoin: %w", err) } // Set all the fields to be what they should be, this should be a no-op // but it's possible that the remote server returned us something "odd" err = r.writeToBuilder(&respMakeJoin.JoinEvent, roomID) if err != nil { - return nil, err + return nil, fmt.Errorf("r.writeToBuilder: %w", err) } if respMakeJoin.RoomVersion == "" { @@ -347,18 +357,16 @@ func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib r.cfg.Matrix.PrivateKey, respMakeJoin.RoomVersion, ) if err != nil { - util.GetLogger(r.req.Context()).WithError(err).Error("respMakeJoin.JoinEvent.Build failed") - res := jsonerror.InternalServerError() - return &res, nil + return nil, fmt.Errorf("respMakeJoin.JoinEvent.Build: %w", err) } respSendJoin, err := r.federation.SendJoin(r.req.Context(), server, event, respMakeJoin.RoomVersion) if err != nil { - return nil, err + return nil, fmt.Errorf("r.federation.SendJoin: %w", err) } if err = respSendJoin.Check(r.req.Context(), r.keyRing, event); err != nil { - return nil, err + return nil, fmt.Errorf("respSendJoin: %w", err) } if err = r.producer.SendEventWithState( @@ -366,9 +374,7 @@ func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib gomatrixserverlib.RespState(respSendJoin.RespState), event.Headered(respMakeJoin.RoomVersion), ); err != nil { - util.GetLogger(r.req.Context()).WithError(err).Error("gomatrixserverlib.RespState failed") - res := jsonerror.InternalServerError() - return &res, nil + return nil, fmt.Errorf("r.producer.SendEventWithState: %w", err) } return &util.JSONResponse{ diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index 22ff12b02..91a1588cb 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -58,7 +58,7 @@ func Setup( keyRing gomatrixserverlib.KeyRing, userUpdateProducer *producers.UserUpdateProducer, syncProducer *producers.SyncAPIProducer, - typingProducer *producers.TypingServerProducer, + eduProducer *producers.EDUServerProducer, transactionsCache *transactions.Cache, federationSender federationSenderAPI.FederationSenderQueryAPI, ) { @@ -235,7 +235,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, typingProducer) + return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduProducer) }), ).Methods(http.MethodPut, http.MethodOptions) diff --git a/clientapi/routing/sendtyping.go b/clientapi/routing/sendtyping.go index 29953c32d..ffaa0e662 100644 --- a/clientapi/routing/sendtyping.go +++ b/clientapi/routing/sendtyping.go @@ -35,7 +35,7 @@ type typingContentJSON struct { func SendTyping( req *http.Request, device *authtypes.Device, roomID string, userID string, accountDB accounts.Database, - typingProducer *producers.TypingServerProducer, + eduProducer *producers.EDUServerProducer, ) util.JSONResponse { if device.UserID != userID { return util.JSONResponse{ @@ -69,10 +69,10 @@ func SendTyping( return *resErr } - if err = typingProducer.Send( + if err = eduProducer.SendTyping( req.Context(), userID, roomID, r.Typing, r.Timeout, ); err != nil { - util.GetLogger(req.Context()).WithError(err).Error("typingProducer.Send failed") + util.GetLogger(req.Context()).WithError(err).Error("eduProducer.Send failed") return jsonerror.InternalServerError() } diff --git a/cmd/dendrite-client-api-server/main.go b/cmd/dendrite-client-api-server/main.go index 2bde0f4cf..a7e241b13 100644 --- a/cmd/dendrite-client-api-server/main.go +++ b/cmd/dendrite-client-api-server/main.go @@ -19,8 +19,8 @@ import ( "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/keydb" "github.com/matrix-org/dendrite/common/transactions" - "github.com/matrix-org/dendrite/typingserver" - "github.com/matrix-org/dendrite/typingserver/cache" + "github.com/matrix-org/dendrite/eduserver" + "github.com/matrix-org/dendrite/eduserver/cache" ) func main() { @@ -38,11 +38,11 @@ func main() { asQuery := base.CreateHTTPAppServiceAPIs() alias, input, query := base.CreateHTTPRoomserverAPIs() fedSenderAPI := base.CreateHTTPFederationSenderAPIs() - typingInputAPI := typingserver.SetupTypingServerComponent(base, cache.NewTypingCache()) + eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New()) clientapi.SetupClientAPIComponent( base, deviceDB, accountDB, federation, &keyRing, - alias, input, query, typingInputAPI, asQuery, transactions.New(), fedSenderAPI, + alias, input, query, eduInputAPI, asQuery, transactions.New(), fedSenderAPI, ) base.SetupAndServeHTTP(string(base.Cfg.Bind.ClientAPI), string(base.Cfg.Listen.ClientAPI)) diff --git a/cmd/dendrite-typing-server/main.go b/cmd/dendrite-edu-server/main.go similarity index 72% rename from cmd/dendrite-typing-server/main.go rename to cmd/dendrite-edu-server/main.go index 461eb7144..a4511f1ba 100644 --- a/cmd/dendrite-typing-server/main.go +++ b/cmd/dendrite-edu-server/main.go @@ -16,22 +16,22 @@ import ( _ "net/http/pprof" "github.com/matrix-org/dendrite/common/basecomponent" - "github.com/matrix-org/dendrite/typingserver" - "github.com/matrix-org/dendrite/typingserver/cache" + "github.com/matrix-org/dendrite/eduserver" + "github.com/matrix-org/dendrite/eduserver/cache" "github.com/sirupsen/logrus" ) func main() { cfg := basecomponent.ParseFlags() - base := basecomponent.NewBaseDendrite(cfg, "TypingServerAPI") + base := basecomponent.NewBaseDendrite(cfg, "EDUServerAPI") defer func() { if err := base.Close(); err != nil { logrus.WithError(err).Warn("BaseDendrite close failed") } }() - typingserver.SetupTypingServerComponent(base, cache.NewTypingCache()) + eduserver.SetupEDUServerComponent(base, cache.New()) - base.SetupAndServeHTTP(string(base.Cfg.Bind.TypingServer), string(base.Cfg.Listen.TypingServer)) + base.SetupAndServeHTTP(string(base.Cfg.Bind.EDUServer), string(base.Cfg.Listen.EDUServer)) } diff --git a/cmd/dendrite-federation-api-server/main.go b/cmd/dendrite-federation-api-server/main.go index 367f5dc0c..d18926a68 100644 --- a/cmd/dendrite-federation-api-server/main.go +++ b/cmd/dendrite-federation-api-server/main.go @@ -15,8 +15,11 @@ package main import ( + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/keydb" + "github.com/matrix-org/dendrite/eduserver" + "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/federationapi" ) @@ -34,10 +37,12 @@ func main() { alias, input, query := base.CreateHTTPRoomserverAPIs() asQuery := base.CreateHTTPAppServiceAPIs() + eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New()) + eduProducer := producers.NewEDUServerProducer(eduInputAPI) federationapi.SetupFederationAPIComponent( base, accountDB, deviceDB, federation, &keyRing, - alias, input, query, asQuery, federationSender, + alias, input, query, asQuery, federationSender, eduProducer, ) base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationAPI), string(base.Cfg.Listen.FederationAPI)) diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index 27c3054b8..9f6531ed3 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -20,18 +20,19 @@ import ( "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/clientapi" + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/keydb" "github.com/matrix-org/dendrite/common/transactions" + "github.com/matrix-org/dendrite/eduserver" + "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/mediaapi" "github.com/matrix-org/dendrite/publicroomsapi" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/syncapi" - "github.com/matrix-org/dendrite/typingserver" - "github.com/matrix-org/dendrite/typingserver/cache" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" @@ -56,7 +57,7 @@ func main() { keyRing := keydb.CreateKeyRing(federation.Client, keyDB) alias, input, query := roomserver.SetupRoomServerComponent(base) - typingInputAPI := typingserver.SetupTypingServerComponent(base, cache.NewTypingCache()) + eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New()) asQuery := appservice.SetupAppServiceAPIComponent( base, accountDB, deviceDB, federation, alias, query, transactions.New(), ) @@ -65,9 +66,10 @@ func main() { clientapi.SetupClientAPIComponent( base, deviceDB, accountDB, federation, &keyRing, alias, input, query, - typingInputAPI, asQuery, transactions.New(), fedSenderAPI, + eduInputAPI, asQuery, transactions.New(), fedSenderAPI, ) - federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI) + eduProducer := producers.NewEDUServerProducer(eduInputAPI) + federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer) mediaapi.SetupMediaAPIComponent(base, deviceDB) publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, query, federation, nil) syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, federation, cfg) diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 7c8526715..05802725d 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -23,18 +23,19 @@ import ( "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/clientapi" + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/transactions" + "github.com/matrix-org/dendrite/eduserver" + "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/mediaapi" "github.com/matrix-org/dendrite/publicroomsapi" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/syncapi" - "github.com/matrix-org/dendrite/typingserver" - "github.com/matrix-org/dendrite/typingserver/cache" "github.com/matrix-org/go-http-js-libp2p/go_http_js_libp2p" "github.com/matrix-org/gomatrixserverlib" @@ -122,7 +123,7 @@ func main() { p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node) alias, input, query := roomserver.SetupRoomServerComponent(base) - typingInputAPI := typingserver.SetupTypingServerComponent(base, cache.NewTypingCache()) + eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New()) asQuery := appservice.SetupAppServiceAPIComponent( base, accountDB, deviceDB, federation, alias, query, transactions.New(), ) @@ -131,9 +132,10 @@ func main() { clientapi.SetupClientAPIComponent( base, deviceDB, accountDB, federation, &keyRing, alias, input, query, - typingInputAPI, asQuery, transactions.New(), fedSenderAPI, + eduInputAPI, asQuery, transactions.New(), fedSenderAPI, ) - federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI) + eduProducer := producers.NewEDUServerProducer(eduInputAPI) + federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer) mediaapi.SetupMediaAPIComponent(base, deviceDB) publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, query, federation, p2pPublicRoomProvider) syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, federation, cfg) diff --git a/cmd/roomserver-integration-tests/main.go b/cmd/roomserver-integration-tests/main.go index d4a8a1d10..df5607bcb 100644 --- a/cmd/roomserver-integration-tests/main.go +++ b/cmd/roomserver-integration-tests/main.go @@ -44,6 +44,8 @@ var ( // This needs to be high enough to account for the time it takes to create // the postgres database tables which can take a while on travis. timeoutString = defaulting(os.Getenv("TIMEOUT"), "60s") + // Timeout for http client + timeoutHTTPClient = defaulting(os.Getenv("TIMEOUT_HTTP"), "30s") // The name of maintenance database to connect to in order to create the test database. postgresDatabase = defaulting(os.Getenv("POSTGRES_DATABASE"), "postgres") // The name of the test database to create. @@ -68,7 +70,10 @@ func defaulting(value, defaultValue string) string { return value } -var timeout time.Duration +var ( + timeout time.Duration + timeoutHTTP time.Duration +) func init() { var err error @@ -76,6 +81,10 @@ func init() { if err != nil { panic(err) } + timeoutHTTP, err = time.ParseDuration(timeoutHTTPClient) + if err != nil { + panic(err) + } } func createDatabase(database string) error { @@ -199,7 +208,10 @@ func writeToRoomServer(input []string, roomserverURL string) error { return err } } - x := api.NewRoomserverInputAPIHTTP(roomserverURL, nil) + x, err := api.NewRoomserverInputAPIHTTP(roomserverURL, &http.Client{Timeout: timeoutHTTP}) + if err != nil { + return err + } return x.InputRoomEvents(context.Background(), &request, &response) } @@ -258,7 +270,7 @@ func testRoomserver(input []string, wantOutput []string, checkQueries func(api.R cmd.Args = []string{"dendrite-room-server", "--config", filepath.Join(dir, test.ConfigFile)} gotOutput, err := runAndReadFromTopic(cmd, cfg.RoomServerURL()+"/metrics", doInput, outputTopic, len(wantOutput), func() { - queryAPI := api.NewRoomserverQueryAPIHTTP("http://"+string(cfg.Listen.RoomServer), nil) + queryAPI, _ := api.NewRoomserverQueryAPIHTTP("http://"+string(cfg.Listen.RoomServer), &http.Client{Timeout: timeoutHTTP}) checkQueries(queryAPI) }) if err != nil { diff --git a/common/basecomponent/base.go b/common/basecomponent/base.go index d1d953f7b..432819a23 100644 --- a/common/basecomponent/base.go +++ b/common/basecomponent/base.go @@ -19,6 +19,7 @@ import ( "io" "net/http" "net/url" + "time" "golang.org/x/crypto/ed25519" @@ -35,9 +36,9 @@ import ( appserviceAPI "github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/common/config" + eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" - typingServerAPI "github.com/matrix-org/dendrite/typingserver/api" "github.com/sirupsen/logrus" ) @@ -52,6 +53,7 @@ type BaseDendrite struct { // APIMux should be used to register new public matrix api endpoints APIMux *mux.Router + httpClient *http.Client Cfg *config.Dendrite KafkaConsumer sarama.Consumer KafkaProducer sarama.SyncProducer @@ -77,11 +79,14 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string) *BaseDendrite { kafkaConsumer, kafkaProducer = setupKafka(cfg) } + const defaultHTTPTimeout = 30 * time.Second + return &BaseDendrite{ componentName: componentName, tracerCloser: closer, Cfg: cfg, APIMux: mux.NewRouter().UseEncodedPath(), + httpClient: &http.Client{Timeout: defaultHTTPTimeout}, KafkaConsumer: kafkaConsumer, KafkaProducer: kafkaProducer, } @@ -95,7 +100,11 @@ func (b *BaseDendrite) Close() error { // CreateHTTPAppServiceAPIs returns the QueryAPI for hitting the appservice // component over HTTP. func (b *BaseDendrite) CreateHTTPAppServiceAPIs() appserviceAPI.AppServiceQueryAPI { - return appserviceAPI.NewAppServiceQueryAPIHTTP(b.Cfg.AppServiceURL(), nil) + a, err := appserviceAPI.NewAppServiceQueryAPIHTTP(b.Cfg.AppServiceURL(), b.httpClient) + if err != nil { + logrus.WithError(err).Panic("CreateHTTPAppServiceAPIs failed") + } + return a } // CreateHTTPRoomserverAPIs returns the AliasAPI, InputAPI and QueryAPI for hitting @@ -105,22 +114,40 @@ func (b *BaseDendrite) CreateHTTPRoomserverAPIs() ( roomserverAPI.RoomserverInputAPI, roomserverAPI.RoomserverQueryAPI, ) { - alias := roomserverAPI.NewRoomserverAliasAPIHTTP(b.Cfg.RoomServerURL(), nil) - input := roomserverAPI.NewRoomserverInputAPIHTTP(b.Cfg.RoomServerURL(), nil) - query := roomserverAPI.NewRoomserverQueryAPIHTTP(b.Cfg.RoomServerURL(), nil) + + alias, err := roomserverAPI.NewRoomserverAliasAPIHTTP(b.Cfg.RoomServerURL(), b.httpClient) + if err != nil { + logrus.WithError(err).Panic("NewRoomserverAliasAPIHTTP failed") + } + input, err := roomserverAPI.NewRoomserverInputAPIHTTP(b.Cfg.RoomServerURL(), b.httpClient) + if err != nil { + logrus.WithError(err).Panic("NewRoomserverInputAPIHTTP failed", b.httpClient) + } + query, err := roomserverAPI.NewRoomserverQueryAPIHTTP(b.Cfg.RoomServerURL(), nil) + if err != nil { + logrus.WithError(err).Panic("NewRoomserverQueryAPIHTTP failed", b.httpClient) + } return alias, input, query } -// CreateHTTPTypingServerAPIs returns typingInputAPI for hitting the typing +// CreateHTTPEDUServerAPIs returns eduInputAPI for hitting the EDU // server over HTTP -func (b *BaseDendrite) CreateHTTPTypingServerAPIs() typingServerAPI.TypingServerInputAPI { - return typingServerAPI.NewTypingServerInputAPIHTTP(b.Cfg.TypingServerURL(), nil) +func (b *BaseDendrite) CreateHTTPEDUServerAPIs() eduServerAPI.EDUServerInputAPI { + e, err := eduServerAPI.NewEDUServerInputAPIHTTP(b.Cfg.EDUServerURL(), nil) + if err != nil { + logrus.WithError(err).Panic("NewEDUServerInputAPIHTTP failed", b.httpClient) + } + return e } // CreateHTTPFederationSenderAPIs returns FederationSenderQueryAPI for hitting // the federation sender over HTTP func (b *BaseDendrite) CreateHTTPFederationSenderAPIs() federationSenderAPI.FederationSenderQueryAPI { - return federationSenderAPI.NewFederationSenderQueryAPIHTTP(b.Cfg.FederationSenderURL(), nil) + f, err := federationSenderAPI.NewFederationSenderQueryAPIHTTP(b.Cfg.FederationSenderURL(), nil) + if err != nil { + logrus.WithError(err).Panic("NewFederationSenderQueryAPIHTTP failed", b.httpClient) + } + return f } // CreateDeviceDB creates a new instance of the device database. Should only be diff --git a/common/config/config.go b/common/config/config.go index bd83cbf8b..e2f5e6635 100644 --- a/common/config/config.go +++ b/common/config/config.go @@ -134,7 +134,7 @@ type Dendrite struct { OutputRoomEvent Topic `yaml:"output_room_event"` // Topic for sending account data from client API to sync API OutputClientData Topic `yaml:"output_client_data"` - // Topic for typingserver/api.OutputTypingEvent events. + // Topic for eduserver/api.OutputTypingEvent events. OutputTypingEvent Topic `yaml:"output_typing_event"` // Topic for user updates (profile, presence) UserUpdates Topic `yaml:"user_updates"` @@ -206,7 +206,7 @@ type Dendrite struct { RoomServer Address `yaml:"room_server"` FederationSender Address `yaml:"federation_sender"` PublicRoomsAPI Address `yaml:"public_rooms_api"` - TypingServer Address `yaml:"typing_server"` + EDUServer Address `yaml:"edu_server"` } `yaml:"bind"` // The addresses for talking to other microservices. @@ -219,7 +219,7 @@ type Dendrite struct { RoomServer Address `yaml:"room_server"` FederationSender Address `yaml:"federation_sender"` PublicRoomsAPI Address `yaml:"public_rooms_api"` - TypingServer Address `yaml:"typing_server"` + EDUServer Address `yaml:"edu_server"` } `yaml:"listen"` // The config for tracing the dendrite servers. @@ -571,7 +571,7 @@ func (config *Dendrite) checkListen(configErrs *configErrors) { checkNotEmpty(configErrs, "listen.federation_api", string(config.Listen.FederationAPI)) checkNotEmpty(configErrs, "listen.sync_api", string(config.Listen.SyncAPI)) checkNotEmpty(configErrs, "listen.room_server", string(config.Listen.RoomServer)) - checkNotEmpty(configErrs, "listen.typing_server", string(config.Listen.TypingServer)) + checkNotEmpty(configErrs, "listen.edu_server", string(config.Listen.EDUServer)) } // checkLogging verifies the parameters logging.* are valid. @@ -669,7 +669,7 @@ func fingerprintPEM(data []byte) *gomatrixserverlib.TLSFingerprint { // AppServiceURL returns a HTTP URL for where the appservice component is listening. func (config *Dendrite) AppServiceURL() string { - // Hard code the roomserver to talk HTTP for now. + // Hard code the appservice server to talk HTTP for now. // If we support HTTPS we need to think of a practical way to do certificate validation. // People setting up servers shouldn't need to get a certificate valid for the public // internet for an internal API. @@ -685,18 +685,18 @@ func (config *Dendrite) RoomServerURL() string { return "http://" + string(config.Listen.RoomServer) } -// TypingServerURL returns an HTTP URL for where the typing server is listening. -func (config *Dendrite) TypingServerURL() string { - // Hard code the typing server to talk HTTP for now. +// EDUServerURL returns an HTTP URL for where the EDU server is listening. +func (config *Dendrite) EDUServerURL() string { + // Hard code the EDU server to talk HTTP for now. // If we support HTTPS we need to think of a practical way to do certificate validation. // People setting up servers shouldn't need to get a certificate valid for the public // internet for an internal API. - return "http://" + string(config.Listen.TypingServer) + return "http://" + string(config.Listen.EDUServer) } // FederationSenderURL returns an HTTP URL for where the federation sender is listening. func (config *Dendrite) FederationSenderURL() string { - // Hard code the typing server to talk HTTP for now. + // Hard code the federation sender server to talk HTTP for now. // If we support HTTPS we need to think of a practical way to do certificate validation. // People setting up servers shouldn't need to get a certificate valid for the public // internet for an internal API. diff --git a/common/config/config_test.go b/common/config/config_test.go index 110c8b84c..b72f5fad0 100644 --- a/common/config/config_test.go +++ b/common/config/config_test.go @@ -62,7 +62,7 @@ listen: sync_api: "localhost:7773" media_api: "localhost:7774" appservice_api: "localhost:7777" - typing_server: "localhost:7778" + edu_server: "localhost:7778" logging: - type: "file" level: "info" diff --git a/common/events.go b/common/events.go index b79998a73..adbdf3389 100644 --- a/common/events.go +++ b/common/events.go @@ -17,6 +17,7 @@ package common import ( "context" "errors" + "fmt" "time" "github.com/matrix-org/dendrite/common/config" @@ -46,6 +47,7 @@ func BuildEvent( err := AddPrevEventsToEvent(ctx, builder, queryAPI, queryRes) if err != nil { + // This can pass through a ErrRoomNoExists to the caller return nil, err } @@ -68,7 +70,7 @@ func AddPrevEventsToEvent( ) error { eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder) if err != nil { - return err + return fmt.Errorf("gomatrixserverlib.StateNeededForEventBuilder: %w", err) } // Ask the roomserver for information about this room @@ -77,7 +79,7 @@ func AddPrevEventsToEvent( StateToFetch: eventsNeeded.Tuples(), } if err = queryAPI.QueryLatestEventsAndState(ctx, &queryReq, queryRes); err != nil { - return err + return fmt.Errorf("queryAPI.QueryLatestEventsAndState: %w", err) } if !queryRes.RoomExists { @@ -86,7 +88,7 @@ func AddPrevEventsToEvent( eventFormat, err := queryRes.RoomVersion.EventFormat() if err != nil { - return err + return fmt.Errorf("queryRes.RoomVersion.EventFormat: %w", err) } builder.Depth = queryRes.Depth @@ -96,26 +98,26 @@ func AddPrevEventsToEvent( for i := range queryRes.StateEvents { err = authEvents.AddEvent(&queryRes.StateEvents[i].Event) if err != nil { - return err + return fmt.Errorf("authEvents.AddEvent: %w", err) } } refs, err := eventsNeeded.AuthEventReferences(&authEvents) if err != nil { - return err + return fmt.Errorf("eventsNeeded.AuthEventReferences: %w", err) } + truncAuth, truncPrev := truncateAuthAndPrevEvents(refs, queryRes.LatestEvents) switch eventFormat { case gomatrixserverlib.EventFormatV1: - builder.AuthEvents = refs - builder.PrevEvents = queryRes.LatestEvents + builder.AuthEvents = truncAuth + builder.PrevEvents = truncPrev case gomatrixserverlib.EventFormatV2: - v2AuthRefs := []string{} - v2PrevRefs := []string{} - for _, ref := range refs { + v2AuthRefs, v2PrevRefs := []string{}, []string{} + for _, ref := range truncAuth { v2AuthRefs = append(v2AuthRefs, ref.EventID) } - for _, ref := range queryRes.LatestEvents { + for _, ref := range truncPrev { v2PrevRefs = append(v2PrevRefs, ref.EventID) } builder.AuthEvents = v2AuthRefs @@ -124,3 +126,21 @@ func AddPrevEventsToEvent( return nil } + +// truncateAuthAndPrevEvents limits the number of events we add into +// an event as prev_events or auth_events. +// NOTSPEC: The limits here feel a bit arbitrary but they are currently +// here because of https://github.com/matrix-org/matrix-doc/issues/2307 +// and because Synapse will just drop events that don't comply. +func truncateAuthAndPrevEvents(auth, prev []gomatrixserverlib.EventReference) ( + truncAuth, truncPrev []gomatrixserverlib.EventReference, +) { + truncAuth, truncPrev = auth, prev + if len(truncAuth) > 10 { + truncAuth = truncAuth[:10] + } + if len(truncPrev) > 20 { + truncPrev = truncPrev[:20] + } + return +} diff --git a/common/test/config.go b/common/test/config.go index 0fed252ae..f88e45125 100644 --- a/common/test/config.go +++ b/common/test/config.go @@ -106,7 +106,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con cfg.Listen.RoomServer = assignAddress() cfg.Listen.SyncAPI = assignAddress() cfg.Listen.PublicRoomsAPI = assignAddress() - cfg.Listen.TypingServer = assignAddress() + cfg.Listen.EDUServer = assignAddress() // Bind to the same address as the listen address // All microservices are run on the same host in testing @@ -117,7 +117,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con cfg.Bind.RoomServer = cfg.Listen.RoomServer cfg.Bind.SyncAPI = cfg.Listen.SyncAPI cfg.Bind.PublicRoomsAPI = cfg.Listen.PublicRoomsAPI - cfg.Bind.TypingServer = cfg.Listen.TypingServer + cfg.Bind.EDUServer = cfg.Listen.EDUServer return &cfg, port, nil } diff --git a/dendrite-config.yaml b/dendrite-config.yaml index a8d39aa1e..7436af7a3 100644 --- a/dendrite-config.yaml +++ b/dendrite-config.yaml @@ -85,7 +85,7 @@ kafka: topics: output_room_event: roomserverOutput output_client_data: clientapiOutput - output_typing_event: typingServerOutput + output_typing_event: eduServerOutput user_updates: userUpdates # The postgres connection configs for connecting to the databases e.g a postgres:// URI @@ -114,7 +114,7 @@ listen: public_rooms_api: "localhost:7775" federation_sender: "localhost:7776" appservice_api: "localhost:7777" - typing_server: "localhost:7778" + edu_server: "localhost:7778" # The configuration for tracing the dendrite components. tracing: diff --git a/docker/README.md b/docker/README.md index ee4f0f96f..83d0b6a87 100644 --- a/docker/README.md +++ b/docker/README.md @@ -58,7 +58,7 @@ docker-compose up kafka zookeeper postgres and the following dendrite components ``` -docker-compose up client_api media_api sync_api room_server public_rooms_api typing_server +docker-compose up client_api media_api sync_api room_server public_rooms_api edu_server docker-compose up client_api_proxy ``` diff --git a/docker/dendrite-docker.yml b/docker/dendrite-docker.yml index abb8c3307..a72ff3ddc 100644 --- a/docker/dendrite-docker.yml +++ b/docker/dendrite-docker.yml @@ -85,7 +85,7 @@ kafka: topics: output_room_event: roomserverOutput output_client_data: clientapiOutput - output_typing_event: typingServerOutput + output_typing_event: eduServerOutput user_updates: userUpdates @@ -114,7 +114,7 @@ listen: media_api: "media_api:7774" public_rooms_api: "public_rooms_api:7775" federation_sender: "federation_sender:7776" - typing_server: "typing_server:7777" + edu_server: "typing_server:7777" # The configuration for tracing the dendrite components. tracing: diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index d738ed3f0..957c3bf3f 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -103,10 +103,10 @@ services: networks: - internal - typing_server: - container_name: dendrite_typing_server - hostname: typing_server - entrypoint: ["bash", "./docker/services/typing-server.sh"] + edu_server: + container_name: dendrite_edu_server + hostname: edu_server + entrypoint: ["bash", "./docker/services/edu-server.sh"] build: ./ volumes: - ..:/build diff --git a/docker/services/edu-server.sh b/docker/services/edu-server.sh new file mode 100644 index 000000000..d40b9fa7e --- /dev/null +++ b/docker/services/edu-server.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +bash ./docker/build.sh + +./bin/dendrite-edu-server --config=dendrite.yaml diff --git a/docker/services/typing-server.sh b/docker/services/typing-server.sh deleted file mode 100644 index 16ee0fa62..000000000 --- a/docker/services/typing-server.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/bash - -bash ./docker/build.sh - -./bin/dendrite-typing-server --config=dendrite.yaml diff --git a/docs/sytest.md b/docs/sytest.md index 9385ebff3..26a46c0bb 100644 --- a/docs/sytest.md +++ b/docs/sytest.md @@ -10,9 +10,9 @@ passes. ## Finding out which tests to add -We recommend you run the tests locally by manually setting up SyTest or using a -SyTest docker image. After running the tests, a script will print the tests you -need to add to `sytest-whitelist`. +We recommend you run the tests locally by using the SyTest docker image or +manually setting up SyTest. After running the tests, a script will print the +tests you need to add to `sytest-whitelist`. You should proceed after you see no build problems for dendrite after running: @@ -20,9 +20,32 @@ You should proceed after you see no build problems for dendrite after running: ./build.sh ``` +### Using the SyTest Docker image + +Use the following commands to pull the latest SyTest image and run the tests: + +```sh +docker pull matrixdotorg/sytest-dendrite +docker run --rm -v /path/to/dendrite/:/src/ -v /path/to/log/output/:/logs/ matrixdotorg/sytest-dendrite +``` + +`/path/to/dendrite/` should be replaced with the actual path to your dendrite +source code. The test results TAP file and homeserver logging output will go to +`/path/to/log/output`. The output of the command should tell you if you need to +add any tests to `sytest-whitelist`. + +When debugging, the following Docker `run` options may also be useful: +* `-v /path/to/sytest/:/sytest/`: Use your local SyTest repository at + `/path/to/sytest` instead of pulling from GitHub. This is useful when you want + to speed things up or make modifications to SyTest. +* `--entrypoint bash`: Prevent the container from automatically starting the + tests. When used, you need to manually run `/bootstrap.sh dendrite` inside + the container to start them. + ### Manually Setting up SyTest -Make sure you have Perl v5+ installed, and get SyTest with: +If you don't want to use the Docker image, you can also run SyTest by hand. Make +sure you have Perl 5 or above, and get SyTest with: (Note that this guide assumes your SyTest checkout is next to your `dendrite` checkout.) @@ -37,12 +60,23 @@ Set up the database: ```sh sudo -u postgres psql -c "CREATE USER dendrite PASSWORD 'itsasecret'" -sudo -u postgres psql -c "CREATE DATABASE sytest_template OWNER dendrite" +for i in dendrite0 dendrite1 sytest_template; do sudo -u postgres psql -c "CREATE DATABASE $i OWNER dendrite;"; done mkdir -p "server-0" cat > "server-0/database.yaml" << EOF args: user: dendrite - database: dendrite + password: itsasecret + database: dendrite0 + host: 127.0.0.1 + sslmode: disable +type: pg +EOF +mkdir -p "server-1" +cat > "server-1/database.yaml" << EOF +args: + user: dendrite + password: itsasecret + database: dendrite1 host: 127.0.0.1 sslmode: disable type: pg @@ -52,29 +86,20 @@ EOF Run the tests: ```sh -./run-tests.pl -I Dendrite::Monolith -d ../dendrite/bin -W ../dendrite/sytest-whitelist -O tap --all | tee results.tap +POSTGRES=1 ./run-tests.pl -I Dendrite::Monolith -d ../dendrite/bin -W ../dendrite/sytest-whitelist -O tap --all | tee results.tap ``` -where `tee` lets you see the results while they're being piped to the file. +where `tee` lets you see the results while they're being piped to the file, and +`POSTGRES=1` enables testing with PostgeSQL. If the `POSTGRES` environment +variable is not set or is set to 0, SyTest will fall back to SQLite 3. For more +flags and options, see https://github.com/matrix-org/sytest#running. Once the tests are complete, run the helper script to see if you need to add -any newly passing test names to `sytest-whitelist` in the project's root directory: +any newly passing test names to `sytest-whitelist` in the project's root +directory: ```sh ../dendrite/show-expected-fail-tests.sh results.tap ../dendrite/sytest-whitelist ../dendrite/sytest-blacklist ``` If the script prints nothing/exits with 0, then you're good to go. - -### Using a SyTest Docker image - -Ensure you have the latest image for SyTest, then run the tests: - -```sh -docker pull matrixdotorg/sytest-dendrite -docker run --rm -v /path/to/dendrite/:/src/ matrixdotorg/sytest-dendrite -``` - -where `/path/to/dendrite/` should be replaced with the actual path to your -dendrite source code. The output should tell you if you need to add any tests to -`sytest-whitelist`. diff --git a/typingserver/api/input.go b/eduserver/api/input.go similarity index 66% rename from typingserver/api/input.go rename to eduserver/api/input.go index 25e2ea228..be2d4c56a 100644 --- a/typingserver/api/input.go +++ b/eduserver/api/input.go @@ -15,6 +15,7 @@ package api import ( "context" + "errors" "net/http" commonHTTP "github.com/matrix-org/dendrite/common/http" @@ -30,13 +31,13 @@ type InputTypingEvent struct { RoomID string `json:"room_id"` // Typing is true if the user is typing, false if they have stopped. Typing bool `json:"typing"` - // Timeout is the interval for which the user should be marked as typing. - Timeout int64 `json:"timeout"` + // Timeout is the interval in milliseconds for which the user should be marked as typing. + TimeoutMS int64 `json:"timeout"` // OriginServerTS when the server received the update. OriginServerTS gomatrixserverlib.Timestamp `json:"origin_server_ts"` } -// InputTypingEventRequest is a request to TypingServerInputAPI +// InputTypingEventRequest is a request to EDUServerInputAPI type InputTypingEventRequest struct { InputTypingEvent InputTypingEvent `json:"input_typing_event"` } @@ -44,8 +45,8 @@ type InputTypingEventRequest struct { // InputTypingEventResponse is a response to InputTypingEvents type InputTypingEventResponse struct{} -// TypingServerInputAPI is used to write events to the typing server. -type TypingServerInputAPI interface { +// EDUServerInputAPI is used to write events to the typing server. +type EDUServerInputAPI interface { InputTypingEvent( ctx context.Context, request *InputTypingEventRequest, @@ -53,24 +54,24 @@ type TypingServerInputAPI interface { ) error } -// TypingServerInputTypingEventPath is the HTTP path for the InputTypingEvent API. -const TypingServerInputTypingEventPath = "/api/typingserver/input" +// EDUServerInputTypingEventPath is the HTTP path for the InputTypingEvent API. +const EDUServerInputTypingEventPath = "/api/eduserver/input" -// NewTypingServerInputAPIHTTP creates a TypingServerInputAPI implemented by talking to a HTTP POST API. -func NewTypingServerInputAPIHTTP(typingServerURL string, httpClient *http.Client) TypingServerInputAPI { +// NewEDUServerInputAPIHTTP creates a EDUServerInputAPI implemented by talking to a HTTP POST API. +func NewEDUServerInputAPIHTTP(eduServerURL string, httpClient *http.Client) (EDUServerInputAPI, error) { if httpClient == nil { - httpClient = http.DefaultClient + return nil, errors.New("NewTypingServerInputAPIHTTP: httpClient is ") } - return &httpTypingServerInputAPI{typingServerURL, httpClient} + return &httpEDUServerInputAPI{eduServerURL, httpClient}, nil } -type httpTypingServerInputAPI struct { - typingServerURL string - httpClient *http.Client +type httpEDUServerInputAPI struct { + eduServerURL string + httpClient *http.Client } -// InputRoomEvents implements TypingServerInputAPI -func (h *httpTypingServerInputAPI) InputTypingEvent( +// InputRoomEvents implements EDUServerInputAPI +func (h *httpEDUServerInputAPI) InputTypingEvent( ctx context.Context, request *InputTypingEventRequest, response *InputTypingEventResponse, @@ -78,6 +79,6 @@ func (h *httpTypingServerInputAPI) InputTypingEvent( span, ctx := opentracing.StartSpanFromContext(ctx, "InputTypingEvent") defer span.Finish() - apiURL := h.typingServerURL + TypingServerInputTypingEventPath + apiURL := h.eduServerURL + EDUServerInputTypingEventPath return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } diff --git a/typingserver/api/output.go b/eduserver/api/output.go similarity index 100% rename from typingserver/api/output.go rename to eduserver/api/output.go diff --git a/typingserver/cache/cache.go b/eduserver/cache/cache.go similarity index 87% rename from typingserver/cache/cache.go rename to eduserver/cache/cache.go index 3f05c938e..46f7a2b13 100644 --- a/typingserver/cache/cache.go +++ b/eduserver/cache/cache.go @@ -32,8 +32,8 @@ type roomData struct { userSet userSet } -// TypingCache maintains a list of users typing in each room. -type TypingCache struct { +// EDUCache maintains a list of users typing in each room. +type EDUCache struct { sync.RWMutex latestSyncPosition int64 data map[string]*roomData @@ -42,26 +42,26 @@ type TypingCache struct { // Create a roomData with its sync position set to the latest sync position. // Must only be called after locking the cache. -func (t *TypingCache) newRoomData() *roomData { +func (t *EDUCache) newRoomData() *roomData { return &roomData{ syncPosition: t.latestSyncPosition, userSet: make(userSet), } } -// NewTypingCache returns a new TypingCache initialised for use. -func NewTypingCache() *TypingCache { - return &TypingCache{data: make(map[string]*roomData)} +// New returns a new EDUCache initialised for use. +func New() *EDUCache { + return &EDUCache{data: make(map[string]*roomData)} } // SetTimeoutCallback sets a callback function that is called right after // a user is removed from the typing user list due to timeout. -func (t *TypingCache) SetTimeoutCallback(fn TimeoutCallbackFn) { +func (t *EDUCache) SetTimeoutCallback(fn TimeoutCallbackFn) { t.timeoutCallback = fn } // GetTypingUsers returns the list of users typing in a room. -func (t *TypingCache) GetTypingUsers(roomID string) []string { +func (t *EDUCache) GetTypingUsers(roomID string) []string { users, _ := t.GetTypingUsersIfUpdatedAfter(roomID, 0) // 0 should work above because the first position used will be 1. return users @@ -70,7 +70,7 @@ func (t *TypingCache) GetTypingUsers(roomID string) []string { // GetTypingUsersIfUpdatedAfter returns all users typing in this room with // updated == true if the typing sync position of the room is after the given // position. Otherwise, returns an empty slice with updated == false. -func (t *TypingCache) GetTypingUsersIfUpdatedAfter( +func (t *EDUCache) GetTypingUsersIfUpdatedAfter( roomID string, position int64, ) (users []string, updated bool) { t.RLock() @@ -93,7 +93,7 @@ func (t *TypingCache) GetTypingUsersIfUpdatedAfter( // expire is the time when the user typing should time out. // if expire is nil, defaultTypingTimeout is assumed. // Returns the latest sync position for typing after update. -func (t *TypingCache) AddTypingUser( +func (t *EDUCache) AddTypingUser( userID, roomID string, expire *time.Time, ) int64 { expireTime := getExpireTime(expire) @@ -111,7 +111,7 @@ func (t *TypingCache) AddTypingUser( // addUser with mutex lock & replace the previous timer. // Returns the latest typing sync position after update. -func (t *TypingCache) addUser( +func (t *EDUCache) addUser( userID, roomID string, expiryTimer *time.Timer, ) int64 { t.Lock() @@ -143,7 +143,7 @@ func (t *TypingCache) addUser( // RemoveUser with mutex lock & stop the timer. // Returns the latest sync position for typing after update. -func (t *TypingCache) RemoveUser(userID, roomID string) int64 { +func (t *EDUCache) RemoveUser(userID, roomID string) int64 { t.Lock() defer t.Unlock() @@ -166,7 +166,7 @@ func (t *TypingCache) RemoveUser(userID, roomID string) int64 { return t.latestSyncPosition } -func (t *TypingCache) GetLatestSyncPosition() int64 { +func (t *EDUCache) GetLatestSyncPosition() int64 { t.Lock() defer t.Unlock() return t.latestSyncPosition diff --git a/typingserver/cache/cache_test.go b/eduserver/cache/cache_test.go similarity index 88% rename from typingserver/cache/cache_test.go rename to eduserver/cache/cache_test.go index 2a6ffa50e..8a1b6f797 100644 --- a/typingserver/cache/cache_test.go +++ b/eduserver/cache/cache_test.go @@ -19,10 +19,10 @@ import ( "github.com/matrix-org/dendrite/common/test" ) -func TestTypingCache(t *testing.T) { - tCache := NewTypingCache() +func TestEDUCache(t *testing.T) { + tCache := New() if tCache == nil { - t.Fatal("NewTypingCache failed") + t.Fatal("New failed") } t.Run("AddTypingUser", func(t *testing.T) { @@ -38,7 +38,7 @@ func TestTypingCache(t *testing.T) { }) } -func testAddTypingUser(t *testing.T, tCache *TypingCache) { // nolint: unparam +func testAddTypingUser(t *testing.T, tCache *EDUCache) { // nolint: unparam present := time.Now() tests := []struct { userID string @@ -58,7 +58,7 @@ func testAddTypingUser(t *testing.T, tCache *TypingCache) { // nolint: unparam } } -func testGetTypingUsers(t *testing.T, tCache *TypingCache) { +func testGetTypingUsers(t *testing.T, tCache *EDUCache) { tests := []struct { roomID string wantUsers []string @@ -75,7 +75,7 @@ func testGetTypingUsers(t *testing.T, tCache *TypingCache) { } } -func testRemoveUser(t *testing.T, tCache *TypingCache) { +func testRemoveUser(t *testing.T, tCache *EDUCache) { tests := []struct { roomID string userIDs []string diff --git a/typingserver/typingserver.go b/eduserver/eduserver.go similarity index 66% rename from typingserver/typingserver.go rename to eduserver/eduserver.go index b43f72f75..8ddd2c527 100644 --- a/typingserver/typingserver.go +++ b/eduserver/eduserver.go @@ -10,27 +10,27 @@ // See the License for the specific language governing permissions and // limitations under the License. -package typingserver +package eduserver import ( "net/http" "github.com/matrix-org/dendrite/common/basecomponent" - "github.com/matrix-org/dendrite/typingserver/api" - "github.com/matrix-org/dendrite/typingserver/cache" - "github.com/matrix-org/dendrite/typingserver/input" + "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/eduserver/cache" + "github.com/matrix-org/dendrite/eduserver/input" ) -// SetupTypingServerComponent sets up and registers HTTP handlers for the -// TypingServer component. Returns instances of the various roomserver APIs, +// SetupEDUServerComponent sets up and registers HTTP handlers for the +// EDUServer component. Returns instances of the various roomserver APIs, // allowing other components running in the same process to hit the query the // APIs directly instead of having to use HTTP. -func SetupTypingServerComponent( +func SetupEDUServerComponent( base *basecomponent.BaseDendrite, - typingCache *cache.TypingCache, -) api.TypingServerInputAPI { - inputAPI := &input.TypingServerInputAPI{ - Cache: typingCache, + eduCache *cache.EDUCache, +) api.EDUServerInputAPI { + inputAPI := &input.EDUServerInputAPI{ + Cache: eduCache, Producer: base.KafkaProducer, OutputTypingEventTopic: string(base.Cfg.Kafka.Topics.OutputTypingEvent), } diff --git a/typingserver/input/input.go b/eduserver/input/input.go similarity index 77% rename from typingserver/input/input.go rename to eduserver/input/input.go index 0e2fbe51f..845909452 100644 --- a/typingserver/input/input.go +++ b/eduserver/input/input.go @@ -19,25 +19,25 @@ import ( "time" "github.com/matrix-org/dendrite/common" - "github.com/matrix-org/dendrite/typingserver/api" - "github.com/matrix-org/dendrite/typingserver/cache" + "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" "gopkg.in/Shopify/sarama.v1" ) -// TypingServerInputAPI implements api.TypingServerInputAPI -type TypingServerInputAPI struct { +// EDUServerInputAPI implements api.EDUServerInputAPI +type EDUServerInputAPI struct { // Cache to store the current typing members in each room. - Cache *cache.TypingCache + Cache *cache.EDUCache // The kafka topic to output new typing events to. OutputTypingEventTopic string // kafka producer Producer sarama.SyncProducer } -// InputTypingEvent implements api.TypingServerInputAPI -func (t *TypingServerInputAPI) InputTypingEvent( +// InputTypingEvent implements api.EDUServerInputAPI +func (t *EDUServerInputAPI) InputTypingEvent( ctx context.Context, request *api.InputTypingEventRequest, response *api.InputTypingEventResponse, @@ -46,7 +46,7 @@ func (t *TypingServerInputAPI) InputTypingEvent( if ite.Typing { // user is typing, update our current state of users typing. expireTime := ite.OriginServerTS.Time().Add( - time.Duration(ite.Timeout) * time.Millisecond, + time.Duration(ite.TimeoutMS) * time.Millisecond, ) t.Cache.AddTypingUser(ite.UserID, ite.RoomID, &expireTime) } else { @@ -56,7 +56,7 @@ func (t *TypingServerInputAPI) InputTypingEvent( return t.sendEvent(ite) } -func (t *TypingServerInputAPI) sendEvent(ite *api.InputTypingEvent) error { +func (t *EDUServerInputAPI) sendEvent(ite *api.InputTypingEvent) error { ev := &api.TypingEvent{ Type: gomatrixserverlib.MTyping, RoomID: ite.RoomID, @@ -69,7 +69,7 @@ func (t *TypingServerInputAPI) sendEvent(ite *api.InputTypingEvent) error { if ev.Typing { expireTime := ite.OriginServerTS.Time().Add( - time.Duration(ite.Timeout) * time.Millisecond, + time.Duration(ite.TimeoutMS) * time.Millisecond, ) ote.ExpireTime = &expireTime } @@ -89,9 +89,9 @@ func (t *TypingServerInputAPI) sendEvent(ite *api.InputTypingEvent) error { return err } -// SetupHTTP adds the TypingServerInputAPI handlers to the http.ServeMux. -func (t *TypingServerInputAPI) SetupHTTP(servMux *http.ServeMux) { - servMux.Handle(api.TypingServerInputTypingEventPath, +// SetupHTTP adds the EDUServerInputAPI handlers to the http.ServeMux. +func (t *EDUServerInputAPI) SetupHTTP(servMux *http.ServeMux) { + servMux.Handle(api.EDUServerInputTypingEventPath, common.MakeInternalAPI("inputTypingEvents", func(req *http.Request) util.JSONResponse { var request api.InputTypingEventRequest var response api.InputTypingEventResponse diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index 90db95b3a..ed96322b8 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -41,12 +41,13 @@ func SetupFederationAPIComponent( queryAPI roomserverAPI.RoomserverQueryAPI, asAPI appserviceAPI.AppServiceQueryAPI, federationSenderAPI federationSenderAPI.FederationSenderQueryAPI, + eduProducer *producers.EDUServerProducer, ) { roomserverProducer := producers.NewRoomserverProducer(inputAPI, queryAPI) routing.Setup( base.APIMux, base.Cfg, queryAPI, aliasAPI, asAPI, - roomserverProducer, federationSenderAPI, *keyRing, + roomserverProducer, eduProducer, federationSenderAPI, *keyRing, federation, accountsDB, deviceDB, ) } diff --git a/federationapi/routing/invite.go b/federationapi/routing/invite.go index 09c3734be..6c3e12e23 100644 --- a/federationapi/routing/invite.go +++ b/federationapi/routing/invite.go @@ -15,18 +15,17 @@ package routing import ( - "context" + "encoding/json" "net/http" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/common/config" - "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) -// Invite implements /_matrix/federation/v1/invite/{roomID}/{eventID} +// Invite implements /_matrix/federation/v2/invite/{roomID}/{eventID} func Invite( httpReq *http.Request, request *gomatrixserverlib.FederationRequest, @@ -36,24 +35,14 @@ func Invite( producer *producers.RoomserverProducer, keys gomatrixserverlib.KeyRing, ) util.JSONResponse { - // Look up the room version for the room. - verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} - verRes := api.QueryRoomVersionForRoomResponse{} - if err := producer.QueryAPI.QueryRoomVersionForRoom(context.Background(), &verReq, &verRes); err != nil { + inviteReq := gomatrixserverlib.InviteV2Request{} + if err := json.Unmarshal(request.Content(), &inviteReq); err != nil { return util.JSONResponse{ Code: http.StatusBadRequest, - JSON: jsonerror.UnsupportedRoomVersion(err.Error()), - } - } - - // Decode the event JSON from the request. - event, err := gomatrixserverlib.NewEventFromUntrustedJSON(request.Content(), verRes.RoomVersion) - if err != nil { - return util.JSONResponse{ - Code: http.StatusBadRequest, - JSON: jsonerror.NotJSON("The request body could not be decoded into valid JSON. " + err.Error()), + JSON: jsonerror.NotJSON("The request body could not be decoded into an invite request. " + err.Error()), } } + event := inviteReq.Event() // Check that the room ID is correct. if event.RoomID() != roomID { @@ -71,14 +60,6 @@ func Invite( } } - // Check that the event is from the server sending the request. - if event.Origin() != request.Origin() { - return util.JSONResponse{ - Code: http.StatusForbidden, - JSON: jsonerror.Forbidden("The invite must be sent by the server it originated on"), - } - } - // Check that the event is signed by the server sending the request. redacted := event.Redact() verifyRequests := []gomatrixserverlib.VerifyJSONRequest{{ @@ -104,7 +85,11 @@ func Invite( ) // Add the invite event to the roomserver. - if err = producer.SendInvite(httpReq.Context(), signedEvent); err != nil { + if err = producer.SendInvite( + httpReq.Context(), + signedEvent.Headered(inviteReq.RoomVersion()), + inviteReq.InviteRoomState(), + ); err != nil { util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendInvite failed") return jsonerror.InternalServerError() } diff --git a/federationapi/routing/join.go b/federationapi/routing/join.go index a39ff6394..0a7b23000 100644 --- a/federationapi/routing/join.go +++ b/federationapi/routing/join.go @@ -15,6 +15,7 @@ package routing import ( + "fmt" "net/http" "time" @@ -34,6 +35,7 @@ func MakeJoin( cfg *config.Dendrite, query api.RoomserverQueryAPI, roomID, userID string, + remoteVersions []gomatrixserverlib.RoomVersion, ) util.JSONResponse { verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} verRes := api.QueryRoomVersionForRoomResponse{} @@ -44,6 +46,27 @@ func MakeJoin( } } + // Check that the room that the remote side is trying to join is actually + // one of the room versions that they listed in their supported ?ver= in + // the make_join URL. + // https://matrix.org/docs/spec/server_server/r0.1.3#get-matrix-federation-v1-make-join-roomid-userid + remoteSupportsVersion := false + for _, v := range remoteVersions { + if v == verRes.RoomVersion { + remoteSupportsVersion = true + break + } + } + // If it isn't, stop trying to join the room. + if !remoteSupportsVersion { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.UnsupportedRoomVersion( + fmt.Sprintf("Joining server does not support room version %s", verRes.RoomVersion), + ), + } + } + _, domain, err := gomatrixserverlib.SplitID('@', userID) if err != nil { return util.JSONResponse{ @@ -140,7 +163,12 @@ func SendJoin( if event.RoomID() != roomID { return util.JSONResponse{ Code: http.StatusBadRequest, - JSON: jsonerror.BadJSON("The room ID in the request path must match the room ID in the join event JSON"), + JSON: jsonerror.BadJSON( + fmt.Sprintf( + "The room ID in the request path (%q) must match the room ID in the join event JSON (%q)", + roomID, event.RoomID(), + ), + ), } } @@ -148,7 +176,12 @@ func SendJoin( if event.EventID() != eventID { return util.JSONResponse{ Code: http.StatusBadRequest, - JSON: jsonerror.BadJSON("The event ID in the request path must match the event ID in the join event JSON"), + JSON: jsonerror.BadJSON( + fmt.Sprintf( + "The event ID in the request path (%q) must match the event ID in the join event JSON (%q)", + eventID, event.EventID(), + ), + ), } } @@ -186,6 +219,7 @@ func SendJoin( PrevEventIDs: event.PrevEventIDs(), AuthEventIDs: event.AuthEventIDs(), RoomID: roomID, + ResolveState: true, }, &stateAndAuthChainResponse) if err != nil { util.GetLogger(httpReq.Context()).WithError(err).Error("query.QueryStateAndAuthChain failed") diff --git a/federationapi/routing/routing.go b/federationapi/routing/routing.go index b5c8e53de..83bac5550 100644 --- a/federationapi/routing/routing.go +++ b/federationapi/routing/routing.go @@ -48,6 +48,7 @@ func Setup( aliasAPI roomserverAPI.RoomserverAliasAPI, asAPI appserviceAPI.AppServiceQueryAPI, producer *producers.RoomserverProducer, + eduProducer *producers.EDUServerProducer, federationSenderAPI federationSenderAPI.FederationSenderQueryAPI, keys gomatrixserverlib.KeyRing, federation *gomatrixserverlib.FederationClient, @@ -79,12 +80,12 @@ func Setup( } return Send( httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]), - cfg, query, producer, keys, federation, + cfg, query, producer, eduProducer, keys, federation, ) }, )).Methods(http.MethodPut, http.MethodOptions) - v1fedmux.Handle("/invite/{roomID}/{eventID}", common.MakeFedAPI( + v2fedmux.Handle("/invite/{roomID}/{eventID}", common.MakeFedAPI( "federation_invite", cfg.Matrix.ServerName, keys, func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse { vars, err := common.URLDecodeMapValues(mux.Vars(httpReq)) @@ -197,7 +198,7 @@ func Setup( }, )).Methods(http.MethodGet) - v1fedmux.Handle("/make_join/{roomID}/{userID}", common.MakeFedAPI( + v1fedmux.Handle("/make_join/{roomID}/{eventID}", common.MakeFedAPI( "federation_make_join", cfg.Matrix.ServerName, keys, func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse { vars, err := common.URLDecodeMapValues(mux.Vars(httpReq)) @@ -205,14 +206,28 @@ func Setup( return util.ErrorResponse(err) } roomID := vars["roomID"] - userID := vars["userID"] + eventID := vars["eventID"] + queryVars := httpReq.URL.Query() + remoteVersions := []gomatrixserverlib.RoomVersion{} + if vers, ok := queryVars["ver"]; ok { + // The remote side supplied a ?=ver so use that to build up the list + // of supported room versions + for _, v := range vers { + remoteVersions = append(remoteVersions, gomatrixserverlib.RoomVersion(v)) + } + } else { + // The remote side didn't supply a ?ver= so just assume that they only + // support room version 1, as per the spec + // https://matrix.org/docs/spec/server_server/r0.1.3#get-matrix-federation-v1-make-join-roomid-userid + remoteVersions = append(remoteVersions, gomatrixserverlib.RoomVersionV1) + } return MakeJoin( - httpReq, request, cfg, query, roomID, userID, + httpReq, request, cfg, query, roomID, eventID, remoteVersions, ) }, )).Methods(http.MethodGet) - v2fedmux.Handle("/send_join/{roomID}/{userID}", common.MakeFedAPI( + v2fedmux.Handle("/send_join/{roomID}/{eventID}", common.MakeFedAPI( "federation_send_join", cfg.Matrix.ServerName, keys, func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse { vars, err := common.URLDecodeMapValues(mux.Vars(httpReq)) @@ -220,14 +235,14 @@ func Setup( return util.ErrorResponse(err) } roomID := vars["roomID"] - userID := vars["userID"] + eventID := vars["eventID"] return SendJoin( - httpReq, request, cfg, query, producer, keys, roomID, userID, + httpReq, request, cfg, query, producer, keys, roomID, eventID, ) }, )).Methods(http.MethodPut) - v1fedmux.Handle("/make_leave/{roomID}/{userID}", common.MakeFedAPI( + v1fedmux.Handle("/make_leave/{roomID}/{eventID}", common.MakeFedAPI( "federation_make_leave", cfg.Matrix.ServerName, keys, func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse { vars, err := common.URLDecodeMapValues(mux.Vars(httpReq)) @@ -235,14 +250,14 @@ func Setup( return util.ErrorResponse(err) } roomID := vars["roomID"] - userID := vars["userID"] + eventID := vars["eventID"] return MakeLeave( - httpReq, request, cfg, query, roomID, userID, + httpReq, request, cfg, query, roomID, eventID, ) }, )).Methods(http.MethodGet) - v2fedmux.Handle("/send_leave/{roomID}/{userID}", common.MakeFedAPI( + v2fedmux.Handle("/send_leave/{roomID}/{eventID}", common.MakeFedAPI( "federation_send_leave", cfg.Matrix.ServerName, keys, func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse { vars, err := common.URLDecodeMapValues(mux.Vars(httpReq)) @@ -250,9 +265,9 @@ func Setup( return util.ErrorResponse(err) } roomID := vars["roomID"] - userID := vars["userID"] + eventID := vars["eventID"] return SendLeave( - httpReq, request, cfg, producer, keys, roomID, userID, + httpReq, request, cfg, producer, keys, roomID, eventID, ) }, )).Methods(http.MethodPut) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 4c92c7e5e..1013a44cf 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -36,20 +36,22 @@ func Send( cfg *config.Dendrite, query api.RoomserverQueryAPI, producer *producers.RoomserverProducer, + eduProducer *producers.EDUServerProducer, keys gomatrixserverlib.KeyRing, federation *gomatrixserverlib.FederationClient, ) util.JSONResponse { t := txnReq{ - context: httpReq.Context(), - query: query, - producer: producer, - keys: keys, - federation: federation, + context: httpReq.Context(), + query: query, + producer: producer, + eduProducer: eduProducer, + keys: keys, + federation: federation, } var txnEvents struct { - PDUs []json.RawMessage `json:"pdus"` - EDUs []json.RawMessage `json:"edus"` + PDUs []json.RawMessage `json:"pdus"` + EDUs []gomatrixserverlib.EDU `json:"edus"` } if err := json.Unmarshal(request.Content(), &txnEvents); err != nil { @@ -59,7 +61,9 @@ func Send( } } + // TODO: Really we should have a function to convert FederationRequest to txnReq t.PDUs = txnEvents.PDUs + t.EDUs = txnEvents.EDUs t.Origin = request.Origin() t.TransactionID = txnID t.Destination = cfg.Matrix.ServerName @@ -80,11 +84,12 @@ func Send( type txnReq struct { gomatrixserverlib.Transaction - context context.Context - query api.RoomserverQueryAPI - producer *producers.RoomserverProducer - keys gomatrixserverlib.KeyRing - federation *gomatrixserverlib.FederationClient + context context.Context + query api.RoomserverQueryAPI + producer *producers.RoomserverProducer + eduProducer *producers.EDUServerProducer + keys gomatrixserverlib.KeyRing + federation *gomatrixserverlib.FederationClient } func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { @@ -152,7 +157,7 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { } } - // TODO: Process the EDUs. + t.processEDUs(t.EDUs) util.GetLogger(t.context).Infof("Processed %d PDUs from transaction %q", len(results), t.TransactionID) return &gomatrixserverlib.RespSend{PDUs: results}, nil } @@ -163,6 +168,29 @@ type unknownRoomError struct { func (e unknownRoomError) Error() string { return fmt.Sprintf("unknown room %q", e.roomID) } +func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) { + for _, e := range edus { + switch e.Type { + case gomatrixserverlib.MTyping: + // https://matrix.org/docs/spec/server_server/latest#typing-notifications + var typingPayload struct { + RoomID string `json:"room_id"` + UserID string `json:"user_id"` + Typing bool `json:"typing"` + } + if err := json.Unmarshal(e.Content, &typingPayload); err != nil { + util.GetLogger(t.context).WithError(err).Error("Failed to unmarshal typing event") + continue + } + if err := t.eduProducer.SendTyping(t.context, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil { + util.GetLogger(t.context).WithError(err).Error("Failed to send typing event to edu server") + } + default: + util.GetLogger(t.context).WithField("type", e.Type).Warn("unhandled edu") + } + } +} + func (t *txnReq) processEvent(e gomatrixserverlib.Event) error { prevEventIDs := e.PrevEventIDs() diff --git a/federationapi/routing/state.go b/federationapi/routing/state.go index 6a47882b7..548598dd7 100644 --- a/federationapi/routing/state.go +++ b/federationapi/routing/state.go @@ -107,7 +107,6 @@ func getState( return nil, &util.JSONResponse{Code: http.StatusNotFound, JSON: nil} } - prevEventIDs := getIDsFromEventRef(event.PrevEvents()) authEventIDs := getIDsFromEventRef(event.AuthEvents()) var response api.QueryStateAndAuthChainResponse @@ -115,7 +114,7 @@ func getState( ctx, &api.QueryStateAndAuthChainRequest{ RoomID: roomID, - PrevEventIDs: prevEventIDs, + PrevEventIDs: []string{eventID}, AuthEventIDs: authEventIDs, }, &response, diff --git a/federationsender/api/query.go b/federationsender/api/query.go index ebc6e833f..7c0ca7ff2 100644 --- a/federationsender/api/query.go +++ b/federationsender/api/query.go @@ -2,6 +2,7 @@ package api import ( "context" + "errors" "net/http" commonHTTP "github.com/matrix-org/dendrite/common/http" @@ -58,12 +59,12 @@ const FederationSenderQueryJoinedHostsInRoomPath = "/api/federationsender/queryJ const FederationSenderQueryJoinedHostServerNamesInRoomPath = "/api/federationsender/queryJoinedHostServerNamesInRoom" // NewFederationSenderQueryAPIHTTP creates a FederationSenderQueryAPI implemented by talking to a HTTP POST API. -// If httpClient is nil then it uses the http.DefaultClient -func NewFederationSenderQueryAPIHTTP(federationSenderURL string, httpClient *http.Client) FederationSenderQueryAPI { +// If httpClient is nil an error is returned +func NewFederationSenderQueryAPIHTTP(federationSenderURL string, httpClient *http.Client) (FederationSenderQueryAPI, error) { if httpClient == nil { - httpClient = http.DefaultClient + return nil, errors.New("NewFederationSenderQueryAPIHTTP: httpClient is ") } - return &httpFederationSenderQueryAPI{federationSenderURL, httpClient} + return &httpFederationSenderQueryAPI{federationSenderURL, httpClient}, nil } type httpFederationSenderQueryAPI struct { diff --git a/federationsender/consumers/typingserver.go b/federationsender/consumers/eduserver.go similarity index 76% rename from federationsender/consumers/typingserver.go rename to federationsender/consumers/eduserver.go index 590fcb257..4d2445f3c 100644 --- a/federationsender/consumers/typingserver.go +++ b/federationsender/consumers/eduserver.go @@ -18,15 +18,15 @@ import ( "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/storage" - "github.com/matrix-org/dendrite/typingserver/api" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" "gopkg.in/Shopify/sarama.v1" ) -// OutputTypingEventConsumer consumes events that originate in typing server. +// OutputTypingEventConsumer consumes events that originate in EDU server. type OutputTypingEventConsumer struct { consumer *common.ContinualConsumer db storage.Database @@ -34,7 +34,7 @@ type OutputTypingEventConsumer struct { ServerName gomatrixserverlib.ServerName } -// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. Call Start() to begin consuming from typing servers. +// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. Call Start() to begin consuming from EDU servers. func NewOutputTypingEventConsumer( cfg *config.Dendrite, kafkaConsumer sarama.Consumer, @@ -57,19 +57,30 @@ func NewOutputTypingEventConsumer( return c } -// Start consuming from typing servers +// Start consuming from EDU servers func (t *OutputTypingEventConsumer) Start() error { return t.consumer.Start() } -// onMessage is called for OutputTypingEvent received from the typing servers. +// onMessage is called for OutputTypingEvent received from the EDU servers. // Parses the msg, creates a matrix federation EDU and sends it to joined hosts. func (t *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // Extract the typing event from msg. var ote api.OutputTypingEvent if err := json.Unmarshal(msg.Value, &ote); err != nil { // Skip this msg but continue processing messages. - log.WithError(err).Errorf("typingserver output log: message parse failed") + log.WithError(err).Errorf("eduserver output log: message parse failed") + return nil + } + + // only send typing events which originated from us + _, typingServerName, err := gomatrixserverlib.SplitID('@', ote.Event.UserID) + if err != nil { + log.WithError(err).WithField("user_id", ote.Event.UserID).Error("Failed to extract domain from typing sender") + return nil + } + if typingServerName != t.ServerName { + log.WithField("other_server", typingServerName).Info("Suppressing typing notif: originated elsewhere") return nil } diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index 8ab2affe2..f59405af0 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -32,6 +32,7 @@ import ( // OutputRoomEventConsumer consumes events that originated in the room server. type OutputRoomEventConsumer struct { + cfg *config.Dendrite roomServerConsumer *common.ContinualConsumer db storage.Database queues *queue.OutgoingQueues @@ -52,6 +53,7 @@ func NewOutputRoomEventConsumer( PartitionStore: store, } s := &OutputRoomEventConsumer{ + cfg: cfg, roomServerConsumer: &consumer, db: store, queues: queues, @@ -79,29 +81,48 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { log.WithError(err).Errorf("roomserver output log: message parse failure") return nil } - if output.Type != api.OutputTypeNewRoomEvent { + + switch output.Type { + case api.OutputTypeNewRoomEvent: + ev := &output.NewRoomEvent.Event + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "room_id": ev.RoomID(), + "send_as_server": output.NewRoomEvent.SendAsServer, + }).Info("received room event from roomserver") + + if err := s.processMessage(*output.NewRoomEvent); err != nil { + // panic rather than continue with an inconsistent database + log.WithFields(log.Fields{ + "event": string(ev.JSON()), + "add": output.NewRoomEvent.AddsStateEventIDs, + "del": output.NewRoomEvent.RemovesStateEventIDs, + log.ErrorKey: err, + }).Panicf("roomserver output log: write room event failure") + return nil + } + case api.OutputTypeNewInviteEvent: + ev := &output.NewInviteEvent.Event + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "room_id": ev.RoomID(), + "state_key": ev.StateKey(), + }).Info("received invite event from roomserver") + + if err := s.processInvite(*output.NewInviteEvent); err != nil { + // panic rather than continue with an inconsistent database + log.WithFields(log.Fields{ + "event": string(ev.JSON()), + log.ErrorKey: err, + }).Panicf("roomserver output log: write invite event failure") + return nil + } + default: log.WithField("type", output.Type).Debug( "roomserver output log: ignoring unknown output type", ) return nil } - ev := &output.NewRoomEvent.Event - log.WithFields(log.Fields{ - "event_id": ev.EventID(), - "room_id": ev.RoomID(), - "send_as_server": output.NewRoomEvent.SendAsServer, - }).Info("received event from roomserver") - - if err := s.processMessage(*output.NewRoomEvent); err != nil { - // panic rather than continue with an inconsistent database - log.WithFields(log.Fields{ - "event": string(ev.JSON()), - log.ErrorKey: err, - "add": output.NewRoomEvent.AddsStateEventIDs, - "del": output.NewRoomEvent.RemovesStateEventIDs, - }).Panicf("roomserver output log: write event failure") - return nil - } return nil } @@ -159,6 +180,69 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err ) } +// processInvite handles an invite event for sending over federation. +func (s *OutputRoomEventConsumer) processInvite(oie api.OutputNewInviteEvent) error { + // Don't try to reflect and resend invites that didn't originate from us. + if s.cfg.Matrix.ServerName != oie.Event.Origin() { + return nil + } + + // When sending a v2 invite, the inviting server should try and include + // a "stripped down" version of the room state. This is pretty much just + // enough information for the remote side to show something useful to the + // user, like the room name, aliases etc. + strippedState := []gomatrixserverlib.InviteV2StrippedState{} + stateWanted := []string{ + gomatrixserverlib.MRoomName, gomatrixserverlib.MRoomCanonicalAlias, + gomatrixserverlib.MRoomAliases, gomatrixserverlib.MRoomJoinRules, + } + + // For each of the state keys that we want to try and send, ask the + // roomserver if we have a state event for that room that matches the + // state key. + for _, wanted := range stateWanted { + queryReq := api.QueryLatestEventsAndStateRequest{ + RoomID: oie.Event.RoomID(), + StateToFetch: []gomatrixserverlib.StateKeyTuple{ + gomatrixserverlib.StateKeyTuple{ + EventType: wanted, + StateKey: "", + }, + }, + } + // If this fails then we just move onto the next event - we don't + // actually know at this point whether the room even has that type + // of state. + queryRes := api.QueryLatestEventsAndStateResponse{} + if err := s.query.QueryLatestEventsAndState(context.TODO(), &queryReq, &queryRes); err != nil { + log.WithFields(log.Fields{ + "room_id": queryReq.RoomID, + "event_type": wanted, + }).WithError(err).Info("couldn't find state to strip") + continue + } + // Append the stripped down copy of the state to our list. + for _, headeredEvent := range queryRes.StateEvents { + event := headeredEvent.Unwrap() + strippedState = append(strippedState, gomatrixserverlib.NewInviteV2StrippedState(&event)) + + log.WithFields(log.Fields{ + "room_id": queryReq.RoomID, + "event_type": event.Type(), + }).Info("adding stripped state") + } + } + + // Build the invite request with the info we've got. + inviteReq, err := gomatrixserverlib.NewInviteV2Request(&oie.Event, strippedState) + if err != nil { + return fmt.Errorf("gomatrixserverlib.NewInviteV2Request: %w", err) + } + + // Send the event. + return s.queues.SendInvite(&inviteReq) +} + // joinedHostsAtEvent works out a list of matrix servers that were joined to // the room at the event. // It is important to use the state at the event for sending messages because: diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index b4a6da1a3..7d4dc850b 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -24,6 +24,7 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" log "github.com/sirupsen/logrus" + "go.uber.org/atomic" ) // destinationQueue is a queue of events for a single destination. @@ -34,14 +35,15 @@ type destinationQueue struct { client *gomatrixserverlib.FederationClient origin gomatrixserverlib.ServerName destination gomatrixserverlib.ServerName - // The running mutex protects running, sentCounter, lastTransactionIDs and + running atomic.Bool + // The running mutex protects sentCounter, lastTransactionIDs and // pendingEvents, pendingEDUs. runningMutex sync.Mutex - running bool sentCounter int lastTransactionIDs []gomatrixserverlib.TransactionID pendingEvents []*gomatrixserverlib.HeaderedEvent pendingEDUs []*gomatrixserverlib.EDU + pendingInvites []*gomatrixserverlib.InviteV2Request } // Send event adds the event to the pending queue for the destination. @@ -51,29 +53,43 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.HeaderedEvent) { oq.runningMutex.Lock() defer oq.runningMutex.Unlock() oq.pendingEvents = append(oq.pendingEvents, ev) - if !oq.running { - oq.running = true + if !oq.running.Load() { go oq.backgroundSend() } } // sendEDU adds the EDU event to the pending queue for the destination. // If the queue is empty then it starts a background goroutine to -// start sending event to that destination. +// start sending events to that destination. func (oq *destinationQueue) sendEDU(e *gomatrixserverlib.EDU) { oq.runningMutex.Lock() defer oq.runningMutex.Unlock() oq.pendingEDUs = append(oq.pendingEDUs, e) - if !oq.running { - oq.running = true + if !oq.running.Load() { go oq.backgroundSend() } } +// sendInvite adds the invite event to the pending queue for the +// destination. If the queue is empty then it starts a background +// goroutine to start sending events to that destination. +func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.InviteV2Request) { + oq.runningMutex.Lock() + defer oq.runningMutex.Unlock() + oq.pendingInvites = append(oq.pendingInvites, ev) + if !oq.running.Load() { + go oq.backgroundSend() + } +} + +// backgroundSend is the worker goroutine for sending events. func (oq *destinationQueue) backgroundSend() { + oq.running.Store(true) + defer oq.running.Store(false) + for { - t := oq.next() - if t == nil { + transaction, invites := oq.nextTransaction(), oq.nextInvites() + if !transaction && !invites { // If the queue is empty then stop processing for this destination. // TODO: Remove this destination from the queue map. return @@ -81,29 +97,18 @@ func (oq *destinationQueue) backgroundSend() { // TODO: handle retries. // TODO: blacklist uncooperative servers. - - util.GetLogger(context.TODO()).Infof("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs)) - - _, err := oq.client.SendTransaction(context.TODO(), *t) - if err != nil { - log.WithFields(log.Fields{ - "destination": oq.destination, - log.ErrorKey: err, - }).Info("problem sending transaction") - } } } -// next creates a new transaction from the pending event queue -// and flushes the queue. -// Returns nil if the queue was empty. -func (oq *destinationQueue) next() *gomatrixserverlib.Transaction { +// nextTransaction creates a new transaction from the pending event +// queue and sends it. Returns true if a transaction was sent or +// false otherwise. +func (oq *destinationQueue) nextTransaction() bool { oq.runningMutex.Lock() defer oq.runningMutex.Unlock() if len(oq.pendingEvents) == 0 && len(oq.pendingEDUs) == 0 { - oq.running = false - return nil + return false } t := gomatrixserverlib.Transaction{ @@ -136,5 +141,46 @@ func (oq *destinationQueue) next() *gomatrixserverlib.Transaction { oq.pendingEDUs = nil oq.sentCounter += len(t.EDUs) - return &t + util.GetLogger(context.TODO()).Infof("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs)) + + _, err := oq.client.SendTransaction(context.TODO(), t) + if err != nil { + log.WithFields(log.Fields{ + "destination": oq.destination, + log.ErrorKey: err, + }).Info("problem sending transaction") + } + + return true +} + +// nextInvite takes pending invite events from the queue and sends +// them. Returns true if a transaction was sent or false otherwise. +func (oq *destinationQueue) nextInvites() bool { + oq.runningMutex.Lock() + defer oq.runningMutex.Unlock() + + if len(oq.pendingInvites) == 0 { + return false + } + + for _, inviteReq := range oq.pendingInvites { + ev := inviteReq.Event() + + if _, err := oq.client.SendInviteV2( + context.TODO(), + oq.destination, + *inviteReq, + ); err != nil { + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "state_key": ev.StateKey(), + "destination": oq.destination, + }).WithError(err).Error("failed to send invite") + } + } + + oq.pendingInvites = nil + + return true } diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 840fe4afe..88d47f120 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -80,6 +80,49 @@ func (oqs *OutgoingQueues) SendEvent( return nil } +// SendEvent sends an event to the destinations +func (oqs *OutgoingQueues) SendInvite( + inviteReq *gomatrixserverlib.InviteV2Request, +) error { + ev := inviteReq.Event() + stateKey := ev.StateKey() + if stateKey == nil { + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + }).Info("invite had no state key, dropping") + return nil + } + + _, destination, err := gomatrixserverlib.SplitID('@', *stateKey) + if err != nil { + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "state_key": stateKey, + }).Info("failed to split destination from state key") + return nil + } + + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + }).Info("Sending invite") + + oqs.queuesMutex.Lock() + defer oqs.queuesMutex.Unlock() + oq := oqs.queues[destination] + if oq == nil { + oq = &destinationQueue{ + origin: oqs.origin, + destination: destination, + client: oqs.client, + } + oqs.queues[destination] = oq + } + + oq.sendInvite(inviteReq) + + return nil +} + // SendEDU sends an EDU event to the destinations func (oqs *OutgoingQueues) SendEDU( e *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName, diff --git a/go.mod b/go.mod index 97c0e44ef..5854e552b 100644 --- a/go.mod +++ b/go.mod @@ -7,9 +7,9 @@ require ( github.com/libp2p/go-libp2p-core v0.5.0 github.com/matrix-org/dugong v0.0.0-20171220115018-ea0a4690a0d5 github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f - github.com/matrix-org/go-sqlite3-js v0.0.0-20200304164012-aa524245b658 + github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 - github.com/matrix-org/gomatrixserverlib v0.0.0-20200327155501-33fb4c7049dc + github.com/matrix-org/gomatrixserverlib v0.0.0-20200409140603-8b9a51fe9b89 github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/mattn/go-sqlite3 v2.0.2+incompatible @@ -23,9 +23,9 @@ require ( github.com/tidwall/pretty v1.0.1 // indirect github.com/uber/jaeger-client-go v2.22.1+incompatible github.com/uber/jaeger-lib v2.2.0+incompatible - go.uber.org/atomic v1.6.0 // indirect + go.uber.org/atomic v1.6.0 golang.org/x/crypto v0.0.0-20200115085410-6d4e4cb37c7d - golang.org/x/net v0.0.0-20190909003024-a7b16738d86b // indirect + golang.org/x/tools v0.0.0-20200402223321-bcf690261a44 // indirect gopkg.in/Shopify/sarama.v1 v1.20.1 gopkg.in/h2non/bimg.v1 v1.0.18 gopkg.in/yaml.v2 v2.2.5 diff --git a/go.sum b/go.sum index aeedd6f62..b1f5ac2f4 100644 --- a/go.sum +++ b/go.sum @@ -122,14 +122,16 @@ github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f h1:5T github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f/go.mod h1:qK3LUW7RCLhFM7gC3pabj3EXT9A1DsCK33MHstUhhbk= github.com/matrix-org/go-sqlite3-js v0.0.0-20200304164012-aa524245b658 h1:UlhTKClOgWnSB25Rv+BS/Vc1mRinjNUErfyGEVOBP04= github.com/matrix-org/go-sqlite3-js v0.0.0-20200304164012-aa524245b658/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo= +github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10 h1:SnhC7/o87ueVwEWI3mUYtrs+s8VnYq3KZtpWsFQOLFE= +github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo= github.com/matrix-org/gomatrix v0.0.0-20190130130140-385f072fe9af h1:piaIBNQGIHnni27xRB7VKkEwoWCgAmeuYf8pxAyG0bI= github.com/matrix-org/gomatrix v0.0.0-20190130130140-385f072fe9af/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bhrnp3Ky1qgx/fzCtCALOoGYylh2tpS9K4= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5 h1:kmRjpmFOenVpOaV/DRlo9p6z/IbOKlUC+hhKsAAh8Qg= github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200327155501-33fb4c7049dc h1:qrRu4/AlulnldLiyGpYYm+ELIkrP51XCRlA3txWpN30= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200327155501-33fb4c7049dc/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200409140603-8b9a51fe9b89 h1:YAlUJK/Ty2ZrP/DL41CiR0Cp3pteshnyIS420KVs220= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200409140603-8b9a51fe9b89/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5 h1:W7l5CP4V7wPyPb4tYE11dbmeAOwtFQBTW0rf4OonOS8= @@ -251,6 +253,7 @@ github.com/uber/jaeger-lib v1.5.0/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/Aaua github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw= github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.uber.org/atomic v1.3.0 h1:vs7fgriifsPbGdK3bNuMWapNn3qnZhCRXc19NRdq010= go.uber.org/atomic v1.3.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= @@ -274,6 +277,8 @@ golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvx golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.2.0 h1:KU7oHjnv3XNWfa5COkzUifxZmxp1TyI7ImMXqFxLwvQ= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -286,8 +291,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20190909003024-a7b16738d86b h1:XfVGCX+0T4WOStkaOsJRllbsiImhB2jgVBGc9L0lPGc= -golang.org/x/net v0.0.0-20190909003024-a7b16738d86b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b h1:0mm1VjtFUOIlE1SbDlwjYaDxZVDP2S5ou6y0gSgXHu8= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -322,7 +327,11 @@ golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3 golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c h1:IGkKhmfzcztjm6gYkykvu/NiS8kaqbCWAEWWAyf8J5U= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200402223321-bcf690261a44 h1:bMm0eoDiGkM5VfIyKjxDvoflW5GLp7+VCo+60n8F+TE= +golang.org/x/tools v0.0.0-20200402223321-bcf690261a44/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= diff --git a/roomserver/api/alias.go b/roomserver/api/alias.go index cb78f726a..ad375a830 100644 --- a/roomserver/api/alias.go +++ b/roomserver/api/alias.go @@ -16,6 +16,7 @@ package api import ( "context" + "errors" "net/http" commonHTTP "github.com/matrix-org/dendrite/common/http" @@ -139,12 +140,12 @@ const RoomserverGetCreatorIDForAliasPath = "/api/roomserver/GetCreatorIDForAlias const RoomserverRemoveRoomAliasPath = "/api/roomserver/removeRoomAlias" // NewRoomserverAliasAPIHTTP creates a RoomserverAliasAPI implemented by talking to a HTTP POST API. -// If httpClient is nil then it uses the http.DefaultClient -func NewRoomserverAliasAPIHTTP(roomserverURL string, httpClient *http.Client) RoomserverAliasAPI { +// If httpClient is nil an error is returned +func NewRoomserverAliasAPIHTTP(roomserverURL string, httpClient *http.Client) (RoomserverAliasAPI, error) { if httpClient == nil { - httpClient = http.DefaultClient + return nil, errors.New("NewRoomserverAliasAPIHTTP: httpClient is ") } - return &httpRoomserverAliasAPI{roomserverURL, httpClient} + return &httpRoomserverAliasAPI{roomserverURL, httpClient}, nil } type httpRoomserverAliasAPI struct { diff --git a/roomserver/api/input.go b/roomserver/api/input.go index f07cc0221..87e3983e3 100644 --- a/roomserver/api/input.go +++ b/roomserver/api/input.go @@ -17,6 +17,7 @@ package api import ( "context" + "errors" "net/http" commonHTTP "github.com/matrix-org/dendrite/common/http" @@ -85,7 +86,9 @@ type TransactionID struct { // the usual context a matrix room event would have. We usually do not have // access to the events needed to check the event auth rules for the invite. type InputInviteEvent struct { - Event gomatrixserverlib.HeaderedEvent `json:"event"` + RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"` + Event gomatrixserverlib.HeaderedEvent `json:"event"` + InviteRoomState []gomatrixserverlib.InviteV2StrippedState `json:"invite_room_state"` } // InputRoomEventsRequest is a request to InputRoomEvents @@ -112,12 +115,12 @@ type RoomserverInputAPI interface { const RoomserverInputRoomEventsPath = "/api/roomserver/inputRoomEvents" // NewRoomserverInputAPIHTTP creates a RoomserverInputAPI implemented by talking to a HTTP POST API. -// If httpClient is nil then it uses the http.DefaultClient -func NewRoomserverInputAPIHTTP(roomserverURL string, httpClient *http.Client) RoomserverInputAPI { +// If httpClient is nil an error is returned +func NewRoomserverInputAPIHTTP(roomserverURL string, httpClient *http.Client) (RoomserverInputAPI, error) { if httpClient == nil { - httpClient = http.DefaultClient + return nil, errors.New("NewRoomserverInputAPIHTTP: httpClient is ") } - return &httpRoomserverInputAPI{roomserverURL, httpClient} + return &httpRoomserverInputAPI{roomserverURL, httpClient}, nil } type httpRoomserverInputAPI struct { diff --git a/roomserver/api/output.go b/roomserver/api/output.go index 4e7adff79..92a468a96 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -116,6 +116,8 @@ type OutputNewRoomEvent struct { // Invite events can be received outside of an existing room so have to be // tracked separately from the room events themselves. type OutputNewInviteEvent struct { + // The room version of the invited room. + RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"` // The "m.room.member" invite event. Event gomatrixserverlib.HeaderedEvent `json:"event"` } diff --git a/roomserver/api/query.go b/roomserver/api/query.go index 3cb1b8a7b..5f024d266 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -18,6 +18,7 @@ package api import ( "context" + "errors" "net/http" commonHTTP "github.com/matrix-org/dendrite/common/http" @@ -202,6 +203,9 @@ type QueryStateAndAuthChainRequest struct { PrevEventIDs []string `json:"prev_event_ids"` // The list of auth events for the event. Used to calculate the auth chain AuthEventIDs []string `json:"auth_event_ids"` + // Should state resolution be ran on the result events? + // TODO: check call sites and remove if we always want to do state res + ResolveState bool `json:"resolve_state"` } // QueryStateAndAuthChainResponse is a response to QueryStateAndAuthChain @@ -406,12 +410,12 @@ const RoomserverQueryRoomVersionCapabilitiesPath = "/api/roomserver/queryRoomVer const RoomserverQueryRoomVersionForRoomPath = "/api/roomserver/queryRoomVersionForRoom" // NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API. -// If httpClient is nil then it uses the http.DefaultClient -func NewRoomserverQueryAPIHTTP(roomserverURL string, httpClient *http.Client) RoomserverQueryAPI { +// If httpClient is nil an error is returned +func NewRoomserverQueryAPIHTTP(roomserverURL string, httpClient *http.Client) (RoomserverQueryAPI, error) { if httpClient == nil { - httpClient = http.DefaultClient + return nil, errors.New("NewRoomserverQueryAPIHTTP: httpClient is ") } - return &httpRoomserverQueryAPI{roomserverURL, httpClient} + return &httpRoomserverQueryAPI{roomserverURL, httpClient}, nil } type httpRoomserverQueryAPI struct { diff --git a/roomserver/input/events.go b/roomserver/input/events.go index c75a3acd9..2bb0d0a05 100644 --- a/roomserver/input/events.go +++ b/roomserver/input/events.go @@ -26,6 +26,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/state/database" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" + log "github.com/sirupsen/logrus" ) // A RoomEventDatabase has the storage APIs needed to store a room event. @@ -64,6 +65,7 @@ type RoomEventDatabase interface { // Build a membership updater for the target user in a room. MembershipUpdater( ctx context.Context, roomID, targerUserID string, + roomVersion gomatrixserverlib.RoomVersion, ) (types.MembershipUpdater, error) // Look up event ID by transaction's info. // This is used to determine if the room event is processed/processing already. @@ -193,7 +195,14 @@ func processInviteEvent( roomID := input.Event.RoomID() targetUserID := *input.Event.StateKey() - updater, err := db.MembershipUpdater(ctx, roomID, targetUserID) + log.WithFields(log.Fields{ + "event_id": input.Event.EventID(), + "room_id": roomID, + "room_version": input.RoomVersion, + "target_user_id": targetUserID, + }).Info("processing invite event") + + updater, err := db.MembershipUpdater(ctx, roomID, targetUserID, input.RoomVersion) if err != nil { return err } @@ -237,7 +246,12 @@ func processInviteEvent( } event := input.Event.Unwrap() - outputUpdates, err := updateToInviteMembership(updater, &event, nil) + + if err = event.SetUnsignedField("invite_room_state", input.InviteRoomState); err != nil { + return err + } + + outputUpdates, err := updateToInviteMembership(updater, &event, nil, input.Event.RoomVersion) if err != nil { return err } diff --git a/roomserver/input/membership.go b/roomserver/input/membership.go index f2ac3b510..ee39ff5eb 100644 --- a/roomserver/input/membership.go +++ b/roomserver/input/membership.go @@ -112,7 +112,7 @@ func updateMembership( switch newMembership { case gomatrixserverlib.Invite: - return updateToInviteMembership(mu, add, updates) + return updateToInviteMembership(mu, add, updates, updater.RoomVersion()) case gomatrixserverlib.Join: return updateToJoinMembership(mu, add, updates) case gomatrixserverlib.Leave, gomatrixserverlib.Ban: @@ -126,6 +126,7 @@ func updateMembership( func updateToInviteMembership( mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent, + roomVersion gomatrixserverlib.RoomVersion, ) ([]api.OutputEvent, error) { // We may have already sent the invite to the user, either because we are // reprocessing this event, or because the we received this invite from a @@ -136,14 +137,14 @@ func updateToInviteMembership( return nil, err } if needsSending { - roomVersion := gomatrixserverlib.RoomVersionV1 // We notify the consumers using a special event even though we will // notify them about the change in current state as part of the normal // room event stream. This ensures that the consumers only have to // consider a single stream of events when determining whether a user // is invited, rather than having to combine multiple streams themselves. onie := api.OutputNewInviteEvent{ - Event: (*add).Headered(roomVersion), + Event: (*add).Headered(roomVersion), + RoomVersion: roomVersion, } updates = append(updates, api.OutputEvent{ Type: api.OutputTypeNewInviteEvent, diff --git a/roomserver/query/query.go b/roomserver/query/query.go index b7cdf1507..7e05fe36f 100644 --- a/roomserver/query/query.go +++ b/roomserver/query/query.go @@ -132,7 +132,7 @@ func (r *RoomserverQueryAPI) QueryLatestEventsAndState( return err } - // Look up the currrent state for the requested tuples. + // Look up the current state for the requested tuples. stateEntries, err := roomState.LoadStateAtSnapshotForStringTuples( ctx, currentStateSnapshotNID, request.StateToFetch, ) @@ -736,6 +736,14 @@ func (r *RoomserverQueryAPI) QueryStateAndAuthChain( return err } + if request.ResolveState { + if stateEvents, err = state.ResolveConflictsAdhoc( + roomVersion, stateEvents, authEvents, + ); err != nil { + return err + } + } + for _, event := range stateEvents { response.StateEvents = append(response.StateEvents, event.Headered(roomVersion)) } diff --git a/roomserver/state/state.go b/roomserver/state/state.go index b8e3e18a1..3f68e0747 100644 --- a/roomserver/state/state.go +++ b/roomserver/state/state.go @@ -18,7 +18,6 @@ package state import ( "context" - "errors" "fmt" "sort" "time" @@ -672,7 +671,7 @@ func (v StateResolution) calculateStateAfterManyEvents( return } algorithm = "full_state_with_conflicts" - state = resolved + state = resolved[:util.SortAndUnique(stateEntrySorter(resolved))] } else { algorithm = "full_state_no_conflicts" // 6) There weren't any conflicts @@ -681,6 +680,83 @@ func (v StateResolution) calculateStateAfterManyEvents( return } +// ResolveConflictsAdhoc is a helper function to assist the query API in +// performing state resolution when requested. This is a different code +// path to the rest of state.go because this assumes you already have +// gomatrixserverlib.Event objects and not just a bunch of NIDs like +// elsewhere in the state resolution. +// TODO: Some of this can possibly be deduplicated +func ResolveConflictsAdhoc( + version gomatrixserverlib.RoomVersion, + events []gomatrixserverlib.Event, + authEvents []gomatrixserverlib.Event, +) ([]gomatrixserverlib.Event, error) { + type stateKeyTuple struct { + Type string + StateKey string + } + + // Prepare our data structures. + eventMap := make(map[stateKeyTuple][]gomatrixserverlib.Event) + var conflicted, notConflicted, resolved []gomatrixserverlib.Event + + // Run through all of the events that we were given and sort them + // into a map, sorted by (event_type, state_key) tuple. This means + // that we can easily spot events that are "conflicted", e.g. + // there are duplicate values for the same tuple key. + for _, event := range events { + if event.StateKey() == nil { + // Ignore events that are not state events. + continue + } + // Append the events if there is already a conflicted list for + // this tuple key, create it if not. + tuple := stateKeyTuple{event.Type(), *event.StateKey()} + if _, ok := eventMap[tuple]; ok { + eventMap[tuple] = append(eventMap[tuple], event) + } else { + eventMap[tuple] = []gomatrixserverlib.Event{event} + } + } + + // Split out the events in the map into conflicted and unconflicted + // buckets. The conflicted events will be ran through state res, + // whereas unconfliced events will always going to appear in the + // final resolved state. + for _, list := range eventMap { + if len(list) > 1 { + conflicted = append(conflicted, list...) + } else { + notConflicted = append(notConflicted, list...) + } + } + + // Work out which state resolution algorithm we want to run for + // the room version. + stateResAlgo, err := version.StateResAlgorithm() + if err != nil { + return nil, err + } + switch stateResAlgo { + case gomatrixserverlib.StateResV1: + // Currently state res v1 doesn't handle unconflicted events + // for us, like state res v2 does, so we will need to add the + // unconflicted events into the state ourselves. + // TODO: Fix state res v1 so this is handled for the caller. + resolved = gomatrixserverlib.ResolveStateConflicts(conflicted, authEvents) + resolved = append(resolved, notConflicted...) + case gomatrixserverlib.StateResV2: + // TODO: auth difference here? + resolved = gomatrixserverlib.ResolveStateConflictsV2(conflicted, notConflicted, authEvents, authEvents) + default: + return nil, fmt.Errorf("unsupported state resolution algorithm %v", stateResAlgo) + } + + // Return the final resolved state events, including both the + // resolved set of conflicted events, and the unconflicted events. + return resolved, nil +} + func (v StateResolution) resolveConflicts( ctx context.Context, version gomatrixserverlib.RoomVersion, notConflicted, conflicted []types.StateEntry, @@ -695,7 +771,7 @@ func (v StateResolution) resolveConflicts( case gomatrixserverlib.StateResV2: return v.resolveConflictsV2(ctx, notConflicted, conflicted) } - return nil, errors.New("unsupported state resolution algorithm") + return nil, fmt.Errorf("unsupported state resolution algorithm %v", stateResAlgo) } // resolveConflicts resolves a list of conflicted state entries. It takes two lists. diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index 20db7ef7f..50369d806 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -41,7 +41,7 @@ type Database interface { GetAliasesForRoomID(ctx context.Context, roomID string) ([]string, error) GetCreatorIDForAlias(ctx context.Context, alias string) (string, error) RemoveRoomAlias(ctx context.Context, alias string) error - MembershipUpdater(ctx context.Context, roomID, targetUserID string) (types.MembershipUpdater, error) + MembershipUpdater(ctx context.Context, roomID, targetUserID string, roomVersion gomatrixserverlib.RoomVersion) (types.MembershipUpdater, error) GetMembership(ctx context.Context, roomNID types.RoomNID, requestSenderUserID string) (membershipEventNID types.EventNID, stillInRoom bool, err error) GetMembershipEventNIDsForRoom(ctx context.Context, roomNID types.RoomNID, joinOnly bool) ([]types.EventNID, error) EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) diff --git a/roomserver/storage/postgres/storage.go b/roomserver/storage/postgres/storage.go index 83a17b1a1..c91c59ebc 100644 --- a/roomserver/storage/postgres/storage.go +++ b/roomserver/storage/postgres/storage.go @@ -393,6 +393,12 @@ type roomRecentEventsUpdater struct { currentStateSnapshotNID types.StateSnapshotNID } +// RoomVersion implements types.RoomRecentEventsUpdater +func (u *roomRecentEventsUpdater) RoomVersion() (version gomatrixserverlib.RoomVersion) { + version, _ = u.d.GetRoomVersionForRoomNID(u.ctx, u.roomNID) + return +} + // LatestEvents implements types.RoomRecentEventsUpdater func (u *roomRecentEventsUpdater) LatestEvents() []types.StateAtEventAndReference { return u.latestEvents @@ -534,6 +540,7 @@ func (d *Database) StateEntriesForTuples( // MembershipUpdater implements input.RoomEventDatabase func (d *Database) MembershipUpdater( ctx context.Context, roomID, targetUserID string, + roomVersion gomatrixserverlib.RoomVersion, ) (types.MembershipUpdater, error) { txn, err := d.db.Begin() if err != nil { @@ -546,8 +553,7 @@ func (d *Database) MembershipUpdater( } }() - // TODO: Room version here - roomNID, err := d.assignRoomNID(ctx, txn, roomID, "1") + roomNID, err := d.assignRoomNID(ctx, txn, roomID, roomVersion) if err != nil { return nil, err } diff --git a/roomserver/storage/sqlite3/storage.go b/roomserver/storage/sqlite3/storage.go index 6d6743393..f6c692fd1 100644 --- a/roomserver/storage/sqlite3/storage.go +++ b/roomserver/storage/sqlite3/storage.go @@ -486,6 +486,12 @@ type roomRecentEventsUpdater struct { currentStateSnapshotNID types.StateSnapshotNID } +// RoomVersion implements types.RoomRecentEventsUpdater +func (u *roomRecentEventsUpdater) RoomVersion() (version gomatrixserverlib.RoomVersion) { + version, _ = u.d.GetRoomVersionForRoomNID(u.ctx, u.roomNID) + return +} + // LatestEvents implements types.RoomRecentEventsUpdater func (u *roomRecentEventsUpdater) LatestEvents() []types.StateAtEventAndReference { return u.latestEvents @@ -657,6 +663,7 @@ func (d *Database) StateEntriesForTuples( // MembershipUpdater implements input.RoomEventDatabase func (d *Database) MembershipUpdater( ctx context.Context, roomID, targetUserID string, + roomVersion gomatrixserverlib.RoomVersion, ) (updater types.MembershipUpdater, err error) { var txn *sql.Tx txn, err = d.db.Begin() @@ -682,8 +689,7 @@ func (d *Database) MembershipUpdater( } }() - // TODO: Room version here - roomNID, err := d.assignRoomNID(ctx, txn, roomID, "1") + roomNID, err := d.assignRoomNID(ctx, txn, roomID, roomVersion) if err != nil { return nil, err } diff --git a/roomserver/types/types.go b/roomserver/types/types.go index d5fe32762..dfc112cfd 100644 --- a/roomserver/types/types.go +++ b/roomserver/types/types.go @@ -140,6 +140,8 @@ type StateEntryList struct { // (On postgresql this wraps a database transaction that holds a "FOR UPDATE" // lock on the row in the rooms table holding the latest events for the room.) type RoomRecentEventsUpdater interface { + // The room version of the room. + RoomVersion() gomatrixserverlib.RoomVersion // The latest event IDs and state in the room. LatestEvents() []StateAtEventAndReference // The event ID of the latest event written to the output log in the room. diff --git a/roomserver/version/version.go b/roomserver/version/version.go index ed16ecca0..e60b5ef7a 100644 --- a/roomserver/version/version.go +++ b/roomserver/version/version.go @@ -43,12 +43,12 @@ var roomVersions = map[gomatrixserverlib.RoomVersion]RoomVersionDescription{ Stable: true, }, gomatrixserverlib.RoomVersionV3: RoomVersionDescription{ - Supported: false, - Stable: false, + Supported: true, + Stable: true, }, gomatrixserverlib.RoomVersionV4: RoomVersionDescription{ - Supported: false, - Stable: false, + Supported: true, + Stable: true, }, gomatrixserverlib.RoomVersionV5: RoomVersionDescription{ Supported: false, diff --git a/show-expected-fail-tests.sh b/show-expected-fail-tests.sh index 9cd51b007..0a4c7be87 100755 --- a/show-expected-fail-tests.sh +++ b/show-expected-fail-tests.sh @@ -60,7 +60,7 @@ while read -r test_name; do # Ignore empty lines [ "${test_name}" = "" ] && continue - grep "${test_name}" "${whitelist_file}" > /dev/null 2>&1 + grep "^${test_name}" "${whitelist_file}" > /dev/null 2>&1 if [ "$?" != "0" ]; then # Check if this test name is blacklisted if printf '%s\n' "${blacklisted_tests[@]}" | grep -q -P "^${test_name}$"; then @@ -80,8 +80,8 @@ done <<< "${passed_but_expected_fail}" # TODO: Check that the same test doesn't appear twice in the whitelist|blacklist # Trim test output strings -tests_to_add=$(echo -e $tests_to_add | xargs) -already_in_whitelist=$(echo -e $already_in_whitelist | xargs) +tests_to_add=$(echo -e $tests_to_add | xargs -d '\n') +already_in_whitelist=$(echo -e $already_in_whitelist | xargs -d '\n') # Format output with markdown for buildkite annotation rendering purposes if [ -n "${tests_to_add}" ] && [ -n "${already_in_whitelist}" ]; then diff --git a/syncapi/consumers/typingserver.go b/syncapi/consumers/eduserver.go similarity index 90% rename from syncapi/consumers/typingserver.go rename to syncapi/consumers/eduserver.go index 369254411..5491c1e9f 100644 --- a/syncapi/consumers/typingserver.go +++ b/syncapi/consumers/eduserver.go @@ -19,15 +19,15 @@ import ( "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/dendrite/typingserver/api" log "github.com/sirupsen/logrus" sarama "gopkg.in/Shopify/sarama.v1" ) -// OutputTypingEventConsumer consumes events that originated in the typing server. +// OutputTypingEventConsumer consumes events that originated in the EDU server. type OutputTypingEventConsumer struct { typingConsumer *common.ContinualConsumer db storage.Database @@ -35,7 +35,7 @@ type OutputTypingEventConsumer struct { } // NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. -// Call Start() to begin consuming from the typing server. +// Call Start() to begin consuming from the EDU server. func NewOutputTypingEventConsumer( cfg *config.Dendrite, kafkaConsumer sarama.Consumer, @@ -60,7 +60,7 @@ func NewOutputTypingEventConsumer( return s } -// Start consuming from typing api +// Start consuming from EDU api func (s *OutputTypingEventConsumer) Start() error { s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) { s.notifier.OnNewEvent( @@ -78,7 +78,7 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error var output api.OutputTypingEvent if err := json.Unmarshal(msg.Value, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("typing server output log: message parse failure") + log.WithError(err).Errorf("EDU server output log: message parse failure") return nil } @@ -86,7 +86,7 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error "room_id": output.Event.RoomID, "user_id": output.Event.UserID, "typing": output.Event.Typing, - }).Debug("received data from typing server") + }).Debug("received data from EDU server") var typingPos types.StreamPosition typingEvent := output.Event diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index b6dc19696..a3efd8d58 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -20,9 +20,9 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/dendrite/typingserver/cache" "github.com/matrix-org/gomatrixserverlib" ) diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index f3f1aabc7..ead1bf335 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -30,8 +30,8 @@ import ( // Import the postgres database driver. _ "github.com/lib/pq" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/dendrite/typingserver/cache" "github.com/matrix-org/gomatrixserverlib" ) @@ -53,7 +53,7 @@ type SyncServerDatasource struct { events outputRoomEventsStatements roomstate currentRoomStateStatements invites inviteEventsStatements - typingCache *cache.TypingCache + eduCache *cache.EDUCache topology outputRoomEventsTopologyStatements backwardExtremities backwardExtremitiesStatements } @@ -86,7 +86,7 @@ func NewSyncServerDatasource(dbDataSourceName string) (*SyncServerDatasource, er if err := d.backwardExtremities.prepare(d.db); err != nil { return nil, err } - d.typingCache = cache.NewTypingCache() + d.eduCache = cache.New() return &d, nil } @@ -395,7 +395,7 @@ func (d *SyncServerDatasource) syncPositionTx( maxEventID = maxInviteID } sp.PDUPosition = types.StreamPosition(maxEventID) - sp.EDUTypingPosition = types.StreamPosition(d.typingCache.GetLatestSyncPosition()) + sp.EDUTypingPosition = types.StreamPosition(d.eduCache.GetLatestSyncPosition()) return } @@ -468,7 +468,7 @@ func (d *SyncServerDatasource) addTypingDeltaToResponse( var ok bool var err error for _, roomID := range joinedRoomIDs { - if typingUsers, updated := d.typingCache.GetTypingUsersIfUpdatedAfter( + if typingUsers, updated := d.eduCache.GetTypingUsersIfUpdatedAfter( roomID, int64(since.EDUTypingPosition), ); updated { ev := gomatrixserverlib.ClientEvent{ @@ -719,7 +719,7 @@ func (d *SyncServerDatasource) RetireInviteEvent( } func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) { - d.typingCache.SetTimeoutCallback(fn) + d.eduCache.SetTimeoutCallback(fn) } // AddTypingUser adds a typing user to the typing cache. @@ -727,7 +727,7 @@ func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallback func (d *SyncServerDatasource) AddTypingUser( userID, roomID string, expireTime *time.Time, ) types.StreamPosition { - return types.StreamPosition(d.typingCache.AddTypingUser(userID, roomID, expireTime)) + return types.StreamPosition(d.eduCache.AddTypingUser(userID, roomID, expireTime)) } // RemoveTypingUser removes a typing user from the typing cache. @@ -735,7 +735,7 @@ func (d *SyncServerDatasource) AddTypingUser( func (d *SyncServerDatasource) RemoveTypingUser( userID, roomID string, ) types.StreamPosition { - return types.StreamPosition(d.typingCache.RemoveUser(userID, roomID)) + return types.StreamPosition(d.eduCache.RemoveUser(userID, roomID)) } func (d *SyncServerDatasource) addInvitesToResponse( diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 8ff189007..30f77e54d 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -33,8 +33,8 @@ import ( _ "github.com/mattn/go-sqlite3" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/dendrite/typingserver/cache" "github.com/matrix-org/gomatrixserverlib" ) @@ -57,7 +57,7 @@ type SyncServerDatasource struct { events outputRoomEventsStatements roomstate currentRoomStateStatements invites inviteEventsStatements - typingCache *cache.TypingCache + eduCache *cache.EDUCache topology outputRoomEventsTopologyStatements backwardExtremities backwardExtremitiesStatements } @@ -84,7 +84,7 @@ func NewSyncServerDatasource(dataSourceName string) (*SyncServerDatasource, erro if err = d.prepare(); err != nil { return nil, err } - d.typingCache = cache.NewTypingCache() + d.eduCache = cache.New() return &d, nil } @@ -429,7 +429,7 @@ func (d *SyncServerDatasource) syncPositionTx( maxEventID = maxInviteID } sp.PDUPosition = types.StreamPosition(maxEventID) - sp.EDUTypingPosition = types.StreamPosition(d.typingCache.GetLatestSyncPosition()) + sp.EDUTypingPosition = types.StreamPosition(d.eduCache.GetLatestSyncPosition()) return } @@ -502,7 +502,7 @@ func (d *SyncServerDatasource) addTypingDeltaToResponse( var ok bool var err error for _, roomID := range joinedRoomIDs { - if typingUsers, updated := d.typingCache.GetTypingUsersIfUpdatedAfter( + if typingUsers, updated := d.eduCache.GetTypingUsersIfUpdatedAfter( roomID, int64(since.EDUTypingPosition), ); updated { ev := gomatrixserverlib.ClientEvent{ @@ -766,7 +766,7 @@ func (d *SyncServerDatasource) RetireInviteEvent( } func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) { - d.typingCache.SetTimeoutCallback(fn) + d.eduCache.SetTimeoutCallback(fn) } // AddTypingUser adds a typing user to the typing cache. @@ -774,7 +774,7 @@ func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallback func (d *SyncServerDatasource) AddTypingUser( userID, roomID string, expireTime *time.Time, ) types.StreamPosition { - return types.StreamPosition(d.typingCache.AddTypingUser(userID, roomID, expireTime)) + return types.StreamPosition(d.eduCache.AddTypingUser(userID, roomID, expireTime)) } // RemoveTypingUser removes a typing user from the typing cache. @@ -782,7 +782,7 @@ func (d *SyncServerDatasource) AddTypingUser( func (d *SyncServerDatasource) RemoveTypingUser( userID, roomID string, ) types.StreamPosition { - return types.StreamPosition(d.typingCache.RemoveUser(userID, roomID)) + return types.StreamPosition(d.eduCache.RemoveUser(userID, roomID)) } func (d *SyncServerDatasource) addInvitesToResponse( diff --git a/sytest-blacklist b/sytest-blacklist index 2e6f2057c..da0667a1d 100644 --- a/sytest-blacklist +++ b/sytest-blacklist @@ -31,4 +31,7 @@ Alias creators can delete canonical alias with no ops # Blacklisted because we need to implement v2 invite endpoints for room versions # to be supported (currently fails with M_UNSUPPORTED_ROOM_VERSION) -Inbound federation rejects invites which are not signed by the sender \ No newline at end of file +Inbound federation rejects invites which are not signed by the sender + +# Blacklisted because we don't support ignores yet +Ignore invite in incremental sync diff --git a/sytest-whitelist b/sytest-whitelist index a2e7b2a6d..97ded9bb8 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -215,7 +215,6 @@ Guest users can sync from default guest_access rooms if joined Real non-joined users cannot room initalSync for non-world_readable rooms Push rules come down in an initial /sync Regular users can add and delete aliases in the default room configuration -Regular users can add and delete aliases when m.room.aliases is restricted GET /r0/capabilities is not public GET /joined_rooms lists newly-created room /joined_rooms returns only joined rooms @@ -225,9 +224,24 @@ Remote user can backfill in a room with version 1 POST /createRoom creates a room with the given version POST /createRoom rejects attempts to create rooms with numeric versions POST /createRoom rejects attempts to create rooms with unknown versions +Regular users can add and delete aliases when m.room.aliases is restricted User can create and send/receive messages in a room with version 2 local user can join room with version 2 remote user can join room with version 2 User can invite local user to room with version 2 Remote user can backfill in a room with version 2 -Inbound federation accepts attempts to join v2 rooms from servers with support \ No newline at end of file +Inbound federation accepts attempts to join v2 rooms from servers with support +Outbound federation can send invites via v2 API +User can create and send/receive messages in a room with version 3 +local user can join room with version 3 +Remote user can backfill in a room with version 3 +User can create and send/receive messages in a room with version 4 +local user can join room with version 4 +remote user can join room with version 3 +remote user can join room with version 4 +Remote user can backfill in a room with version 4 +# We don't support ignores yet, so ignore this for now - ha ha. +# Ignore invite in incremental sync +Outbound federation can send invites via v2 API +User can invite local user to room with version 3 +User can invite local user to room with version 4