From 1b2b837029f8fb0de30781bcb42a367e81e850e8 Mon Sep 17 00:00:00 2001 From: Mark Haines <mjark@negativecurvature.net> Date: Thu, 1 Jun 2017 20:43:06 +0100 Subject: [PATCH] Hook up federation event receiving --- .../dendrite-federation-api-server/main.go | 53 +++++++++++- .../dendrite/federationapi/routing/routing.go | 28 ++++++- .../dendrite/federationapi/writers/send.go | 80 +++++++++++++++++-- 3 files changed, 151 insertions(+), 10 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go index 15bf5bc47..642388df2 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go @@ -18,11 +18,14 @@ import ( "encoding/base64" "net/http" "os" + "strings" "time" + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/federationapi/config" "github.com/matrix-org/dendrite/federationapi/routing" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" log "github.com/Sirupsen/logrus" @@ -40,7 +43,10 @@ var ( // openssl x509 -noout -fingerprint -sha256 -inform pem -in server.crt |\ // python -c 'print raw_input()[19:].replace(":","").decode("hex").encode("base64").rstrip("=\n")' // - tlsFingerprint = os.Getenv("TLS_FINGERPRINT") + tlsFingerprint = os.Getenv("TLS_FINGERPRINT") + kafkaURIs = strings.Split(os.Getenv("KAFKA_URIS"), ",") + roomserverURL = os.Getenv("ROOMSERVER_URL") + roomserverInputTopic = os.Getenv("TOPIC_INPUT_ROOM_EVENT") ) func main() { @@ -57,6 +63,18 @@ func main() { log.Panic("No TLS_FINGERPRINT environment variable found.") } + if len(kafkaURIs) == 0 { + // the kafka default is :9092 + kafkaURIs = []string{"localhost:9092"} + } + + if roomserverURL == "" { + log.Panic("No ROOMSERVER_URL environment variable found.") + } + + if roomserverInputTopic == "" { + log.Panic("No TOPIC_INPUT_ROOM_EVENT environment variable found. This should match the roomserver input topic.") + } cfg := config.FederationAPI{ ServerName: serverName, // TODO: make the validity period configurable. @@ -75,6 +93,37 @@ func main() { } cfg.TLSFingerPrints = []gomatrixserverlib.TLSFingerprint{{fingerprintSHA256}} - routing.Setup(http.DefaultServeMux, cfg) + federation := gomatrixserverlib.NewFederationClient(cfg.ServerName, cfg.KeyID, cfg.PrivateKey) + + keyRing := gomatrixserverlib.KeyRing{ + KeyFetchers: []gomatrixserverlib.KeyFetcher{ + // TODO: Use perspective key fetchers for production. + &gomatrixserverlib.DirectKeyFetcher{federation.Client}, + }, + KeyDatabase: &dummyKeyDatabase{}, + } + queryAPI := api.NewRoomserverQueryAPIHTTP(roomserverURL, nil) + + roomserverProducer, err := producers.NewRoomserverProducer(kafkaURIs, roomserverInputTopic) + if err != nil { + log.Panicf("Failed to setup kafka producers(%s): %s", kafkaURIs, err) + } + + routing.Setup(http.DefaultServeMux, cfg, queryAPI, roomserverProducer, keyRing) log.Fatal(http.ListenAndServe(bindAddr, nil)) } + +// TODO: Implement a proper key database. +type dummyKeyDatabase struct{} + +func (d *dummyKeyDatabase) FetchKeys( + requests map[gomatrixserverlib.PublicKeyRequest]gomatrixserverlib.Timestamp, +) (map[gomatrixserverlib.PublicKeyRequest]gomatrixserverlib.ServerKeys, error) { + return nil, nil +} + +func (d *dummyKeyDatabase) StoreKeys( + map[gomatrixserverlib.PublicKeyRequest]gomatrixserverlib.ServerKeys, +) error { + return nil +} diff --git a/src/github.com/matrix-org/dendrite/federationapi/routing/routing.go b/src/github.com/matrix-org/dendrite/federationapi/routing/routing.go index b5af66dc6..e94611357 100644 --- a/src/github.com/matrix-org/dendrite/federationapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/federationapi/routing/routing.go @@ -16,21 +16,34 @@ package routing import ( "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/federationapi/config" "github.com/matrix-org/dendrite/federationapi/readers" + "github.com/matrix-org/dendrite/federationapi/writers" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" "github.com/prometheus/client_golang/prometheus" "net/http" + "time" ) const ( - pathPrefixV2Keys = "/_matrix/key/v2" + pathPrefixV2Keys = "/_matrix/key/v2" + pathPrefixV1Federation = "/_matrix/federation/v1" ) // Setup registers HTTP handlers with the given ServeMux. -func Setup(servMux *http.ServeMux, cfg config.FederationAPI) { +func Setup( + servMux *http.ServeMux, + cfg config.FederationAPI, + query api.RoomserverQueryAPI, + producer *producers.RoomserverProducer, + keys gomatrixserverlib.KeyRing, +) { apiMux := mux.NewRouter() v2keysmux := apiMux.PathPrefix(pathPrefixV2Keys).Subrouter() + v1fedmux := apiMux.PathPrefix(pathPrefixV1Federation).Subrouter() localKeys := makeAPI("localkeys", func(req *http.Request) util.JSONResponse { return readers.LocalKeys(req, cfg) @@ -43,6 +56,17 @@ func Setup(servMux *http.ServeMux, cfg config.FederationAPI) { v2keysmux.Handle("/server/{keyID}", localKeys) v2keysmux.Handle("/server/", localKeys) + v1fedmux.Handle("/send/{txnID}/", makeAPI("send", + func(req *http.Request) util.JSONResponse { + vars := mux.Vars(req) + return writers.Send( + req, gomatrixserverlib.TransactionID(vars["txnID"]), + time.Now(), + cfg, query, producer, keys, + ) + }, + )) + servMux.Handle("/metrics", prometheus.Handler()) servMux.Handle("/api/", http.StripPrefix("/api", apiMux)) } diff --git a/src/github.com/matrix-org/dendrite/federationapi/writers/send.go b/src/github.com/matrix-org/dendrite/federationapi/writers/send.go index 006fc8464..da6ed34d9 100644 --- a/src/github.com/matrix-org/dendrite/federationapi/writers/send.go +++ b/src/github.com/matrix-org/dendrite/federationapi/writers/send.go @@ -3,7 +3,9 @@ package writers import ( "encoding/json" "fmt" + "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/federationapi/config" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" @@ -18,6 +20,8 @@ func Send( txnID gomatrixserverlib.TransactionID, now time.Time, cfg config.FederationAPI, + query api.RoomserverQueryAPI, + producer *producers.RoomserverProducer, keys gomatrixserverlib.KeyRing, ) util.JSONResponse { request, errResp := gomatrixserverlib.VerifyHTTPRequest(req, now, cfg.ServerName, keys) @@ -37,21 +41,76 @@ func Send( content.TransactionID = txnID content.Destination = cfg.ServerName - // TODO: process the transaction. + resp, err := processTransaction(content, query, producer, keys) + if err != nil { + return httputil.LogThenError(req, err) + } return util.JSONResponse{ Code: 200, - JSON: gomatrixserverlib.RespSend{}, + JSON: resp, } } -func processTransaction(t gomatrixserverlib.Transaction, query api.RoomserverQueryAPI) { +func processTransaction( + t gomatrixserverlib.Transaction, + query api.RoomserverQueryAPI, + producer *producers.RoomserverProducer, + keys gomatrixserverlib.KeyRing, +) (*gomatrixserverlib.RespSend, error) { + // Check the event signatures + if err := gomatrixserverlib.VerifyEventSignatures(t.PDUs, keys); err != nil { + return nil, err + } + // Process the events. + results := map[string]gomatrixserverlib.PDUResult{} + for _, e := range t.PDUs { + err := processEvent(e, query, producer) + if err != nil { + // If the error is due to the event itself being bad then we skip + // it and move onto the next event. We report an error so that the + // sender knows that we have skipped processing it. + // + // However if the event is due to a temporary failure in our server + // such as a database being unavailable then we should bail, and + // hope that the sender will retry when we are feeling better. + // + // It is uncertain what we should do if an event fails because + // we failed to fetch more information from the sending server. + // For example if a request to /state fails. + // If we skip the event then we risk missing the event until we + // receive another event referencing it. + // If we bail and stop processing then we risk wedging incoming + // transactions from that server forever. + switch err.(type) { + case unknownRoomError: + case *gomatrixserverlib.NotAllowed: + default: + // Any other error should be the result of a temporary error in + // our server so we should bail processing the transaction entirely. + return nil, err + } + results[e.EventID()] = gomatrixserverlib.PDUResult{err.Error()} + } else { + results[e.EventID()] = gomatrixserverlib.PDUResult{} + } + } + + // TODO: Process the EDUs. + + return &gomatrixserverlib.RespSend{PDUs: results}, nil } -var errUnknownRoom = fmt.Errorf("unknown room") +type unknownRoomError string -func processEvent(e gomatrixserverlib.Event, query api.RoomserverQueryAPI) error { +func (e unknownRoomError) Error() string { return fmt.Sprintf("unknown room %q", e) } + +func processEvent( + e gomatrixserverlib.Event, + query api.RoomserverQueryAPI, + producer *producers.RoomserverProducer, +) error { refs := e.PrevEvents() prevEventIDs := make([]string, len(refs)) for i := range refs { @@ -77,7 +136,7 @@ func processEvent(e gomatrixserverlib.Event, query api.RoomserverQueryAPI) error // that this server is unaware of. // However generally speaking we should reject events for rooms we // aren't a member of. - return errUnknownRoom + return unknownRoomError(e.RoomID()) } if !stateResp.PrevEventsExist { @@ -101,6 +160,7 @@ func processEvent(e gomatrixserverlib.Event, query api.RoomserverQueryAPI) error panic(fmt.Errorf("Receiving events with missing prev_events is no implemented")) } + // Check that the event is allowed by the state at the event. authUsingState := gomatrixserverlib.NewAuthEvents(nil) for i := range stateResp.StateEvents { authUsingState.AddEvent(&stateResp.StateEvents[i]) @@ -110,5 +170,13 @@ func processEvent(e gomatrixserverlib.Event, query api.RoomserverQueryAPI) error return err } + // TODO: Check that the roomserver has a copy of all of the auth_events. + // TODO: Check that the event is allowed by its auth_events. + + // pass the event to the roomserver + if err := producer.SendEvents([]gomatrixserverlib.Event{e}); err != nil { + return err + } + return nil }