diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go index 15bf5bc47..268eecbec 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go @@ -18,11 +18,14 @@ import ( "encoding/base64" "net/http" "os" + "strings" "time" + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/federationapi/config" "github.com/matrix-org/dendrite/federationapi/routing" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" log "github.com/Sirupsen/logrus" @@ -40,7 +43,10 @@ var ( // openssl x509 -noout -fingerprint -sha256 -inform pem -in server.crt |\ // python -c 'print raw_input()[19:].replace(":","").decode("hex").encode("base64").rstrip("=\n")' // - tlsFingerprint = os.Getenv("TLS_FINGERPRINT") + tlsFingerprint = os.Getenv("TLS_FINGERPRINT") + kafkaURIs = strings.Split(os.Getenv("KAFKA_URIS"), ",") + roomserverURL = os.Getenv("ROOMSERVER_URL") + roomserverInputTopic = os.Getenv("TOPIC_INPUT_ROOM_EVENT") ) func main() { @@ -57,6 +63,18 @@ func main() { log.Panic("No TLS_FINGERPRINT environment variable found.") } + if len(kafkaURIs) == 0 || (len(kafkaURIs) == 1 && kafkaURIs[0] == "") { + // strings.Split yields [""] (not an empty slice) when KAFKA_URIS is unset, so the single empty element must be checked as well. The kafka default is :9092 + kafkaURIs = []string{"localhost:9092"} + } + + if roomserverURL == "" { + log.Panic("No ROOMSERVER_URL environment variable found.") + } + + if roomserverInputTopic == "" { + log.Panic("No TOPIC_INPUT_ROOM_EVENT environment variable found. This should match the roomserver input topic.") + } cfg := config.FederationAPI{ ServerName: serverName, // TODO: make the validity period configurable. 
@@ -75,6 +93,37 @@ func main() { } cfg.TLSFingerPrints = []gomatrixserverlib.TLSFingerprint{{fingerprintSHA256}} - routing.Setup(http.DefaultServeMux, cfg) + federation := gomatrixserverlib.NewFederationClient(cfg.ServerName, cfg.KeyID, cfg.PrivateKey) + + keyRing := gomatrixserverlib.KeyRing{ + KeyFetchers: []gomatrixserverlib.KeyFetcher{ + // TODO: Use perspective key fetchers for production. + &gomatrixserverlib.DirectKeyFetcher{federation.Client}, + }, + KeyDatabase: &dummyKeyDatabase{}, + } + queryAPI := api.NewRoomserverQueryAPIHTTP(roomserverURL, nil) + + roomserverProducer, err := producers.NewRoomserverProducer(kafkaURIs, roomserverInputTopic) + if err != nil { + log.Panicf("Failed to setup kafka producers(%s): %s", kafkaURIs, err) + } + + routing.Setup(http.DefaultServeMux, cfg, queryAPI, roomserverProducer, keyRing, federation) log.Fatal(http.ListenAndServe(bindAddr, nil)) } + +// TODO: Implement a proper key database. +type dummyKeyDatabase struct{} + +func (d *dummyKeyDatabase) FetchKeys( + requests map[gomatrixserverlib.PublicKeyRequest]gomatrixserverlib.Timestamp, +) (map[gomatrixserverlib.PublicKeyRequest]gomatrixserverlib.ServerKeys, error) { + return nil, nil +} + +func (d *dummyKeyDatabase) StoreKeys( + map[gomatrixserverlib.PublicKeyRequest]gomatrixserverlib.ServerKeys, +) error { + return nil +} diff --git a/src/github.com/matrix-org/dendrite/cmd/federation-api-proxy/main.go b/src/github.com/matrix-org/dendrite/cmd/federation-api-proxy/main.go new file mode 100644 index 000000000..ed0aed722 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/cmd/federation-api-proxy/main.go @@ -0,0 +1,124 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "flag" + "fmt" + log "github.com/Sirupsen/logrus" + "net/http" + "net/http/httputil" + "net/url" + "os" + "strings" + "time" +) + +const usage = `Usage: %s + +Create a single endpoint URL which remote matrix servers can be pointed at. + +The server-server API in Dendrite is split across multiple processes +which listen on multiple ports. You cannot point a Matrix server at +any of those ports, as there will be unimplemented functionality. +In addition, all server-server API processes start with the additional +path prefix '/api', which Matrix servers will be unaware of. + +This tool will proxy requests for all server-server URLs and forward +them to their respective process. It will also add the '/api' path +prefix to incoming requests. + +THIS TOOL IS FOR TESTING AND NOT INTENDED FOR PRODUCTION USE. + +Arguments: + +` + +var ( + federationAPIURL = flag.String("federation-api-url", "", "The base URL of the listening 'dendrite-federation-api-server' process. E.g. 'http://localhost:4200'") + bindAddress = flag.String("bind-address", ":8448", "The listening port for the proxy.") + certFile = flag.String("tls-cert", "server.crt", "The PEM formatted X509 certificate to use for TLS") + keyFile = flag.String("tls-key", "server.key", "The PEM private key to use for TLS") +) + +func makeProxy(targetURL string) (*httputil.ReverseProxy, error) { + if !strings.HasSuffix(targetURL, "/") { + targetURL += "/" + } + // Check that we can parse the URL. 
+ target, err := url.Parse(targetURL) + if err != nil { + return nil, err + } + return &httputil.ReverseProxy{ + Director: func(req *http.Request) { + // URL.Path() removes the % escaping from the path. + // The % encoding will be added back when the url is encoded + // when the request is forwarded. + // This means that we will lose any unnecessary escaping from the URL. + // Practically this means that any distinction between '%2F' and '/' + // in the URL will be lost by the time it reaches the target. + path := req.URL.Path + path = "api" + path + log.WithFields(log.Fields{ + "path": path, + "url": targetURL, + "method": req.Method, + }).Print("proxying request") + // Build the outgoing URL from the parsed target rather than re-parsing + // a concatenated string: the unescaped path may contain '%' or '?', + // which url.Parse would reject (panicking here) or read as a query. + newURL := *target + newURL.Path = target.Path + path + newURL.RawPath = "" + // Copy the query parameters from the request. + newURL.RawQuery = req.URL.RawQuery + req.URL = &newURL + }, + }, nil +} + +func main() { + flag.Usage = func() { + fmt.Fprintf(os.Stderr, usage, os.Args[0]) + flag.PrintDefaults() + } + + flag.Parse() + + if *federationAPIURL == "" { + flag.Usage() + fmt.Fprintln(os.Stderr, "no --federation-api-url specified.") + os.Exit(1) + } + + federationProxy, err := makeProxy(*federationAPIURL) + if err != nil { + panic(err) + } + + http.Handle("/", federationProxy) + + srv := &http.Server{ + Addr: *bindAddress, + ReadTimeout: 1 * time.Minute, // how long we wait for the client to send the entire request (after connection accept) + WriteTimeout: 5 * time.Minute, // how long the proxy has to write the full response + } + + fmt.Println("Proxying requests to:") + fmt.Println(" /* => ", *federationAPIURL+"/api/*") + fmt.Println("Listening on ", *bindAddress) + panic(srv.ListenAndServeTLS(*certFile, *keyFile)) +} diff --git a/src/github.com/matrix-org/dendrite/federationapi/routing/routing.go b/src/github.com/matrix-org/dendrite/federationapi/routing/routing.go index b5af66dc6..0f20d0115 100644 --- 
a/src/github.com/matrix-org/dendrite/federationapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/federationapi/routing/routing.go @@ -16,21 +16,35 @@ package routing import ( "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/federationapi/config" "github.com/matrix-org/dendrite/federationapi/readers" + "github.com/matrix-org/dendrite/federationapi/writers" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" "github.com/prometheus/client_golang/prometheus" "net/http" + "time" ) const ( - pathPrefixV2Keys = "/_matrix/key/v2" + pathPrefixV2Keys = "/_matrix/key/v2" + pathPrefixV1Federation = "/_matrix/federation/v1" ) // Setup registers HTTP handlers with the given ServeMux. -func Setup(servMux *http.ServeMux, cfg config.FederationAPI) { +func Setup( + servMux *http.ServeMux, + cfg config.FederationAPI, + query api.RoomserverQueryAPI, + producer *producers.RoomserverProducer, + keys gomatrixserverlib.KeyRing, + federation *gomatrixserverlib.FederationClient, +) { apiMux := mux.NewRouter() v2keysmux := apiMux.PathPrefix(pathPrefixV2Keys).Subrouter() + v1fedmux := apiMux.PathPrefix(pathPrefixV1Federation).Subrouter() localKeys := makeAPI("localkeys", func(req *http.Request) util.JSONResponse { return readers.LocalKeys(req, cfg) @@ -43,6 +57,17 @@ func Setup(servMux *http.ServeMux, cfg config.FederationAPI) { v2keysmux.Handle("/server/{keyID}", localKeys) v2keysmux.Handle("/server/", localKeys) + v1fedmux.Handle("/send/{txnID}/", makeAPI("send", + func(req *http.Request) util.JSONResponse { + vars := mux.Vars(req) + return writers.Send( + req, gomatrixserverlib.TransactionID(vars["txnID"]), + time.Now(), + cfg, query, producer, keys, federation, + ) + }, + )) + servMux.Handle("/metrics", prometheus.Handler()) servMux.Handle("/api/", http.StripPrefix("/api", apiMux)) } diff --git 
a/src/github.com/matrix-org/dendrite/federationapi/writers/send.go b/src/github.com/matrix-org/dendrite/federationapi/writers/send.go new file mode 100644 index 000000000..14e1fae6c --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationapi/writers/send.go @@ -0,0 +1,211 @@ +package writers + +import ( + "encoding/json" + "fmt" + "github.com/matrix-org/dendrite/clientapi/httputil" + "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/dendrite/clientapi/producers" + "github.com/matrix-org/dendrite/federationapi/config" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "net/http" + "time" +) + +// Send implements /_matrix/federation/v1/send/{txnID} +func Send( + req *http.Request, + txnID gomatrixserverlib.TransactionID, + now time.Time, + cfg config.FederationAPI, + query api.RoomserverQueryAPI, + producer *producers.RoomserverProducer, + keys gomatrixserverlib.KeyRing, + federation *gomatrixserverlib.FederationClient, +) util.JSONResponse { + request, errResp := gomatrixserverlib.VerifyHTTPRequest(req, now, cfg.ServerName, keys) + if request == nil { + return errResp + } + + t := txnReq{ + query: query, + producer: producer, + keys: keys, + federation: federation, + } + if err := json.Unmarshal(request.Content(), &t); err != nil { + return util.JSONResponse{ + Code: 400, + JSON: jsonerror.BadJSON("The request body could not be decoded into valid JSON. 
" + err.Error()), + } + } + + t.Origin = request.Origin() + t.TransactionID = txnID + t.Destination = cfg.ServerName + + resp, err := t.processTransaction() + if err != nil { + return httputil.LogThenError(req, err) + } + + return util.JSONResponse{ + Code: 200, + JSON: resp, + } +} + +type txnReq struct { + gomatrixserverlib.Transaction + query api.RoomserverQueryAPI + producer *producers.RoomserverProducer + keys gomatrixserverlib.KeyRing + federation *gomatrixserverlib.FederationClient +} + +func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { + // Check the event signatures + if err := gomatrixserverlib.VerifyEventSignatures(t.PDUs, t.keys); err != nil { + return nil, err + } + + // Process the events. + results := map[string]gomatrixserverlib.PDUResult{} + for _, e := range t.PDUs { + err := t.processEvent(e) + if err != nil { + // If the error is due to the event itself being bad then we skip + // it and move onto the next event. We report an error so that the + // sender knows that we have skipped processing it. + // + // However if the event is due to a temporary failure in our server + // such as a database being unavailable then we should bail, and + // hope that the sender will retry when we are feeling better. + // + // It is uncertain what we should do if an event fails because + // we failed to fetch more information from the sending server. + // For example if a request to /state fails. + // If we skip the event then we risk missing the event until we + // receive another event referencing it. + // If we bail and stop processing then we risk wedging incoming + // transactions from that server forever. + switch err.(type) { + case unknownRoomError: + case *gomatrixserverlib.NotAllowed: + default: + // Any other error should be the result of a temporary error in + // our server so we should bail processing the transaction entirely. 
+ return nil, err + } + results[e.EventID()] = gomatrixserverlib.PDUResult{err.Error()} + } else { + results[e.EventID()] = gomatrixserverlib.PDUResult{} + } + } + + // TODO: Process the EDUs. + + return &gomatrixserverlib.RespSend{PDUs: results}, nil +} + +type unknownRoomError struct { + roomID string +} + +func (e unknownRoomError) Error() string { return fmt.Sprintf("unknown room %q", e.roomID) } + +func (t *txnReq) processEvent(e gomatrixserverlib.Event) error { + refs := e.PrevEvents() + prevEventIDs := make([]string, len(refs)) + for i := range refs { + prevEventIDs[i] = refs[i].EventID + } + + // Fetch the state needed to authenticate the event. + needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{e}) + stateReq := api.QueryStateAfterEventsRequest{ + RoomID: e.RoomID(), + PrevEventIDs: prevEventIDs, + StateToFetch: needed.Tuples(), + } + var stateResp api.QueryStateAfterEventsResponse + if err := t.query.QueryStateAfterEvents(&stateReq, &stateResp); err != nil { + return err + } + + if !stateResp.RoomExists { + // TODO: When synapse receives a message for a room it is not in it + // asks the remote server for the state of the room so that it can + // check if the remote server knows of a join "m.room.member" event + // that this server is unaware of. + // However generally speaking we should reject events for rooms we + // aren't a member of. + return unknownRoomError{e.RoomID()} + } + + if !stateResp.PrevEventsExist { + return t.processEventWithMissingState(e) + } + + // Check that the event is allowed by the state at the event. + if err := checkAllowedByState(e, stateResp.StateEvents); err != nil { + return err + } + + // TODO: Check that the roomserver has a copy of all of the auth_events. + // TODO: Check that the event is allowed by its auth_events. 
+ + // pass the event to the roomserver + if err := t.producer.SendEvents([]gomatrixserverlib.Event{e}); err != nil { + return err + } + + return nil +} + +func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserverlib.Event) error { + authUsingState := gomatrixserverlib.NewAuthEvents(nil) + for i := range stateEvents { + authUsingState.AddEvent(&stateEvents[i]) + } + return gomatrixserverlib.Allowed(e, &authUsingState) +} + +func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event) error { + // We are missing the previous events for this event. + // This means that there is a gap in our view of the history of the + // room. There are two ways that we can handle such a gap: + // 1) We can fill in the gap using /get_missing_events + // 2) We can leave the gap and request the state of the room at + // this event from the remote server using either /state_ids + // or /state. + // Synapse will attempt to do 1 and if that fails or if the gap is + // too large then it will attempt 2. + // Synapse will use /state_ids if possible since usually the state + // is largely unchanged and it is more efficient to fetch a list of + // event ids and then use /event to fetch the individual events. + // However not all versions of synapse support /state_ids so you may + // need to fall back to /state. + // TODO: Attempt to fill in the gap using /get_missing_events + // TODO: Attempt to fetch the state using /state_ids and /events + state, err := t.federation.LookupState(t.Origin, e.RoomID(), e.EventID()) + if err != nil { + return err + } + // Check that the returned state is valid. + if err := state.Check(t.keys); err != nil { + return err + } + // Check that the event is allowed by the state. 
+ if err := checkAllowedByState(e, state.StateEvents); err != nil { + return err + } + // pass the event along with the state to the roomserver + if err := t.producer.SendEventWithState(state, e); err != nil { + return err + } + return nil +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/events.go b/src/github.com/matrix-org/dendrite/roomserver/input/events.go index 513d10ce5..b379b3aeb 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/events.go @@ -28,7 +28,7 @@ type RoomEventDatabase interface { StoreEvent(event gomatrixserverlib.Event, authEventNIDs []types.EventNID) (types.RoomNID, types.StateAtEvent, error) // Lookup the state entries for a list of string event IDs // Returns an error if the there is an error talking to the database - // or if the event IDs aren't in the database. + // Returns a types.MissingEventError if the event IDs aren't in the database. StateEntriesForEventIDs(eventIDs []string) ([]types.StateEntry, error) // Set the state at an event. SetState(eventNID types.EventNID, stateNID types.StateSnapshotNID) error diff --git a/src/github.com/matrix-org/dendrite/roomserver/query/query.go b/src/github.com/matrix-org/dendrite/roomserver/query/query.go index 6f236e938..1b1820f0a 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/query/query.go +++ b/src/github.com/matrix-org/dendrite/roomserver/query/query.go @@ -97,9 +97,12 @@ func (r *RoomserverQueryAPI) QueryStateAfterEvents( prevStates, err := r.DB.StateAtEventIDs(request.PrevEventIDs) if err != nil { - // TODO: Check if the error was because we are missing events from the - // database or are missing state at events from the database. 
- return err + switch err.(type) { + case types.MissingEventError: + return nil + default: + return err + } } response.PrevEventsExist = true diff --git a/src/github.com/matrix-org/dendrite/roomserver/state/state.go b/src/github.com/matrix-org/dendrite/roomserver/state/state.go index 6b49aba64..b5454c0cd 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/state/state.go +++ b/src/github.com/matrix-org/dendrite/roomserver/state/state.go @@ -32,7 +32,7 @@ type RoomStateDatabase interface { AddState(roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, state []types.StateEntry) (types.StateSnapshotNID, error) // Lookup the state of a room at each event for a list of string event IDs. // Returns an error if there is an error talking to the database - // or if the room state for the event IDs aren't in the database + // Returns a types.MissingEventError if the room state for the event IDs aren't in the database StateAtEventIDs(eventIDs []string) ([]types.StateAtEvent, error) // Lookup the numeric IDs for a list of string event types. // Returns a map from string event type to numeric ID for the event type. diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go index acaf43b6d..a2813ae97 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go @@ -166,6 +166,8 @@ func (s *eventStatements) selectEvent(eventID string) (types.EventNID, types.Sta return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err } +// bulkSelectStateEventByID lookups a list of state events by event ID. 
+// If any of the requested events are missing from the database it returns a types.MissingEventError func (s *eventStatements) bulkSelectStateEventByID(eventIDs []string) ([]types.StateEntry, error) { rows, err := s.bulkSelectStateEventByIDStmt.Query(pq.StringArray(eventIDs)) if err != nil { @@ -194,11 +196,16 @@ func (s *eventStatements) bulkSelectStateEventByID(eventIDs []string) ([]types.S // However it should be possible debug this by replaying queries or entries from the input kafka logs. // If this turns out to be impossible and we do need the debug information here, it would be better // to do it as a separate query rather than slowing down/complicating the common case. - return nil, fmt.Errorf("storage: state event IDs missing from the database (%d != %d)", i, len(eventIDs)) + return nil, types.MissingEventError( + fmt.Sprintf("storage: state event IDs missing from the database (%d != %d)", i, len(eventIDs)), + ) } return results, err } +// bulkSelectStateAtEventByID lookups the state at a list of events by event ID. +// If any of the requested events are missing from the database it returns a types.MissingEventError. +// If we do not have the state for any of the requested events it returns a types.MissingEventError. 
func (s *eventStatements) bulkSelectStateAtEventByID(eventIDs []string) ([]types.StateAtEvent, error) { rows, err := s.bulkSelectStateAtEventByIDStmt.Query(pq.StringArray(eventIDs)) if err != nil { @@ -218,11 +225,15 @@ func (s *eventStatements) bulkSelectStateAtEventByID(eventIDs []string) ([]types return nil, err } if result.BeforeStateSnapshotNID == 0 { - return nil, fmt.Errorf("storage: missing state for event NID %d", result.EventNID) + return nil, types.MissingEventError( + fmt.Sprintf("storage: missing state for event NID %d", result.EventNID), + ) } } if i != len(eventIDs) { - return nil, fmt.Errorf("storage: event IDs missing from the database (%d != %d)", i, len(eventIDs)) + return nil, types.MissingEventError( + fmt.Sprintf("storage: event IDs missing from the database (%d != %d)", i, len(eventIDs)), + ) } return results, err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/types/types.go b/src/github.com/matrix-org/dendrite/roomserver/types/types.go index f728696bf..b255b64b9 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/types/types.go +++ b/src/github.com/matrix-org/dendrite/roomserver/types/types.go @@ -168,3 +168,9 @@ type RoomRecentEventsUpdater interface { // Rollback the transaction. Rollback() error } + +// A MissingEventError is an error that happened because the roomserver was +// missing requested events from its database. +type MissingEventError string + +func (e MissingEventError) Error() string { return string(e) } diff --git a/src/github.com/matrix-org/dendrite/syncapi/config/config.go b/src/github.com/matrix-org/dendrite/syncapi/config/config.go index 433a7a2b6..32f047229 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/config/config.go +++ b/src/github.com/matrix-org/dendrite/syncapi/config/config.go @@ -20,6 +20,8 @@ import ( // Sync contains the config information necessary to spin up a sync-server process. type Sync struct { + // Where the room server is listening for queries. 
+ RoomserverURL string `yaml:"roomserver_url"` // The topic for events which are written by the room server output log. RoomserverOutputTopic string `yaml:"roomserver_topic"` // A list of URIs to consume events from. These kafka logs should be produced by a Room Server. diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index b8ec98d2c..31c48ac57 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -16,6 +16,7 @@ package consumers import ( "encoding/json" + "fmt" log "github.com/Sirupsen/logrus" "github.com/matrix-org/dendrite/common" @@ -33,6 +34,7 @@ type OutputRoomEvent struct { roomServerConsumer *common.ContinualConsumer db *storage.SyncServerDatabase notifier *sync.Notifier + query api.RoomserverQueryAPI } // NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. 
@@ -51,6 +53,7 @@ func NewOutputRoomEvent(cfg *config.Sync, n *sync.Notifier, store *storage.SyncS roomServerConsumer: &consumer, db: store, notifier: n, + query: api.NewRoomserverQueryAPIHTTP(cfg.RoomserverURL, nil), } consumer.ProcessMessage = s.onMessage @@ -84,7 +87,19 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { "room_id": ev.RoomID(), }).Info("received event from roomserver") - syncStreamPos, err := s.db.WriteEvent(&ev, output.AddsStateEventIDs, output.RemovesStateEventIDs) + addsStateEvents, err := s.lookupStateEvents(output.AddsStateEventIDs, ev) + if err != nil { + log.WithFields(log.Fields{ + "event": string(ev.JSON()), + log.ErrorKey: err, + "add": output.AddsStateEventIDs, + "del": output.RemovesStateEventIDs, + }).Panicf("roomserver output log: state event lookup failure") + } + + syncStreamPos, err := s.db.WriteEvent( + &ev, addsStateEvents, output.AddsStateEventIDs, output.RemovesStateEventIDs, + ) if err != nil { // panic rather than continue with an inconsistent database @@ -100,3 +115,74 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { return nil } + +// lookupStateEvents looks up the state events that are added by a new event. +func (s *OutputRoomEvent) lookupStateEvents( + addsStateEventIDs []string, event gomatrixserverlib.Event, +) ([]gomatrixserverlib.Event, error) { + // Fast path if there aren't any new state events. + if len(addsStateEventIDs) == 0 { + return nil, nil + } + + // Fast path if the only state event added is the event itself. + if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() { + return []gomatrixserverlib.Event{event}, nil + } + + // Check if this is re-adding a state events that we previously processed + // If we have previously received a state event it may still be in + // our event database. 
+ result, err := s.db.Events(addsStateEventIDs) + if err != nil { + return nil, err + } + missing := missingEventsFrom(result, addsStateEventIDs) + + // Check if event itself is being added. + for _, eventID := range missing { + if eventID == event.EventID() { + result = append(result, event) + break + } + } + missing = missingEventsFrom(result, addsStateEventIDs) + + if len(missing) == 0 { + return result, nil + } + + // At this point the missing events are neither the event itself nor are + // they present in our local database. Our only option is to fetch them + // from the roomserver using the query API. + eventReq := api.QueryEventsByIDRequest{EventIDs: missing} + var eventResp api.QueryEventsByIDResponse + if err := s.query.QueryEventsByID(&eventReq, &eventResp); err != nil { + return nil, err + } + + result = append(result, eventResp.Events...) + missing = missingEventsFrom(result, addsStateEventIDs) + + if len(missing) != 0 { + return nil, fmt.Errorf( + "missing %d state events IDs at event %q", len(missing), event.EventID(), + ) + } + + return result, nil +} + +func missingEventsFrom(events []gomatrixserverlib.Event, required []string) []string { + have := map[string]bool{} + for _, event := range events { + have[event.EventID()] = true + } + var missing []string + for _, eventID := range required { + if !have[eventID] { + missing = append(missing, eventID) + } + } + return missing +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go index 28389ad94..d4f260e00 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go @@ -16,6 +16,7 @@ package storage import ( "database/sql" + "github.com/lib/pq" "github.com/matrix-org/gomatrixserverlib" ) @@ -35,6 +36,9 @@ CREATE TABLE IF NOT EXISTS current_room_state ( -- The 
'content.membership' value if this event is an m.room.member event. For other -- events, this will be NULL. membership TEXT, + -- The serial ID of the output_room_events table when this event became + -- part of the current state of the room. + added_at BIGINT, -- Clobber based on 3-uple of room_id, type and state_key CONSTRAINT room_state_unique UNIQUE (room_id, type, state_key) ); @@ -45,9 +49,10 @@ CREATE INDEX IF NOT EXISTS membership_idx ON current_room_state(type, state_key, ` const upsertRoomStateSQL = "" + - "INSERT INTO current_room_state (room_id, event_id, type, state_key, event_json, membership) VALUES ($1, $2, $3, $4, $5, $6)" + + "INSERT INTO current_room_state (room_id, event_id, type, state_key, event_json, membership, added_at)" + + " VALUES ($1, $2, $3, $4, $5, $6, $7)" + " ON CONFLICT ON CONSTRAINT room_state_unique" + - " DO UPDATE SET event_id = $2, event_json = $5, membership = $6" + " DO UPDATE SET event_id = $2, event_json = $5, membership = $6, added_at = $7" const deleteRoomStateByEventIDSQL = "" + "DELETE FROM current_room_state WHERE event_id = $1" @@ -61,12 +66,16 @@ const selectCurrentStateSQL = "" + const selectJoinedUsersSQL = "" + "SELECT room_id, state_key FROM current_room_state WHERE type = 'm.room.member' AND membership = 'join'" +const selectEventsWithEventIDsSQL = "" + + "SELECT added_at, event_json FROM current_room_state WHERE event_id = ANY($1)" + type currentRoomStateStatements struct { upsertRoomStateStmt *sql.Stmt deleteRoomStateByEventIDStmt *sql.Stmt selectRoomIDsWithMembershipStmt *sql.Stmt selectCurrentStateStmt *sql.Stmt selectJoinedUsersStmt *sql.Stmt + selectEventsWithEventIDsStmt *sql.Stmt } func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) { @@ -89,6 +98,9 @@ func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) { if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil { return } + if s.selectEventsWithEventIDsStmt, err = 
db.Prepare(selectEventsWithEventIDsSQL); err != nil { + return + } return } @@ -141,6 +153,33 @@ func (s *currentRoomStateStatements) selectCurrentState(txn *sql.Tx, roomID stri } defer rows.Close() + return rowsToEvents(rows) +} + +func (s *currentRoomStateStatements) deleteRoomStateByEventID(txn *sql.Tx, eventID string) error { + _, err := txn.Stmt(s.deleteRoomStateByEventIDStmt).Exec(eventID) + return err +} + +func (s *currentRoomStateStatements) upsertRoomState( + txn *sql.Tx, event gomatrixserverlib.Event, membership *string, addedAt int64, +) error { + _, err := txn.Stmt(s.upsertRoomStateStmt).Exec( + event.RoomID(), event.EventID(), event.Type(), *event.StateKey(), event.JSON(), membership, addedAt, + ) + return err +} + +func (s *currentRoomStateStatements) selectEventsWithEventIDs(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) { + rows, err := txn.Stmt(s.selectEventsWithEventIDsStmt).Query(pq.StringArray(eventIDs)) + if err != nil { + return nil, err + } + defer rows.Close() + return rowsToStreamEvents(rows) +} + +func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) { var result []gomatrixserverlib.Event for rows.Next() { var eventBytes []byte @@ -156,15 +195,3 @@ func (s *currentRoomStateStatements) selectCurrentState(txn *sql.Tx, roomID stri } return result, nil } - -func (s *currentRoomStateStatements) deleteRoomStateByEventID(txn *sql.Tx, eventID string) error { - _, err := txn.Stmt(s.deleteRoomStateByEventIDStmt).Exec(eventID) - return err -} - -func (s *currentRoomStateStatements) upsertRoomState(txn *sql.Tx, event gomatrixserverlib.Event, membership *string) error { - _, err := txn.Stmt(s.upsertRoomStateStmt).Exec( - event.RoomID(), event.EventID(), event.Type(), *event.StateKey(), event.JSON(), membership, - ) - return err -} diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index b3cc39255..6c8a52635 
100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -16,7 +16,6 @@ package storage import ( "database/sql" - "fmt" log "github.com/Sirupsen/logrus" "github.com/lib/pq" @@ -193,7 +192,7 @@ func (s *outputRoomEventsStatements) selectRecentEvents( return nil, err } defer rows.Close() - events, err := rowsToEvents(rows) + events, err := rowsToStreamEvents(rows) if err != nil { return nil, err } @@ -205,23 +204,19 @@ func (s *outputRoomEventsStatements) selectRecentEvents( -// Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing -// from the database. +// selectEvents returns the events found for the given event IDs. Event IDs that are missing from the database +// are omitted from the result rather than reported as an error. A nil txn runs the query outside a transaction. func (s *outputRoomEventsStatements) selectEvents(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) { - rows, err := txn.Stmt(s.selectEventsStmt).Query(pq.StringArray(eventIDs)) + stmt := s.selectEventsStmt + if txn != nil { + stmt = txn.Stmt(stmt) + } + rows, err := stmt.Query(pq.StringArray(eventIDs)) if err != nil { return nil, err } defer rows.Close() - result, err := rowsToEvents(rows) - if err != nil { - return nil, err - } - - if len(result) != len(eventIDs) { - return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(result), len(eventIDs)) - } - return result, nil + return rowsToStreamEvents(rows) } -func rowsToEvents(rows *sql.Rows) ([]streamEvent, error) { +func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) { var result []streamEvent for rows.Next() { var ( diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index dee0b51c2..ce2e7f234 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -17,6 +17,7 @@ package storage import ( "database/sql" "encoding/json" + "fmt" // Import the postgres 
database driver. _ "github.com/lib/pq" "github.com/matrix-org/dendrite/clientapi/events" @@ -75,10 +76,24 @@ func (d *SyncServerDatabase) AllJoinedUsersInRooms() (map[string][]string, error return d.roomstate.selectJoinedUsers() } +// Events lookups a list of event by their event ID. +// Returns a list of events matching the requested IDs found in the database. +// If an event is not found in the database then it will be omitted from the list. +// Returns an error if there was a problem talking with the database +func (d *SyncServerDatabase) Events(eventIDs []string) ([]gomatrixserverlib.Event, error) { + streamEvents, err := d.events.selectEvents(nil, eventIDs) + if err != nil { + return nil, err + } + return streamEventsToEvents(streamEvents), nil +} + // WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races // when generating the stream position for this event. Returns the sync stream position for the inserted event. // Returns an error if there was a problem inserting this event. -func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string) (streamPos types.StreamPosition, returnErr error) { +func (d *SyncServerDatabase) WriteEvent( + ev *gomatrixserverlib.Event, addStateEvents []gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string, +) (streamPos types.StreamPosition, returnErr error) { returnErr = runTransaction(d.db, func(txn *sql.Tx) error { var err error pos, err := d.events.insertEvent(txn, ev, addStateEventIDs, removeStateEventIDs) @@ -87,31 +102,19 @@ func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEve } streamPos = types.StreamPosition(pos) - if len(addStateEventIDs) == 0 && len(removeStateEventIDs) == 0 { + if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 { // Nothing to do, the event may have just been a message event. 
return nil } - // Update the current room state based on the added/removed state event IDs. - // In the common case there is a single added event ID which is the state event itself, assuming `ev` is a state event. - // However, conflict resolution may result in there being different events being added, or even some removed. - if len(removeStateEventIDs) == 0 && len(addStateEventIDs) == 1 && addStateEventIDs[0] == ev.EventID() { - // common case - return d.updateRoomState(txn, nil, []gomatrixserverlib.Event{*ev}) - } - - // uncommon case: we need to fetch the full event for each event ID mentioned, then update room state - added, err := d.events.selectEvents(txn, addStateEventIDs) - if err != nil { - return err - } - - return d.updateRoomState(txn, removeStateEventIDs, streamEventsToEvents(added)) + return d.updateRoomState(txn, removeStateEventIDs, addStateEvents, streamPos) }) return } -func (d *SyncServerDatabase) updateRoomState(txn *sql.Tx, removedEventIDs []string, addedEvents []gomatrixserverlib.Event) error { +func (d *SyncServerDatabase) updateRoomState( + txn *sql.Tx, removedEventIDs []string, addedEvents []gomatrixserverlib.Event, streamPos types.StreamPosition, +) error { // remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add. for _, eventID := range removedEventIDs { if err := d.roomstate.deleteRoomStateByEventID(txn, eventID); err != nil { @@ -132,7 +135,7 @@ func (d *SyncServerDatabase) updateRoomState(txn *sql.Tx, removedEventIDs []stri } membership = &memberContent.Membership } - if err := d.roomstate.upsertRoomState(txn, event, membership); err != nil { + if err := d.roomstate.upsertRoomState(txn, event, membership, int64(streamPos)); err != nil { return err } } @@ -310,7 +313,7 @@ func (d *SyncServerDatabase) fetchStateEvents(txn *sql.Tx, roomIDToEventIDSet ma for _, missingEvIDs := range missingEvents { allMissingEventIDs = append(allMissingEventIDs, missingEvIDs...) 
} - evs, err := d.events.selectEvents(txn, allMissingEventIDs) + evs, err := d.fetchMissingStateEvents(txn, allMissingEventIDs) if err != nil { return nil, err } @@ -323,6 +326,45 @@ func (d *SyncServerDatabase) fetchStateEvents(txn *sql.Tx, roomIDToEventIDSet ma return stateBetween, nil } +func (d *SyncServerDatabase) fetchMissingStateEvents(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) { + // Fetch from the events table first so we pick up the stream ID for the + // event. + events, err := d.events.selectEvents(txn, eventIDs) + if err != nil { + return nil, err + } + + have := map[string]bool{} + for _, event := range events { + have[event.EventID()] = true + } + var missing []string + for _, eventID := range eventIDs { + if !have[eventID] { + missing = append(missing, eventID) + } + } + if len(missing) == 0 { + return events, nil + } + + // If they are missing from the events table then they should be state + // events that we received from outside the main event stream. + // These should be in the room state table. + stateEvents, err := d.roomstate.selectEventsWithEventIDs(txn, missing) + + if err != nil { + return nil, err + } + if len(stateEvents) != len(missing) { + return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(stateEvents), len(missing)) + } + for _, e := range stateEvents { + events = append(events, e) + } + return events, nil +} + func (d *SyncServerDatabase) getStateDeltas(txn *sql.Tx, fromPos, toPos types.StreamPosition, userID string) ([]stateDelta, error) { // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 // - Get membership list changes for this user in this sync response