From ed388a32b7e38c4a42fb9cea141d97a62209f26d Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Thu, 24 May 2018 13:54:42 +0100 Subject: [PATCH] Add Application Service component (#462) * Add Application Service component The component subscribes to the Roomserver kafka stream, filtering events to be eventually sent off to relevant app services, as well as handling incoming HTTP traffic from app services on the `/matrix/app/r0/*` route. Signed-off-by: Andrew Morgan * Make linting happy Signed-off-by: Andrew Morgan --- dendrite-config.yaml | 1 + .../matrix-org/dendrite/appservice/README.md | 10 ++ .../dendrite/appservice/appservice.go | 50 ++++++ .../appservice/consumers/roomserver.go | 144 ++++++++++++++++++ .../dendrite/appservice/routing/routing.go | 61 ++++++++ .../cmd/dendrite-app-service-server/main.go | 38 +++++ .../cmd/dendrite-monolith-server/main.go | 2 + 7 files changed, 306 insertions(+) create mode 100644 src/github.com/matrix-org/dendrite/appservice/README.md create mode 100644 src/github.com/matrix-org/dendrite/appservice/appservice.go create mode 100644 src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go create mode 100644 src/github.com/matrix-org/dendrite/appservice/routing/routing.go create mode 100644 src/github.com/matrix-org/dendrite/cmd/dendrite-app-service-server/main.go diff --git a/dendrite-config.yaml b/dendrite-config.yaml index 249afe60c..ae926bab8 100644 --- a/dendrite-config.yaml +++ b/dendrite-config.yaml @@ -112,6 +112,7 @@ listen: media_api: "localhost:7774" public_rooms_api: "localhost:7775" federation_sender: "localhost:7776" + appservice: "localhost:7777" # The configuration for tracing the dendrite components. tracing: diff --git a/src/github.com/matrix-org/dendrite/appservice/README.md b/src/github.com/matrix-org/dendrite/appservice/README.md new file mode 100644 index 000000000..5b00386d3 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/appservice/README.md @@ -0,0 +1,10 @@ +# Application Service + +This component interfaces with external [Application +Services](https://matrix.org/docs/spec/application_service/unstable.html). +This includes any HTTP endpoints that Application Services call, as well as talking +to any HTTP endpoints that Application Services provide themselves. + +## Consumers + +This component consumes and filters events from the Roomserver Kafka stream, passing on any necessary events to subscribing Application Services. \ No newline at end of file diff --git a/src/github.com/matrix-org/dendrite/appservice/appservice.go b/src/github.com/matrix-org/dendrite/appservice/appservice.go new file mode 100644 index 000000000..2eecf8a26 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/appservice/appservice.go @@ -0,0 +1,50 @@ +// Copyright 2018 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package appservice + +import ( + "github.com/matrix-org/dendrite/appservice/consumers" + "github.com/matrix-org/dendrite/appservice/routing" + "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" + "github.com/matrix-org/dendrite/common/basecomponent" + "github.com/matrix-org/dendrite/common/transactions" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" +) + +// SetupAppServiceAPIComponent sets up and registers HTTP handlers for the AppServices +// component. +func SetupAppServiceAPIComponent( + base *basecomponent.BaseDendrite, + accountsDB *accounts.Database, + federation *gomatrixserverlib.FederationClient, + aliasAPI api.RoomserverAliasAPI, + queryAPI api.RoomserverQueryAPI, + transactionsCache *transactions.Cache, +) { + consumer := consumers.NewOutputRoomEventConsumer( + base.Cfg, base.KafkaConsumer, accountsDB, queryAPI, + ) + if err := consumer.Start(); err != nil { + logrus.WithError(err).Panicf("failed to start app service's room server consumer") + } + + // Set up HTTP Endpoints + routing.Setup( + base.APIMux, *base.Cfg, queryAPI, aliasAPI, accountsDB, + federation, transactionsCache, + ) +} diff --git a/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go new file mode 100644 index 000000000..d0b6a5ba7 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go @@ -0,0 +1,144 @@ +// Copyright 2018 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "encoding/json" + + "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" + + log "github.com/sirupsen/logrus" + sarama "gopkg.in/Shopify/sarama.v1" +) + +// OutputRoomEventConsumer consumes events that originated in the room server. +type OutputRoomEventConsumer struct { + roomServerConsumer *common.ContinualConsumer + db *accounts.Database + query api.RoomserverQueryAPI + serverName string +} + +// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. +func NewOutputRoomEventConsumer( + cfg *config.Dendrite, + kafkaConsumer sarama.Consumer, + store *accounts.Database, + queryAPI api.RoomserverQueryAPI, +) *OutputRoomEventConsumer { + + consumer := common.ContinualConsumer{ + Topic: string(cfg.Kafka.Topics.OutputRoomEvent), + Consumer: kafkaConsumer, + PartitionStore: store, + } + s := &OutputRoomEventConsumer{ + roomServerConsumer: &consumer, + db: store, + query: queryAPI, + serverName: string(cfg.Matrix.ServerName), + } + consumer.ProcessMessage = s.onMessage + + return s +} + +// Start consuming from room servers +func (s *OutputRoomEventConsumer) Start() error { + return s.roomServerConsumer.Start() +} + +// onMessage is called when the sync server receives a new event from the room server output log. +// It is not safe for this function to be called from multiple goroutines, or else the +// sync stream position may race and be incorrectly calculated. +func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { + // Parse out the event JSON + var output api.OutputEvent + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return nil + } + + if output.Type != api.OutputTypeNewRoomEvent { + log.WithField("type", output.Type).Debug( + "roomserver output log: ignoring unknown output type", + ) + return nil + } + + ev := output.NewRoomEvent.Event + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "room_id": ev.RoomID(), + "type": ev.Type(), + }).Info("appservice received event from roomserver") + + events, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev) + if err != nil { + return err + } + + return s.db.UpdateMemberships(context.TODO(), events, output.NewRoomEvent.RemovesStateEventIDs) +} + +// lookupStateEvents looks up the state events that are added by a new event. +func (s *OutputRoomEventConsumer) lookupStateEvents( + addsStateEventIDs []string, event gomatrixserverlib.Event, +) ([]gomatrixserverlib.Event, error) { + // Fast path if there aren't any new state events. + if len(addsStateEventIDs) == 0 { + // If the event is a membership update (e.g. for a profile update), it won't + // show up in AddsStateEventIDs, so we need to add it manually + if event.Type() == "m.room.member" { + return []gomatrixserverlib.Event{event}, nil + } + return nil, nil + } + + // Fast path if the only state event added is the event itself. + if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() { + return []gomatrixserverlib.Event{event}, nil + } + + result := []gomatrixserverlib.Event{} + missing := []string{} + for _, id := range addsStateEventIDs { + // Append the current event in the results if its ID is in the events list + if id == event.EventID() { + result = append(result, event) + } else { + // If the event isn't the current one, add it to the list of events + // to retrieve from the roomserver + missing = append(missing, id) + } + } + + // Request the missing events from the roomserver + eventReq := api.QueryEventsByIDRequest{EventIDs: missing} + var eventResp api.QueryEventsByIDResponse + if err := s.query.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil { + return nil, err + } + + result = append(result, eventResp.Events...) + + return result, nil +} diff --git a/src/github.com/matrix-org/dendrite/appservice/routing/routing.go b/src/github.com/matrix-org/dendrite/appservice/routing/routing.go new file mode 100644 index 000000000..f0b8461dc --- /dev/null +++ b/src/github.com/matrix-org/dendrite/appservice/routing/routing.go @@ -0,0 +1,61 @@ +// Copyright 2018 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package routing + +import ( + "net/http" + + "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/common/transactions" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" +) + +const pathPrefixApp = "/_matrix/app/r0" + +// Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client +// to clients which need to make outbound HTTP requests. +func Setup( + apiMux *mux.Router, cfg config.Dendrite, // nolint: unparam + queryAPI api.RoomserverQueryAPI, aliasAPI api.RoomserverAliasAPI, // nolint: unparam + accountDB *accounts.Database, // nolint: unparam + federation *gomatrixserverlib.FederationClient, // nolint: unparam + transactionsCache *transactions.Cache, // nolint: unparam +) { + appMux := apiMux.PathPrefix(pathPrefixApp).Subrouter() + + appMux.Handle("/alias", + common.MakeExternalAPI("alias", func(req *http.Request) util.JSONResponse { + // TODO: Implement + return util.JSONResponse{ + Code: http.StatusOK, + JSON: nil, + } + }), + ).Methods(http.MethodGet, http.MethodOptions) + appMux.Handle("/user", + common.MakeExternalAPI("user", func(req *http.Request) util.JSONResponse { + // TODO: Implement + return util.JSONResponse{ + Code: http.StatusOK, + JSON: nil, + } + }), + ).Methods(http.MethodGet, http.MethodOptions) +} diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-app-service-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-app-service-server/main.go new file mode 100644 index 000000000..3c537bea0 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-app-service-server/main.go @@ -0,0 +1,38 @@ +// Copyright 2018 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "github.com/matrix-org/dendrite/appservice" + "github.com/matrix-org/dendrite/common/basecomponent" + "github.com/matrix-org/dendrite/common/transactions" +) + +func main() { + cfg := basecomponent.ParseFlags() + base := basecomponent.NewBaseDendrite(cfg, "AppService") + + defer base.Close() // nolint: errcheck + accountDB := base.CreateAccountsDB() + federation := base.CreateFederationClient() + alias, _, query := base.CreateHTTPRoomserverAPIs() + cache := transactions.New() + + appservice.SetupAppServiceAPIComponent( + base, accountDB, federation, alias, query, cache, + ) + + base.SetupAndServeHTTP(string(base.Cfg.Listen.FederationSender)) +} diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go index 84c2b3942..2d3bc09eb 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go @@ -21,6 +21,7 @@ import ( "github.com/matrix-org/dendrite/common/keydb" "github.com/matrix-org/dendrite/common/transactions" + "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/clientapi" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/basecomponent" @@ -65,6 +66,7 @@ func main() { mediaapi.SetupMediaAPIComponent(base, deviceDB) publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB) syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query) + appservice.SetupAppServiceAPIComponent(base, accountDB, federation, alias, query, transactions.New()) httpHandler := common.WrapHandlerInCORS(base.APIMux)