From 515cce1a4517cc6ab86f08844e60b33785fbd07d Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 7 Jun 2017 14:32:53 +0100 Subject: [PATCH 1/2] Add support for receiving room events over federation. (#130) * Add API for querying events by ID. * Fix tense * Start implementing federation ingress * More stuff * Hook up federation event receiving * Handle the case where we are missing state * Fix docstring and comments * Fix infinite loop when printing unknownRoomError --- .../dendrite-federation-api-server/main.go | 53 ++++- .../dendrite/cmd/federation-api-proxy/main.go | 124 ++++++++++ .../dendrite/federationapi/routing/routing.go | 29 ++- .../dendrite/federationapi/writers/send.go | 211 ++++++++++++++++++ .../dendrite/roomserver/input/events.go | 2 +- .../dendrite/roomserver/query/query.go | 9 +- .../dendrite/roomserver/state/state.go | 2 +- .../roomserver/storage/events_table.go | 17 +- .../dendrite/roomserver/types/types.go | 6 + 9 files changed, 441 insertions(+), 12 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/cmd/federation-api-proxy/main.go create mode 100644 src/github.com/matrix-org/dendrite/federationapi/writers/send.go diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go index 15bf5bc47..268eecbec 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go @@ -18,11 +18,14 @@ import ( "encoding/base64" "net/http" "os" + "strings" "time" + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/federationapi/config" "github.com/matrix-org/dendrite/federationapi/routing" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" log "github.com/Sirupsen/logrus" @@ -40,7 +43,10 @@ var ( // openssl x509 -noout -fingerprint -sha256 -inform pem -in server.crt |\ // python -c 'print raw_input()[19:].replace(":","").decode("hex").encode("base64").rstrip("=\n")' // - tlsFingerprint = os.Getenv("TLS_FINGERPRINT") + tlsFingerprint = os.Getenv("TLS_FINGERPRINT") + kafkaURIs = strings.Split(os.Getenv("KAFKA_URIS"), ",") + roomserverURL = os.Getenv("ROOMSERVER_URL") + roomserverInputTopic = os.Getenv("TOPIC_INPUT_ROOM_EVENT") ) func main() { @@ -57,6 +63,18 @@ func main() { log.Panic("No TLS_FINGERPRINT environment variable found.") } + if len(kafkaURIs) == 0 { + // the kafka default is :9092 + kafkaURIs = []string{"localhost:9092"} + } + + if roomserverURL == "" { + log.Panic("No ROOMSERVER_URL environment variable found.") + } + + if roomserverInputTopic == "" { + log.Panic("No TOPIC_INPUT_ROOM_EVENT environment variable found. This should match the roomserver input topic.") + } cfg := config.FederationAPI{ ServerName: serverName, // TODO: make the validity period configurable. @@ -75,6 +93,37 @@ func main() { } cfg.TLSFingerPrints = []gomatrixserverlib.TLSFingerprint{{fingerprintSHA256}} - routing.Setup(http.DefaultServeMux, cfg) + federation := gomatrixserverlib.NewFederationClient(cfg.ServerName, cfg.KeyID, cfg.PrivateKey) + + keyRing := gomatrixserverlib.KeyRing{ + KeyFetchers: []gomatrixserverlib.KeyFetcher{ + // TODO: Use perspective key fetchers for production. + &gomatrixserverlib.DirectKeyFetcher{federation.Client}, + }, + KeyDatabase: &dummyKeyDatabase{}, + } + queryAPI := api.NewRoomserverQueryAPIHTTP(roomserverURL, nil) + + roomserverProducer, err := producers.NewRoomserverProducer(kafkaURIs, roomserverInputTopic) + if err != nil { + log.Panicf("Failed to setup kafka producers(%s): %s", kafkaURIs, err) + } + + routing.Setup(http.DefaultServeMux, cfg, queryAPI, roomserverProducer, keyRing, federation) log.Fatal(http.ListenAndServe(bindAddr, nil)) } + +// TODO: Implement a proper key database. +type dummyKeyDatabase struct{} + +func (d *dummyKeyDatabase) FetchKeys( + requests map[gomatrixserverlib.PublicKeyRequest]gomatrixserverlib.Timestamp, +) (map[gomatrixserverlib.PublicKeyRequest]gomatrixserverlib.ServerKeys, error) { + return nil, nil +} + +func (d *dummyKeyDatabase) StoreKeys( + map[gomatrixserverlib.PublicKeyRequest]gomatrixserverlib.ServerKeys, +) error { + return nil +} diff --git a/src/github.com/matrix-org/dendrite/cmd/federation-api-proxy/main.go b/src/github.com/matrix-org/dendrite/cmd/federation-api-proxy/main.go new file mode 100644 index 000000000..ed0aed722 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/cmd/federation-api-proxy/main.go @@ -0,0 +1,124 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "flag" + "fmt" + log "github.com/Sirupsen/logrus" + "net/http" + "net/http/httputil" + "net/url" + "os" + "strings" + "time" +) + +const usage = `Usage: %s + +Create a single endpoint URL which remote matrix servers can be pointed at. + +The server-server API in Dendrite is split across multiple processes +which listen on multiple ports. You cannot point a Matrix server at +any of those ports, as there will be unimplemented functionality. +In addition, all server-server API processes start with the additional +path prefix '/api', which Matrix servers will be unaware of. + +This tool will proxy requests for all server-server URLs and forward +them to their respective process. It will also add the '/api' path +prefix to incoming requests. + +THIS TOOL IS FOR TESTING AND NOT INTENDED FOR PRODUCTION USE. + +Arguments: + +` + +var ( + federationAPIURL = flag.String("federation-api-url", "", "The base URL of the listening 'dendrite-federation-api-server' process. E.g. 'http://localhost:4200'") + bindAddress = flag.String("bind-address", ":8448", "The listening port for the proxy.") + certFile = flag.String("tls-cert", "server.crt", "The PEM formatted X509 certificate to use for TLS") + keyFile = flag.String("tls-key", "server.key", "The PEM private key to use for TLS") +) + +func makeProxy(targetURL string) (*httputil.ReverseProxy, error) { + if !strings.HasSuffix(targetURL, "/") { + targetURL += "/" + } + // Check that we can parse the URL. + _, err := url.Parse(targetURL) + if err != nil { + return nil, err + } + return &httputil.ReverseProxy{ + Director: func(req *http.Request) { + // URL.Path() removes the % escaping from the path. + // The % encoding will be added back when the url is encoded + // when the request is forwarded. + // This means that we will lose any unessecary escaping from the URL. + // Pratically this means that any distinction between '%2F' and '/' + // in the URL will be lost by the time it reaches the target. + path := req.URL.Path + path = "api" + path + log.WithFields(log.Fields{ + "path": path, + "url": targetURL, + "method": req.Method, + }).Print("proxying request") + newURL, err := url.Parse(targetURL + path) + if err != nil { + // We already checked that we can parse the URL + // So this shouldn't ever get hit. + panic(err) + } + // Copy the query parameters from the request. + newURL.RawQuery = req.URL.RawQuery + req.URL = newURL + }, + }, nil +} + +func main() { + flag.Usage = func() { + fmt.Fprintf(os.Stderr, usage, os.Args[0]) + flag.PrintDefaults() + } + + flag.Parse() + + if *federationAPIURL == "" { + flag.Usage() + fmt.Fprintln(os.Stderr, "no --federation-api-url specified.") + os.Exit(1) + } + + federationProxy, err := makeProxy(*federationAPIURL) + if err != nil { + panic(err) + } + + http.Handle("/", federationProxy) + + srv := &http.Server{ + Addr: *bindAddress, + ReadTimeout: 1 * time.Minute, // how long we wait for the client to send the entire request (after connection accept) + WriteTimeout: 5 * time.Minute, // how long the proxy has to write the full response + } + + fmt.Println("Proxying requests to:") + fmt.Println(" /* => ", *federationAPIURL+"/api/*") + fmt.Println("Listening on ", *bindAddress) + panic(srv.ListenAndServeTLS(*certFile, *keyFile)) +} diff --git a/src/github.com/matrix-org/dendrite/federationapi/routing/routing.go b/src/github.com/matrix-org/dendrite/federationapi/routing/routing.go index b5af66dc6..0f20d0115 100644 --- a/src/github.com/matrix-org/dendrite/federationapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/federationapi/routing/routing.go @@ -16,21 +16,35 @@ package routing import ( "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/federationapi/config" "github.com/matrix-org/dendrite/federationapi/readers" + "github.com/matrix-org/dendrite/federationapi/writers" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" "github.com/prometheus/client_golang/prometheus" "net/http" + "time" ) const ( - pathPrefixV2Keys = "/_matrix/key/v2" + pathPrefixV2Keys = "/_matrix/key/v2" + pathPrefixV1Federation = "/_matrix/federation/v1" ) // Setup registers HTTP handlers with the given ServeMux. -func Setup(servMux *http.ServeMux, cfg config.FederationAPI) { +func Setup( + servMux *http.ServeMux, + cfg config.FederationAPI, + query api.RoomserverQueryAPI, + producer *producers.RoomserverProducer, + keys gomatrixserverlib.KeyRing, + federation *gomatrixserverlib.FederationClient, +) { apiMux := mux.NewRouter() v2keysmux := apiMux.PathPrefix(pathPrefixV2Keys).Subrouter() + v1fedmux := apiMux.PathPrefix(pathPrefixV1Federation).Subrouter() localKeys := makeAPI("localkeys", func(req *http.Request) util.JSONResponse { return readers.LocalKeys(req, cfg) @@ -43,6 +57,17 @@ func Setup(servMux *http.ServeMux, cfg config.FederationAPI) { v2keysmux.Handle("/server/{keyID}", localKeys) v2keysmux.Handle("/server/", localKeys) + v1fedmux.Handle("/send/{txnID}/", makeAPI("send", + func(req *http.Request) util.JSONResponse { + vars := mux.Vars(req) + return writers.Send( + req, gomatrixserverlib.TransactionID(vars["txnID"]), + time.Now(), + cfg, query, producer, keys, federation, + ) + }, + )) + servMux.Handle("/metrics", prometheus.Handler()) servMux.Handle("/api/", http.StripPrefix("/api", apiMux)) } diff --git a/src/github.com/matrix-org/dendrite/federationapi/writers/send.go b/src/github.com/matrix-org/dendrite/federationapi/writers/send.go new file mode 100644 index 000000000..14e1fae6c --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationapi/writers/send.go @@ -0,0 +1,211 @@ +package writers + +import ( + "encoding/json" + "fmt" + "github.com/matrix-org/dendrite/clientapi/httputil" + "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/dendrite/clientapi/producers" + "github.com/matrix-org/dendrite/federationapi/config" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "net/http" + "time" +) + +// Send implements /_matrix/federation/v1/send/{txnID} +func Send( + req *http.Request, + txnID gomatrixserverlib.TransactionID, + now time.Time, + cfg config.FederationAPI, + query api.RoomserverQueryAPI, + producer *producers.RoomserverProducer, + keys gomatrixserverlib.KeyRing, + federation *gomatrixserverlib.FederationClient, +) util.JSONResponse { + request, errResp := gomatrixserverlib.VerifyHTTPRequest(req, now, cfg.ServerName, keys) + if request == nil { + return errResp + } + + t := txnReq{ + query: query, + producer: producer, + keys: keys, + federation: federation, + } + if err := json.Unmarshal(request.Content(), &t); err != nil { + return util.JSONResponse{ + Code: 400, + JSON: jsonerror.BadJSON("The request body could not be decoded into valid JSON. " + err.Error()), + } + } + + t.Origin = request.Origin() + t.TransactionID = txnID + t.Destination = cfg.ServerName + + resp, err := t.processTransaction() + if err != nil { + return httputil.LogThenError(req, err) + } + + return util.JSONResponse{ + Code: 200, + JSON: resp, + } +} + +type txnReq struct { + gomatrixserverlib.Transaction + query api.RoomserverQueryAPI + producer *producers.RoomserverProducer + keys gomatrixserverlib.KeyRing + federation *gomatrixserverlib.FederationClient +} + +func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { + // Check the event signatures + if err := gomatrixserverlib.VerifyEventSignatures(t.PDUs, t.keys); err != nil { + return nil, err + } + + // Process the events. + results := map[string]gomatrixserverlib.PDUResult{} + for _, e := range t.PDUs { + err := t.processEvent(e) + if err != nil { + // If the error is due to the event itself being bad then we skip + // it and move onto the next event. We report an error so that the + // sender knows that we have skipped processing it. + // + // However if the event is due to a temporary failure in our server + // such as a database being unavailable then we should bail, and + // hope that the sender will retry when we are feeling better. + // + // It is uncertain what we should do if an event fails because + // we failed to fetch more information from the sending server. + // For example if a request to /state fails. + // If we skip the event then we risk missing the event until we + // receive another event referencing it. + // If we bail and stop processing then we risk wedging incoming + // transactions from that server forever. + switch err.(type) { + case unknownRoomError: + case *gomatrixserverlib.NotAllowed: + default: + // Any other error should be the result of a temporary error in + // our server so we should bail processing the transaction entirely. + return nil, err + } + results[e.EventID()] = gomatrixserverlib.PDUResult{err.Error()} + } else { + results[e.EventID()] = gomatrixserverlib.PDUResult{} + } + } + + // TODO: Process the EDUs. + + return &gomatrixserverlib.RespSend{PDUs: results}, nil +} + +type unknownRoomError struct { + roomID string +} + +func (e unknownRoomError) Error() string { return fmt.Sprintf("unknown room %q", e.roomID) } + +func (t *txnReq) processEvent(e gomatrixserverlib.Event) error { + refs := e.PrevEvents() + prevEventIDs := make([]string, len(refs)) + for i := range refs { + prevEventIDs[i] = refs[i].EventID + } + + // Fetch the state needed to authenticate the event. + needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{e}) + stateReq := api.QueryStateAfterEventsRequest{ + RoomID: e.RoomID(), + PrevEventIDs: prevEventIDs, + StateToFetch: needed.Tuples(), + } + var stateResp api.QueryStateAfterEventsResponse + if err := t.query.QueryStateAfterEvents(&stateReq, &stateResp); err != nil { + return err + } + + if !stateResp.RoomExists { + // TODO: When synapse receives a message for a room it is not in it + // asks the remote server for the state of the room so that it can + // check if the remote server knows of a join "m.room.member" event + // that this server is unaware of. + // However generally speaking we should reject events for rooms we + // aren't a member of. + return unknownRoomError{e.RoomID()} + } + + if !stateResp.PrevEventsExist { + return t.processEventWithMissingState(e) + } + + // Check that the event is allowed by the state at the event. + if err := checkAllowedByState(e, stateResp.StateEvents); err != nil { + return err + } + + // TODO: Check that the roomserver has a copy of all of the auth_events. + // TODO: Check that the event is allowed by its auth_events. + + // pass the event to the roomserver + if err := t.producer.SendEvents([]gomatrixserverlib.Event{e}); err != nil { + return err + } + + return nil +} + +func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserverlib.Event) error { + authUsingState := gomatrixserverlib.NewAuthEvents(nil) + for i := range stateEvents { + authUsingState.AddEvent(&stateEvents[i]) + } + return gomatrixserverlib.Allowed(e, &authUsingState) +} + +func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event) error { + // We are missing the previous events for this events. + // This means that there is a gap in our view of the history of the + // room. There two ways that we can handle such a gap: + // 1) We can fill in the gap using /get_missing_events + // 2) We can leave the gap and request the state of the room at + // this event from the remote server using either /state_ids + // or /state. + // Synapse will attempt to do 1 and if that fails or if the gap is + // too large then it will attempt 2. + // Synapse will use /state_ids if possible since ususally the state + // is largely unchanged and it is more efficient to fetch a list of + // event ids and then use /event to fetch the individual events. + // However not all version of synapse support /state_ids so you may + // need to fallback to /state. + // TODO: Attempt to fill in the gap using /get_missing_events + // TODO: Attempt to fetch the state using /state_ids and /events + state, err := t.federation.LookupState(t.Origin, e.RoomID(), e.EventID()) + if err != nil { + return err + } + // Check that the returned state is valid. + if err := state.Check(t.keys); err != nil { + return err + } + // Check that the event is allowed by the state. + if err := checkAllowedByState(e, state.StateEvents); err != nil { + return err + } + // pass the event along with the state to the roomserver + if err := t.producer.SendEventWithState(state, e); err != nil { + return err + } + return nil +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/events.go b/src/github.com/matrix-org/dendrite/roomserver/input/events.go index 513d10ce5..b379b3aeb 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/events.go @@ -28,7 +28,7 @@ type RoomEventDatabase interface { StoreEvent(event gomatrixserverlib.Event, authEventNIDs []types.EventNID) (types.RoomNID, types.StateAtEvent, error) // Lookup the state entries for a list of string event IDs // Returns an error if the there is an error talking to the database - // or if the event IDs aren't in the database. + // Returns a types.MissingEventError if the event IDs aren't in the database. StateEntriesForEventIDs(eventIDs []string) ([]types.StateEntry, error) // Set the state at an event. SetState(eventNID types.EventNID, stateNID types.StateSnapshotNID) error diff --git a/src/github.com/matrix-org/dendrite/roomserver/query/query.go b/src/github.com/matrix-org/dendrite/roomserver/query/query.go index 6f236e938..1b1820f0a 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/query/query.go +++ b/src/github.com/matrix-org/dendrite/roomserver/query/query.go @@ -97,9 +97,12 @@ func (r *RoomserverQueryAPI) QueryStateAfterEvents( prevStates, err := r.DB.StateAtEventIDs(request.PrevEventIDs) if err != nil { - // TODO: Check if the error was because we are missing events from the - // database or are missing state at events from the database. - return err + switch err.(type) { + case types.MissingEventError: + return nil + default: + return err + } } response.PrevEventsExist = true diff --git a/src/github.com/matrix-org/dendrite/roomserver/state/state.go b/src/github.com/matrix-org/dendrite/roomserver/state/state.go index 6b49aba64..b5454c0cd 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/state/state.go +++ b/src/github.com/matrix-org/dendrite/roomserver/state/state.go @@ -32,7 +32,7 @@ type RoomStateDatabase interface { AddState(roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, state []types.StateEntry) (types.StateSnapshotNID, error) // Lookup the state of a room at each event for a list of string event IDs. // Returns an error if there is an error talking to the database - // or if the room state for the event IDs aren't in the database + // Returns a types.MissingEventError if the room state for the event IDs aren't in the database StateAtEventIDs(eventIDs []string) ([]types.StateAtEvent, error) // Lookup the numeric IDs for a list of string event types. // Returns a map from string event type to numeric ID for the event type. diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go index acaf43b6d..a2813ae97 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go @@ -166,6 +166,8 @@ func (s *eventStatements) selectEvent(eventID string) (types.EventNID, types.Sta return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err } +// bulkSelectStateEventByID lookups a list of state events by event ID. +// If any of the requested events are missing from the database it returns a types.MissingEventError func (s *eventStatements) bulkSelectStateEventByID(eventIDs []string) ([]types.StateEntry, error) { rows, err := s.bulkSelectStateEventByIDStmt.Query(pq.StringArray(eventIDs)) if err != nil { @@ -194,11 +196,16 @@ func (s *eventStatements) bulkSelectStateEventByID(eventIDs []string) ([]types.S // However it should be possible debug this by replaying queries or entries from the input kafka logs. // If this turns out to be impossible and we do need the debug information here, it would be better // to do it as a separate query rather than slowing down/complicating the common case. - return nil, fmt.Errorf("storage: state event IDs missing from the database (%d != %d)", i, len(eventIDs)) + return nil, types.MissingEventError( + fmt.Sprintf("storage: state event IDs missing from the database (%d != %d)", i, len(eventIDs)), + ) } return results, err } +// bulkSelectStateAtEventByID lookups the state at a list of events by event ID. +// If any of the requested events are missing from the database it returns a types.MissingEventError. +// If we do not have the state for any of the requested events it returns a types.MissingEventError. func (s *eventStatements) bulkSelectStateAtEventByID(eventIDs []string) ([]types.StateAtEvent, error) { rows, err := s.bulkSelectStateAtEventByIDStmt.Query(pq.StringArray(eventIDs)) if err != nil { @@ -218,11 +225,15 @@ func (s *eventStatements) bulkSelectStateAtEventByID(eventIDs []string) ([]types return nil, err } if result.BeforeStateSnapshotNID == 0 { - return nil, fmt.Errorf("storage: missing state for event NID %d", result.EventNID) + return nil, types.MissingEventError( + fmt.Sprintf("storage: missing state for event NID %d", result.EventNID), + ) } } if i != len(eventIDs) { - return nil, fmt.Errorf("storage: event IDs missing from the database (%d != %d)", i, len(eventIDs)) + return nil, types.MissingEventError( + fmt.Sprintf("storage: event IDs missing from the database (%d != %d)", i, len(eventIDs)), + ) } return results, err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/types/types.go b/src/github.com/matrix-org/dendrite/roomserver/types/types.go index f728696bf..b255b64b9 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/types/types.go +++ b/src/github.com/matrix-org/dendrite/roomserver/types/types.go @@ -168,3 +168,9 @@ type RoomRecentEventsUpdater interface { // Rollback the transaction. Rollback() error } + +// A MissingEventError is an error that happened because the roomserver was +// missing requested events from its database. +type MissingEventError string + +func (e MissingEventError) Error() string { return string(e) } From b184a488971a24cb738ccfaa2a88e9651889a43e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 7 Jun 2017 16:35:41 +0100 Subject: [PATCH 2/2] Fetching missing state from the roomserver. (#135) * Fetching missing state from the roomserver. Whenever the syncserver receives an event from the room server that adds state that isn't in the syncserver's local database it should fetch those state events from the roomserver. * Fix append * Put comment back * Comments * s/addsStateEvents/lookupStateEvents/ * Fix spelling * Include the stream position that a state event was added at in the current state tables * Fix comment * Review comments --- .../dendrite/syncapi/config/config.go | 2 + .../dendrite/syncapi/consumers/roomserver.go | 88 ++++++++++++++++++- .../storage/current_room_state_table.go | 55 +++++++++--- .../storage/output_room_events_table.go | 21 ++--- .../dendrite/syncapi/storage/syncserver.go | 82 ++++++++++++----- 5 files changed, 200 insertions(+), 48 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/config/config.go b/src/github.com/matrix-org/dendrite/syncapi/config/config.go index 433a7a2b6..32f047229 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/config/config.go +++ b/src/github.com/matrix-org/dendrite/syncapi/config/config.go @@ -20,6 +20,8 @@ import ( // Sync contains the config information necessary to spin up a sync-server process. type Sync struct { + // Where the room server is listening for queries. + RoomserverURL string `yaml:"roomserver_url"` // The topic for events which are written by the room server output log. RoomserverOutputTopic string `yaml:"roomserver_topic"` // A list of URIs to consume events from. These kafka logs should be produced by a Room Server. diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index b8ec98d2c..31c48ac57 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -16,6 +16,7 @@ package consumers import ( "encoding/json" + "fmt" log "github.com/Sirupsen/logrus" "github.com/matrix-org/dendrite/common" @@ -33,6 +34,7 @@ type OutputRoomEvent struct { roomServerConsumer *common.ContinualConsumer db *storage.SyncServerDatabase notifier *sync.Notifier + query api.RoomserverQueryAPI } // NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. @@ -51,6 +53,7 @@ func NewOutputRoomEvent(cfg *config.Sync, n *sync.Notifier, store *storage.SyncS roomServerConsumer: &consumer, db: store, notifier: n, + query: api.NewRoomserverQueryAPIHTTP(cfg.RoomserverURL, nil), } consumer.ProcessMessage = s.onMessage @@ -84,7 +87,19 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { "room_id": ev.RoomID(), }).Info("received event from roomserver") - syncStreamPos, err := s.db.WriteEvent(&ev, output.AddsStateEventIDs, output.RemovesStateEventIDs) + addsStateEvents, err := s.lookupStateEvents(output.AddsStateEventIDs, ev) + if err != nil { + log.WithFields(log.Fields{ + "event": string(ev.JSON()), + log.ErrorKey: err, + "add": output.AddsStateEventIDs, + "del": output.RemovesStateEventIDs, + }).Panicf("roomserver output log: state event lookup failure") + } + + syncStreamPos, err := s.db.WriteEvent( + &ev, addsStateEvents, output.AddsStateEventIDs, output.RemovesStateEventIDs, + ) if err != nil { // panic rather than continue with an inconsistent database @@ -100,3 +115,74 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { return nil } + +// lookupStateEvents looks up the state events that are added by a new event. +func (s *OutputRoomEvent) lookupStateEvents( + addsStateEventIDs []string, event gomatrixserverlib.Event, +) ([]gomatrixserverlib.Event, error) { + // Fast path if there aren't any new state events. + if len(addsStateEventIDs) == 0 { + return nil, nil + } + + // Fast path if the only state event added is the event itself. + if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() { + return []gomatrixserverlib.Event{event}, nil + } + + // Check if this is re-adding a state events that we previously processed + // If we have previously received a state event it may still be in + // our event database. + result, err := s.db.Events(addsStateEventIDs) + if err != nil { + return nil, err + } + missing := missingEventsFrom(result, addsStateEventIDs) + + // Check if event itself is being added. + for _, eventID := range missing { + if eventID == event.EventID() { + result = append(result, event) + break + } + } + missing = missingEventsFrom(result, addsStateEventIDs) + + if len(missing) == 0 { + return result, nil + } + + // At this point the missing events are neither the event itself nor are + // they present in our local database. Our only option is to fetch them + // from the roomserver using the query API. + eventReq := api.QueryEventsByIDRequest{EventIDs: missing} + var eventResp api.QueryEventsByIDResponse + if err := s.query.QueryEventsByID(&eventReq, &eventResp); err != nil { + return nil, err + } + + result = append(result, eventResp.Events...) + missing = missingEventsFrom(result, addsStateEventIDs) + + if len(missing) != 0 { + return nil, fmt.Errorf( + "missing %d state events IDs at event %q", len(missing), event.EventID(), + ) + } + + return result, nil +} + +func missingEventsFrom(events []gomatrixserverlib.Event, required []string) []string { + have := map[string]bool{} + for _, event := range events { + have[event.EventID()] = true + } + var missing []string + for _, eventID := range required { + if !have[eventID] { + missing = append(missing, eventID) + } + } + return missing +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go index 28389ad94..d4f260e00 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go @@ -16,6 +16,7 @@ package storage import ( "database/sql" + "github.com/lib/pq" "github.com/matrix-org/gomatrixserverlib" ) @@ -35,6 +36,9 @@ CREATE TABLE IF NOT EXISTS current_room_state ( -- The 'content.membership' value if this event is an m.room.member event. For other -- events, this will be NULL. membership TEXT, + -- The serial ID of the output_room_events table when this event became + -- part of the current state of the room. + added_at BIGINT, -- Clobber based on 3-uple of room_id, type and state_key CONSTRAINT room_state_unique UNIQUE (room_id, type, state_key) ); @@ -45,9 +49,10 @@ CREATE INDEX IF NOT EXISTS membership_idx ON current_room_state(type, state_key, ` const upsertRoomStateSQL = "" + - "INSERT INTO current_room_state (room_id, event_id, type, state_key, event_json, membership) VALUES ($1, $2, $3, $4, $5, $6)" + + "INSERT INTO current_room_state (room_id, event_id, type, state_key, event_json, membership, added_at)" + + " VALUES ($1, $2, $3, $4, $5, $6, $7)" + " ON CONFLICT ON CONSTRAINT room_state_unique" + - " DO UPDATE SET event_id = $2, event_json = $5, membership = $6" + " DO UPDATE SET event_id = $2, event_json = $5, membership = $6, added_at = $7" const deleteRoomStateByEventIDSQL = "" + "DELETE FROM current_room_state WHERE event_id = $1" @@ -61,12 +66,16 @@ const selectCurrentStateSQL = "" + const selectJoinedUsersSQL = "" + "SELECT room_id, state_key FROM current_room_state WHERE type = 'm.room.member' AND membership = 'join'" +const selectEventsWithEventIDsSQL = "" + + "SELECT added_at, event_json FROM current_room_state WHERE event_id = ANY($1)" + type currentRoomStateStatements struct { upsertRoomStateStmt *sql.Stmt deleteRoomStateByEventIDStmt *sql.Stmt selectRoomIDsWithMembershipStmt *sql.Stmt selectCurrentStateStmt *sql.Stmt selectJoinedUsersStmt *sql.Stmt + selectEventsWithEventIDsStmt *sql.Stmt } func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) { @@ -89,6 +98,9 @@ func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) { if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil { return } + if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil { + return + } return } @@ -141,6 +153,33 @@ func (s *currentRoomStateStatements) selectCurrentState(txn *sql.Tx, roomID stri } defer rows.Close() + return rowsToEvents(rows) +} + +func (s *currentRoomStateStatements) deleteRoomStateByEventID(txn *sql.Tx, eventID string) error { + _, err := txn.Stmt(s.deleteRoomStateByEventIDStmt).Exec(eventID) + return err +} + +func (s *currentRoomStateStatements) upsertRoomState( + txn *sql.Tx, event gomatrixserverlib.Event, membership *string, addedAt int64, +) error { + _, err := txn.Stmt(s.upsertRoomStateStmt).Exec( + event.RoomID(), event.EventID(), event.Type(), *event.StateKey(), event.JSON(), membership, addedAt, + ) + return err +} + +func (s *currentRoomStateStatements) selectEventsWithEventIDs(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) { + rows, err := txn.Stmt(s.selectEventsWithEventIDsStmt).Query(pq.StringArray(eventIDs)) + if err != nil { + return nil, err + } + defer rows.Close() + return rowsToStreamEvents(rows) +} + +func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) { var result []gomatrixserverlib.Event for rows.Next() { var eventBytes []byte @@ -156,15 +195,3 @@ func (s *currentRoomStateStatements) selectCurrentState(txn *sql.Tx, roomID stri } return result, nil } - -func (s *currentRoomStateStatements) deleteRoomStateByEventID(txn *sql.Tx, eventID string) error { - _, err := txn.Stmt(s.deleteRoomStateByEventIDStmt).Exec(eventID) - return err -} - -func (s *currentRoomStateStatements) upsertRoomState(txn *sql.Tx, event gomatrixserverlib.Event, membership *string) error { - _, err := txn.Stmt(s.upsertRoomStateStmt).Exec( - event.RoomID(), event.EventID(), event.Type(), *event.StateKey(), event.JSON(), membership, - ) - return err -} diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index b3cc39255..6c8a52635 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -16,7 +16,6 @@ package storage import ( "database/sql" - "fmt" log "github.com/Sirupsen/logrus" "github.com/lib/pq" @@ -193,7 +192,7 @@ func (s *outputRoomEventsStatements) selectRecentEvents( return nil, err } defer rows.Close() - events, err := rowsToEvents(rows) + events, err := rowsToStreamEvents(rows) if err != nil { return nil, err } @@ -205,23 +204,19 @@ func (s *outputRoomEventsStatements) selectRecentEvents( // Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing // from the database. func (s *outputRoomEventsStatements) selectEvents(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) { - rows, err := txn.Stmt(s.selectEventsStmt).Query(pq.StringArray(eventIDs)) + stmt := s.selectEventsStmt + if txn != nil { + stmt = txn.Stmt(stmt) + } + rows, err := stmt.Query(pq.StringArray(eventIDs)) if err != nil { return nil, err } defer rows.Close() - result, err := rowsToEvents(rows) - if err != nil { - return nil, err - } - - if len(result) != len(eventIDs) { - return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(result), len(eventIDs)) - } - return result, nil + return rowsToStreamEvents(rows) } -func rowsToEvents(rows *sql.Rows) ([]streamEvent, error) { +func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) { var result []streamEvent for rows.Next() { var ( diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index dee0b51c2..ce2e7f234 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -17,6 +17,7 @@ package storage import ( "database/sql" "encoding/json" + "fmt" // Import the postgres database driver. _ "github.com/lib/pq" "github.com/matrix-org/dendrite/clientapi/events" @@ -75,10 +76,24 @@ func (d *SyncServerDatabase) AllJoinedUsersInRooms() (map[string][]string, error return d.roomstate.selectJoinedUsers() } +// Events lookups a list of event by their event ID. +// Returns a list of events matching the requested IDs found in the database. +// If an event is not found in the database then it will be omitted from the list. +// Returns an error if there was a problem talking with the database +func (d *SyncServerDatabase) Events(eventIDs []string) ([]gomatrixserverlib.Event, error) { + streamEvents, err := d.events.selectEvents(nil, eventIDs) + if err != nil { + return nil, err + } + return streamEventsToEvents(streamEvents), nil +} + // WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races // when generating the stream position for this event. Returns the sync stream position for the inserted event. // Returns an error if there was a problem inserting this event. -func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string) (streamPos types.StreamPosition, returnErr error) { +func (d *SyncServerDatabase) WriteEvent( + ev *gomatrixserverlib.Event, addStateEvents []gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string, +) (streamPos types.StreamPosition, returnErr error) { returnErr = runTransaction(d.db, func(txn *sql.Tx) error { var err error pos, err := d.events.insertEvent(txn, ev, addStateEventIDs, removeStateEventIDs) @@ -87,31 +102,19 @@ func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEve } streamPos = types.StreamPosition(pos) - if len(addStateEventIDs) == 0 && len(removeStateEventIDs) == 0 { + if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 { // Nothing to do, the event may have just been a message event. return nil } - // Update the current room state based on the added/removed state event IDs. - // In the common case there is a single added event ID which is the state event itself, assuming `ev` is a state event. - // However, conflict resolution may result in there being different events being added, or even some removed. - if len(removeStateEventIDs) == 0 && len(addStateEventIDs) == 1 && addStateEventIDs[0] == ev.EventID() { - // common case - return d.updateRoomState(txn, nil, []gomatrixserverlib.Event{*ev}) - } - - // uncommon case: we need to fetch the full event for each event ID mentioned, then update room state - added, err := d.events.selectEvents(txn, addStateEventIDs) - if err != nil { - return err - } - - return d.updateRoomState(txn, removeStateEventIDs, streamEventsToEvents(added)) + return d.updateRoomState(txn, removeStateEventIDs, addStateEvents, streamPos) }) return } -func (d *SyncServerDatabase) updateRoomState(txn *sql.Tx, removedEventIDs []string, addedEvents []gomatrixserverlib.Event) error { +func (d *SyncServerDatabase) updateRoomState( + txn *sql.Tx, removedEventIDs []string, addedEvents []gomatrixserverlib.Event, streamPos types.StreamPosition, +) error { // remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add. for _, eventID := range removedEventIDs { if err := d.roomstate.deleteRoomStateByEventID(txn, eventID); err != nil { @@ -132,7 +135,7 @@ func (d *SyncServerDatabase) updateRoomState(txn *sql.Tx, removedEventIDs []stri } membership = &memberContent.Membership } - if err := d.roomstate.upsertRoomState(txn, event, membership); err != nil { + if err := d.roomstate.upsertRoomState(txn, event, membership, int64(streamPos)); err != nil { return err } } @@ -310,7 +313,7 @@ func (d *SyncServerDatabase) fetchStateEvents(txn *sql.Tx, roomIDToEventIDSet ma for _, missingEvIDs := range missingEvents { allMissingEventIDs = append(allMissingEventIDs, missingEvIDs...) } - evs, err := d.events.selectEvents(txn, allMissingEventIDs) + evs, err := d.fetchMissingStateEvents(txn, allMissingEventIDs) if err != nil { return nil, err } @@ -323,6 +326,45 @@ func (d *SyncServerDatabase) fetchStateEvents(txn *sql.Tx, roomIDToEventIDSet ma return stateBetween, nil } +func (d *SyncServerDatabase) fetchMissingStateEvents(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) { + // Fetch from the events table first so we pick up the stream ID for the + // event. + events, err := d.events.selectEvents(txn, eventIDs) + if err != nil { + return nil, err + } + + have := map[string]bool{} + for _, event := range events { + have[event.EventID()] = true + } + var missing []string + for _, eventID := range eventIDs { + if !have[eventID] { + missing = append(missing, eventID) + } + } + if len(missing) == 0 { + return events, nil + } + + // If they are missing from the events table then they should be state + // events that we received from outside the main event stream. + // These should be in the room state table. + stateEvents, err := d.roomstate.selectEventsWithEventIDs(txn, missing) + + if err != nil { + return nil, err + } + if len(stateEvents) != len(missing) { + return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(stateEvents), len(missing)) + } + for _, e := range stateEvents { + events = append(events, e) + } + return events, nil +} + func (d *SyncServerDatabase) getStateDeltas(txn *sql.Tx, fromPos, toPos types.StreamPosition, userID string) ([]stateDelta, error) { // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 // - Get membership list changes for this user in this sync response