diff --git a/dendrite-config.yaml b/dendrite-config.yaml new file mode 100644 index 000000000..0fc8e8baa --- /dev/null +++ b/dendrite-config.yaml @@ -0,0 +1,76 @@ +# The config file format version +# This is used by dendrite to tell if it understands the config format. +# This will change if the structure of the config file changes or if the meaning +# of an existing config key changes. +version: 0 + +# The matrix specific config +matrix: + # The name of the server. This is usually the domain name, e.g 'matrix.org', 'localhost'. + server_name: "example.com" + # The path to the PEM formatted matrix private key. + private_key: "/etc/dendrite/matrix_key.pem" + # The x509 certificates used by the federation listeners for this server + federation_certificates: ["/etc/dendrite/federation_tls.pem"] + +# The media repository config +media: + # The base path to where the media files will be stored. May be relative or absolute. + base_path: /var/dendrite/media + + # The maximum file size in bytes that is allowed to be stored on this server. + # Note: if max_file_size_bytes is set to 0, the size is unlimited. + # Note: if max_file_size_bytes is not set, it will default to 10485760 (10MB) + max_file_size_bytes: 10485760 + + # Whether to dynamically generate thumbnails on-the-fly if the requested resolution is not already generated + # NOTE: This is a possible denial-of-service attack vector - use at your own risk + dynamic_thumbnails: false + + # A list of thumbnail sizes to be pre-generated for downloaded remote / uploaded content + # method is one of crop or scale. If omitted, it will default to scale. + # crop scales to fill the requested dimensions and crops the excess. + # scale scales to fit the requested dimensions and one dimension may be smaller than requested. + thumbnail_sizes: + - width: 32 + height: 32 + method: crop + - width: 96 + height: 96 + method: crop + - width: 320 + height: 240 + method: scale + - width: 640 + height: 480 + method: scale + - width: 800 + height: 600 + method: scale + +# The config for communicating with kafka +kafka: + # Where the kafka servers are running. + addresses: ["localhost:9092"] + # The names of the kafka topics to use. + topics: + input_room_event: roomserverInput + output_room_event: roomserverOutput + +# The postgres connection configs for connecting to the databases e.g a postgres:// URI +database: + account: "postgres://dendrite:itsasecret@localhost/dendrite_account?sslmode=disable" + device: "postgres://dendrite:itsasecret@localhost/dendrite_device?sslmode=disable" + media_api: "postgres://dendrite:itsasecret@localhost/dendrite_mediaapi?sslmode=disable" + sync_api: "postgres://dendrite:itsasecret@localhost/dendrite_syncapi?sslmode=disable" + room_server: "postgres://dendrite:itsasecret@localhost/dendrite_roomserver?sslmode=disable" + server_key: "postgres://dendrite:itsasecret@localhost/dendrite_serverkey?sslmode=disable" + +# The TCP host:port pairs to bind the internal HTTP APIs to. +# These shouldn't be exposed to the public internet. +listen: + room_server: "localhost:7770" + client_api: "localhost:7771" + federation_api: "localhost:7772" + sync_api: "localhost:7773" + media_api: "localhost:7774" diff --git a/media-api-server-config.yaml b/media-api-server-config.yaml deleted file mode 100644 index c222fe8fb..000000000 --- a/media-api-server-config.yaml +++ /dev/null @@ -1,38 +0,0 @@ -# The name of the server. This is usually the domain name, e.g 'matrix.org', 'localhost'. -server_name: "example.com" - -# The base path to where the media files will be stored. May be relative or absolute. -base_path: /var/dendrite/media - -# The maximum file size in bytes that is allowed to be stored on this server. -# Note: if max_file_size_bytes is set to 0, the size is unlimited. -# Note: if max_file_size_bytes is not set, it will default to 10485760 (10MB) -max_file_size_bytes: 10485760 - -# The postgres connection config for connecting to the database e.g a postgres:// URI -database: "postgres://dendrite:itsasecret@localhost/mediaapi?sslmode=disable" - -# Whether to dynamically generate thumbnails on-the-fly if the requested resolution is not already generated -# NOTE: This is a possible denial-of-service attack vector - use at your own risk -dynamic_thumbnails: false - -# A list of thumbnail sizes to be pre-generated for downloaded remote / uploaded content -# method is one of crop or scale. If omitted, it will default to scale. -# crop scales to fill the requested dimensions and crops the excess. -# scale scales to fit the requested dimensions and one dimension may be smaller than requested. -thumbnail_sizes: -- width: 32 - height: 32 - method: crop -- width: 96 - height: 96 - method: crop -- width: 320 - height: 240 - method: scale -- width: 640 - height: 480 - method: scale -- width: 800 - height: 600 - method: scale diff --git a/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go b/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go index 02a5956ce..aea912c25 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go @@ -49,7 +49,7 @@ func (c *RoomserverProducer) SendEvents(events []gomatrixserverlib.Event, sendAs ires[i] = api.InputRoomEvent{ Kind: api.KindNew, Event: event.JSON(), - AuthEventIDs: authEventIDs(event), + AuthEventIDs: event.AuthEventIDs(), SendAsServer: string(sendAsServer), } eventIDs[i] = event.EventID() @@ -71,7 +71,7 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat ires[i] = api.InputRoomEvent{ Kind: api.KindOutlier, Event: outlier.JSON(), - AuthEventIDs: authEventIDs(outlier), + AuthEventIDs: outlier.AuthEventIDs(), } eventIDs[i] = outlier.EventID() } @@ -84,7 +84,7 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat ires[len(outliers)] = api.InputRoomEvent{ Kind: api.KindNew, Event: event.JSON(), - AuthEventIDs: authEventIDs(event), + AuthEventIDs: event.AuthEventIDs(), HasState: true, StateEventIDs: stateEventIDs, } @@ -93,14 +93,6 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat return c.SendInputRoomEvents(ires, eventIDs) } -// TODO Make this a method on gomatrixserverlib.Event -func authEventIDs(event gomatrixserverlib.Event) (ids []string) { - for _, ref := range event.AuthEvents() { - ids = append(ids, ref.EventID) - } - return -} - // SendInputRoomEvents writes the given input room events to the roomserver input log. The length of both // arrays must match, and each element must correspond to the same event. func (c *RoomserverProducer) SendInputRoomEvents(ires []api.InputRoomEvent, eventIDs []string) error { diff --git a/src/github.com/matrix-org/dendrite/clientapi/readers/directory.go b/src/github.com/matrix-org/dendrite/clientapi/readers/directory.go index db8e75247..10e760943 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/readers/directory.go +++ b/src/github.com/matrix-org/dendrite/clientapi/readers/directory.go @@ -16,6 +16,8 @@ package readers import ( "fmt" + "net/http" + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" @@ -23,8 +25,6 @@ import ( "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" - "net/http" - "strings" ) // DirectoryRoom looks up a room alias @@ -35,7 +35,7 @@ func DirectoryRoom( federation *gomatrixserverlib.FederationClient, cfg *config.Dendrite, ) util.JSONResponse { - domain, err := domainFromID(roomAlias) + _, domain, err := gomatrixserverlib.ParseID('#', roomAlias) if err != nil { return util.JSONResponse{ Code: 400, @@ -69,19 +69,3 @@ func DirectoryRoom( } } } - -// domainFromID returns everything after the first ":" character to extract -// the domain part of a matrix ID. -// TODO: duplicated from gomatrixserverlib. -func domainFromID(id string) (gomatrixserverlib.ServerName, error) { - // IDs have the format: SIGIL LOCALPART ":" DOMAIN - // Split on the first ":" character since the domain can contain ":" - // characters. - parts := strings.SplitN(id, ":", 2) - if len(parts) != 2 { - // The ID must have a ":" character. - return "", fmt.Errorf("invalid ID: %q", id) - } - // Return everything after the first ":" character. - return gomatrixserverlib.ServerName(parts[1]), nil -} diff --git a/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go b/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go index e05c17424..731519836 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go +++ b/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go @@ -60,14 +60,7 @@ func (r createRoomRequest) Validate() *util.JSONResponse { // It should be a struct (with pointers into a single string to avoid copying) and // we should update all refs to use UserID types rather than strings. // https://github.com/matrix-org/synapse/blob/v0.19.2/synapse/types.py#L92 - if len(userID) == 0 || userID[0] != '@' { - return &util.JSONResponse{ - Code: 400, - JSON: jsonerror.BadJSON("user id must start with '@'"), - } - } - parts := strings.SplitN(userID[1:], ":", 2) - if len(parts) != 2 { + if _, _, err := gomatrixserverlib.ParseID('@', userID); err != nil { return &util.JSONResponse{ Code: 400, JSON: jsonerror.BadJSON("user id must be in the form @localpart:domain"), @@ -192,9 +185,13 @@ func createRoom(req *http.Request, device *authtypes.Device, cfg config.Dendrite return httputil.LogThenError(req, err) } + response := createRoomResponse{ + RoomID: roomID, + } + return util.JSONResponse{ Code: 200, - JSON: builtEvents, + JSON: response, } } diff --git a/src/github.com/matrix-org/dendrite/clientapi/writers/joinroom.go b/src/github.com/matrix-org/dendrite/clientapi/writers/joinroom.go index 57bc3e86b..09051c8f1 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/writers/joinroom.go +++ b/src/github.com/matrix-org/dendrite/clientapi/writers/joinroom.go @@ -16,6 +16,10 @@ package writers import ( "fmt" + "net/http" + "strings" + "time" + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" @@ -25,9 +29,6 @@ import ( "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" - "net/http" - "strings" - "time" ) // JoinRoomByIDOrAlias implements the "/join/{roomIDOrAlias}" API. @@ -88,7 +89,7 @@ func (r joinRoomReq) joinRoomByID() util.JSONResponse { // joinRoomByAlias joins a room using a room alias. func (r joinRoomReq) joinRoomByAlias(roomAlias string) util.JSONResponse { - domain, err := domainFromID(roomAlias) + _, domain, err := gomatrixserverlib.ParseID('#', roomAlias) if err != nil { return util.JSONResponse{ Code: 400, @@ -245,19 +246,3 @@ func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib }{roomID}, }, nil } - -// domainFromID returns everything after the first ":" character to extract -// the domain part of a matrix ID. -// TODO: duplicated from gomatrixserverlib. -func domainFromID(id string) (gomatrixserverlib.ServerName, error) { - // IDs have the format: SIGIL LOCALPART ":" DOMAIN - // Split on the first ":" character since the domain can contain ":" - // characters. - parts := strings.SplitN(id, ":", 2) - if len(parts) != 2 { - // The ID must have a ":" character. - return "", fmt.Errorf("invalid ID: %q", id) - } - // Return everything after the first ":" character. - return gomatrixserverlib.ServerName(parts[1]), nil -} diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go new file mode 100644 index 000000000..3c7e5c472 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go @@ -0,0 +1,74 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "flag" + "net/http" + "os" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/federationsender/consumers" + "github.com/matrix-org/dendrite/federationsender/queue" + "github.com/matrix-org/dendrite/federationsender/storage" + "github.com/matrix-org/gomatrixserverlib" + "github.com/prometheus/client_golang/prometheus" + + log "github.com/Sirupsen/logrus" +) + +var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.") + +func main() { + common.SetupLogging(os.Getenv("LOG_DIR")) + + flag.Parse() + + if *configPath == "" { + log.Fatal("--config must be supplied") + } + cfg, err := config.Load(*configPath) + if err != nil { + log.Fatalf("Invalid config file: %s", err) + } + + log.Info("config: ", cfg) + + db, err := storage.NewDatabase(string(cfg.Database.FederationSender)) + if err != nil { + log.Panicf("startup: failed to create federation sender database with data source %s : %s", cfg.Database.FederationSender, err) + } + + federation := gomatrixserverlib.NewFederationClient( + cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey, + ) + + queues := queue.NewOutgoingQueues(cfg.Matrix.ServerName, federation) + + consumer, err := consumers.NewOutputRoomEvent(cfg, queues, db) + if err != nil { + log.WithError(err).Panicf("startup: failed to create room server consumer") + } + if err = consumer.Start(); err != nil { + log.WithError(err).Panicf("startup: failed to start room server consumer") + } + + http.DefaultServeMux.Handle("/metrics", prometheus.Handler()) + + if err := http.ListenAndServe(string(cfg.Listen.FederationSender), nil); err != nil { + panic(err) + } +} diff --git a/src/github.com/matrix-org/dendrite/common/config/config.go b/src/github.com/matrix-org/dendrite/common/config/config.go index a4977731f..9a5a3dcc9 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config.go +++ b/src/github.com/matrix-org/dendrite/common/config/config.go @@ -30,7 +30,7 @@ import ( // Version is the current version of the config format. // This will change whenever we make breaking changes to the config format. -const Version = "v0" +const Version = 0 // Dendrite contains all the config used by a dendrite process. // Relative paths are resolved relative to the current working directory @@ -41,7 +41,7 @@ type Dendrite struct { // to update their config file to the current version. // The version of the file should only be different if there has // been a breaking change to the config file format. - Version string `yaml:"version"` + Version int `yaml:"version"` // The configuration required for a matrix server. Matrix struct { @@ -122,16 +122,20 @@ type Dendrite struct { // The RoomServer database stores information about matrix rooms. // It is only accessed by the RoomServer. RoomServer DataSource `yaml:"room_server"` + // The FederationSender database stores information used by the FederationSender + // It is only accessed by the FederationSender. + FederationSender DataSource `yaml:"federation_sender"` } `yaml:"database"` // The internal addresses the components will listen on. // These should not be exposed externally as they expose metrics and debugging APIs. Listen struct { - MediaAPI Address `yaml:"media_api"` - ClientAPI Address `yaml:"client_api"` - FederationAPI Address `yaml:"federation_api"` - SyncAPI Address `yaml:"sync_api"` - RoomServer Address `yaml:"room_server"` + MediaAPI Address `yaml:"media_api"` + ClientAPI Address `yaml:"client_api"` + FederationAPI Address `yaml:"federation_api"` + SyncAPI Address `yaml:"sync_api"` + RoomServer Address `yaml:"room_server"` + FederationSender Address `yaml:"federation_sender"` } `yaml:"listen"` } diff --git a/src/github.com/matrix-org/dendrite/common/config/config_test.go b/src/github.com/matrix-org/dendrite/common/config/config_test.go index f690882ef..7af619688 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config_test.go +++ b/src/github.com/matrix-org/dendrite/common/config/config_test.go @@ -32,7 +32,7 @@ func TestLoadConfigRelative(t *testing.T) { } const testConfig = ` -version: v0 +version: 0 matrix: server_name: localhost private_key: matrix_key.pem diff --git a/src/github.com/matrix-org/dendrite/federationapi/writers/send.go b/src/github.com/matrix-org/dendrite/federationapi/writers/send.go index a6013cea2..67ee6b027 100644 --- a/src/github.com/matrix-org/dendrite/federationapi/writers/send.go +++ b/src/github.com/matrix-org/dendrite/federationapi/writers/send.go @@ -118,11 +118,7 @@ type unknownRoomError struct { func (e unknownRoomError) Error() string { return fmt.Sprintf("unknown room %q", e.roomID) } func (t *txnReq) processEvent(e gomatrixserverlib.Event) error { - refs := e.PrevEvents() - prevEventIDs := make([]string, len(refs)) - for i := range refs { - prevEventIDs[i] = refs[i].EventID - } + prevEventIDs := e.PrevEventIDs() // Fetch the state needed to authenticate the event. needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{e}) diff --git a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go new file mode 100644 index 000000000..340875318 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go @@ -0,0 +1,342 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "encoding/json" + "fmt" + + log "github.com/Sirupsen/logrus" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/federationsender/queue" + "github.com/matrix-org/dendrite/federationsender/storage" + "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" + sarama "gopkg.in/Shopify/sarama.v1" +) + +// OutputRoomEvent consumes events that originated in the room server. +type OutputRoomEvent struct { + roomServerConsumer *common.ContinualConsumer + db *storage.Database + queues *queue.OutgoingQueues + query api.RoomserverQueryAPI +} + +// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. +func NewOutputRoomEvent(cfg *config.Dendrite, queues *queue.OutgoingQueues, store *storage.Database) (*OutputRoomEvent, error) { + kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) + if err != nil { + return nil, err + } + roomServerURL := cfg.RoomServerURL() + + consumer := common.ContinualConsumer{ + Topic: string(cfg.Kafka.Topics.OutputRoomEvent), + Consumer: kafkaConsumer, + PartitionStore: store, + } + s := &OutputRoomEvent{ + roomServerConsumer: &consumer, + db: store, + queues: queues, + query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil), + } + consumer.ProcessMessage = s.onMessage + + return s, nil +} + +// Start consuming from room servers +func (s *OutputRoomEvent) Start() error { + return s.roomServerConsumer.Start() +} + +// onMessage is called when the federation server receives a new event from the room server output log. +// It is unsafe to call this with messages for the same room in multiple gorountines +// because updates it will likely fail with a types.EventIDMismatchError when it +// realises that it cannot update the room state using the deltas. +func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { + // Parse out the event JSON + var output api.OutputRoomEvent + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return nil + } + + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(output.Event, false) + if err != nil { + log.WithError(err).Errorf("roomserver output log: event parse failure") + return nil + } + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "room_id": ev.RoomID(), + "send_as_server": output.SendAsServer, + }).Info("received event from roomserver") + + if err = s.processMessage(output, ev); err != nil { + // panic rather than continue with an inconsistent database + log.WithFields(log.Fields{ + "event": string(ev.JSON()), + log.ErrorKey: err, + "add": output.AddsStateEventIDs, + "del": output.RemovesStateEventIDs, + }).Panicf("roomserver output log: write event failure") + return nil + } + + return nil +} + +// processMessage updates the list of currently joined hosts in the room +// and then sends the event to the hosts that were joined before the event. +func (s *OutputRoomEvent) processMessage(ore api.OutputRoomEvent, ev gomatrixserverlib.Event) error { + addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ev) + if err != nil { + return err + } + addsJoinedHosts, err := joinedHostsFromEvents(addsStateEvents) + if err != nil { + return err + } + // Update our copy of the current state. + // We keep a copy of the current state because the state at each event is + // expressed as a delta against the current state. + // TODO: handle EventIDMismatchError and recover the current state by talking + // to the roomserver + oldJoinedHosts, err := s.db.UpdateRoom( + ev.RoomID(), ore.LastSentEventID, ev.EventID(), + addsJoinedHosts, ore.RemovesStateEventIDs, + ) + if err != nil { + return err + } + + if ore.SendAsServer == api.DoNotSendToOtherServers { + // Ignore event that we don't need to send anywhere. + return nil + } + + // Work out which hosts were joined at the event itself. + joinedHostsAtEvent, err := s.joinedHostsAtEvent(ore, ev, oldJoinedHosts) + if err != nil { + return err + } + + // Send the event. + if err = s.queues.SendEvent( + &ev, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent, + ); err != nil { + return err + } + + return nil +} + +// joinedHostsAtEvent works out a list of matrix servers that were joined to +// the room at the event. +// It is important to use the state at the event for sending messages because: +// 1) We shouldn't send messages to servers that weren't in the room. +// 2) If a server is kicked from the rooms it should still be told about the +// kick event, +// Usually the list can be calculated locally, but sometimes it will need fetch +// events from the room server. +// Returns an error if there was a problem talking to the room server. +func (s *OutputRoomEvent) joinedHostsAtEvent( + ore api.OutputRoomEvent, ev gomatrixserverlib.Event, oldJoinedHosts []types.JoinedHost, +) ([]gomatrixserverlib.ServerName, error) { + // Combine the delta into a single delta so that the adds and removes can + // cancel each other out. This should reduce the number of times we need + // to fetch a state event from the room server. + combinedAdds, combinedRemoves := combineDeltas( + ore.AddsStateEventIDs, ore.RemovesStateEventIDs, + ore.StateBeforeAddsEventIDs, ore.StateBeforeRemovesEventIDs, + ) + combinedAddsEvents, err := s.lookupStateEvents(combinedAdds, ev) + if err != nil { + return nil, err + } + + combinedAddsJoinedHosts, err := joinedHostsFromEvents(combinedAddsEvents) + if err != nil { + return nil, err + } + + removed := map[string]bool{} + for _, eventID := range combinedRemoves { + removed[eventID] = true + } + + joined := map[gomatrixserverlib.ServerName]bool{} + for _, joinedHost := range oldJoinedHosts { + if removed[joinedHost.MemberEventID] { + // This m.room.member event is part of the current state of the + // room, but not part of the state at the event we are processing + // Therefore we can't use it to tell whether the server was in + // the room at the event. + continue + } + joined[joinedHost.ServerName] = true + } + + for _, joinedHost := range combinedAddsJoinedHosts { + // This m.room.member event was part of the state of the room at the + // event, but isn't part of the current state of the room now. + joined[joinedHost.ServerName] = true + } + + var result []gomatrixserverlib.ServerName + for serverName, include := range joined { + if include { + result = append(result, serverName) + } + } + return result, nil +} + +// joinedHostsFromEvents turns a list of state events into a list of joined hosts. +// This errors if one of the events was invalid. +// It should be impossible for an invalid event to get this far in the pipeline. +func joinedHostsFromEvents(evs []gomatrixserverlib.Event) ([]types.JoinedHost, error) { + var joinedHosts []types.JoinedHost + for _, ev := range evs { + if ev.Type() != "m.room.member" || ev.StateKey() == nil { + continue + } + membership, err := ev.Membership() + if err != nil { + return nil, err + } + if *membership != "join" { + continue + } + _, serverName, err := gomatrixserverlib.ParseID('@', *ev.StateKey()) + if err != nil { + return nil, err + } + joinedHosts = append(joinedHosts, types.JoinedHost{ + MemberEventID: ev.EventID(), ServerName: serverName, + }) + } + return joinedHosts, nil +} + +// combineDeltas combines two deltas into a single delta. +// Assumes that the order of operations is add(1), remove(1), add(2), remove(2). +// Removes duplicate entries and redundant operations from each delta. +func combineDeltas(adds1, removes1, adds2, removes2 []string) (adds, removes []string) { + addSet := map[string]bool{} + removeSet := map[string]bool{} + + // combine processes each unique value in a list. + // If the value is in the removeFrom set then it is removed from that set. + // Otherwise it is added to the addTo set. + combine := func(values []string, removeFrom, addTo map[string]bool) { + processed := map[string]bool{} + for _, value := range values { + if processed[value] { + continue + } + processed[value] = true + if removeFrom[value] { + delete(removeFrom, value) + } else { + addTo[value] = true + } + } + } + + combine(adds1, nil, addSet) + combine(removes1, addSet, removeSet) + combine(adds2, removeSet, addSet) + combine(removes2, addSet, removeSet) + + for value := range addSet { + adds = append(adds, value) + } + for value := range removeSet { + removes = append(removes, value) + } + return +} + +// lookupStateEvents looks up the state events that are added by a new event. +func (s *OutputRoomEvent) lookupStateEvents( + addsStateEventIDs []string, event gomatrixserverlib.Event, +) ([]gomatrixserverlib.Event, error) { + // Fast path if there aren't any new state events. + if len(addsStateEventIDs) == 0 { + return nil, nil + } + + // Fast path if the only state event added is the event itself. + if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() { + return []gomatrixserverlib.Event{event}, nil + } + + missing := addsStateEventIDs + var result []gomatrixserverlib.Event + + // Check if event itself is being added. + for _, eventID := range missing { + if eventID == event.EventID() { + result = append(result, event) + break + } + } + missing = missingEventsFrom(result, addsStateEventIDs) + + if len(missing) == 0 { + return result, nil + } + + // At this point the missing events are neither the event itself nor are + // they present in our local database. Our only option is to fetch them + // from the roomserver using the query API. + eventReq := api.QueryEventsByIDRequest{EventIDs: missing} + var eventResp api.QueryEventsByIDResponse + if err := s.query.QueryEventsByID(&eventReq, &eventResp); err != nil { + return nil, err + } + + result = append(result, eventResp.Events...) + missing = missingEventsFrom(result, addsStateEventIDs) + + if len(missing) != 0 { + return nil, fmt.Errorf( + "missing %d state events IDs at event %q", len(missing), event.EventID(), + ) + } + + return result, nil +} + +func missingEventsFrom(events []gomatrixserverlib.Event, required []string) []string { + have := map[string]bool{} + for _, event := range events { + have[event.EventID()] = true + } + var missing []string + for _, eventID := range required { + if !have[eventID] { + missing = append(missing, eventID) + } + } + return missing +} diff --git a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver_test.go b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver_test.go new file mode 100644 index 000000000..bb659b9cd --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver_test.go @@ -0,0 +1,53 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "testing" +) + +func TestCombineNoOp(t *testing.T) { + inputAdd1 := []string{"a", "b", "c"} + inputDel1 := []string{"a", "b", "d"} + inputAdd2 := []string{"a", "d", "e"} + inputDel2 := []string{"a", "c", "e", "e"} + + gotAdd, gotDel := combineDeltas(inputAdd1, inputDel1, inputAdd2, inputDel2) + + if len(gotAdd) != 0 { + t.Errorf("wanted combined adds to be an empty list, got %#v", gotAdd) + } + + if len(gotDel) != 0 { + t.Errorf("wanted combined removes to be an empty list, got %#v", gotDel) + } +} + +func TestCombineDedup(t *testing.T) { + inputAdd1 := []string{"a", "a"} + inputDel1 := []string{"b", "b"} + inputAdd2 := []string{"a", "a"} + inputDel2 := []string{"b", "b"} + + gotAdd, gotDel := combineDeltas(inputAdd1, inputDel1, inputAdd2, inputDel2) + + if len(gotAdd) != 1 || gotAdd[0] != "a" { + t.Errorf("wanted combined adds to be %#v, got %#v", []string{"a"}, gotAdd) + } + + if len(gotDel) != 1 || gotDel[0] != "b" { + t.Errorf("wanted combined removes to be %#v, got %#v", []string{"b"}, gotDel) + } +} diff --git a/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go b/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go new file mode 100644 index 000000000..bb274b08e --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go @@ -0,0 +1,105 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package queue + +import ( + "fmt" + "sync" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/matrix-org/gomatrixserverlib" +) + +// destinationQueue is a queue of events for a single destination. +// It is responsible for sending the events to the destination and +// ensures that only one request is in flight to a given destination +// at a time. +type destinationQueue struct { + client *gomatrixserverlib.FederationClient + origin gomatrixserverlib.ServerName + destination gomatrixserverlib.ServerName + // The running mutex protects running, sentCounter, lastTransactionIDs and + // pendingEvents. + runningMutex sync.Mutex + running bool + sentCounter int + lastTransactionIDs []gomatrixserverlib.TransactionID + pendingEvents []*gomatrixserverlib.Event +} + +// Send event adds the event to the pending queue for the destination. +// If the queue is empty then it starts a background goroutine to +// start sending events to that destination. +func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.Event) { + oq.runningMutex.Lock() + defer oq.runningMutex.Unlock() + oq.pendingEvents = append(oq.pendingEvents, ev) + if !oq.running { + oq.running = true + go oq.backgroundSend() + } +} + +func (oq *destinationQueue) backgroundSend() { + for { + t := oq.next() + if t == nil { + // If the queue is empty then stop processing for this destination. + // TODO: Remove this destination from the queue map. + return + } + + // TODO: handle retries. + // TODO: blacklist uncooperative servers. + + _, err := oq.client.SendTransaction(*t) + if err != nil { + log.WithFields(log.Fields{ + "destination": oq.destination, + log.ErrorKey: err, + }).Info("problem sending transaction") + } + } +} + +// next creates a new transaction from the pending event queue +// and flushes the queue. +// Returns nil if the queue was empty. +func (oq *destinationQueue) next() *gomatrixserverlib.Transaction { + oq.runningMutex.Lock() + defer oq.runningMutex.Unlock() + if len(oq.pendingEvents) == 0 { + oq.running = false + return nil + } + var t gomatrixserverlib.Transaction + now := gomatrixserverlib.AsTimestamp(time.Now()) + t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.sentCounter)) + t.Origin = oq.origin + t.Destination = oq.destination + t.OriginServerTS = now + t.PreviousIDs = oq.lastTransactionIDs + if t.PreviousIDs == nil { + t.PreviousIDs = []gomatrixserverlib.TransactionID{} + } + oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID} + for _, pdu := range oq.pendingEvents { + t.PDUs = append(t.PDUs, *pdu) + } + oq.pendingEvents = nil + oq.sentCounter += len(t.PDUs) + return &t +} diff --git a/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go b/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go new file mode 100644 index 000000000..79f019fdd --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go @@ -0,0 +1,95 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package queue + +import ( + "fmt" + "sync" + + log "github.com/Sirupsen/logrus" + "github.com/matrix-org/gomatrixserverlib" +) + +// OutgoingQueues is a collection of queues for sending transactions to other +// matrix servers +type OutgoingQueues struct { + origin gomatrixserverlib.ServerName + client *gomatrixserverlib.FederationClient + // The queuesMutex protects queues + queuesMutex sync.Mutex + queues map[gomatrixserverlib.ServerName]*destinationQueue +} + +// NewOutgoingQueues makes a new OutgoingQueues +func NewOutgoingQueues(origin gomatrixserverlib.ServerName, client *gomatrixserverlib.FederationClient) *OutgoingQueues { + return &OutgoingQueues{ + origin: origin, + client: client, + queues: map[gomatrixserverlib.ServerName]*destinationQueue{}, + } +} + +// SendEvent sends an event to the destinations +func (oqs *OutgoingQueues) SendEvent( + ev *gomatrixserverlib.Event, origin gomatrixserverlib.ServerName, + destinations []gomatrixserverlib.ServerName, +) error { + if origin != oqs.origin { + // TODO: Support virtual hosting by allowing us to send events using + // different origin server names. + // For now assume we are always asked to send as the single server configured + // in the dendrite config. + return fmt.Errorf( + "sendevent: unexpected server to send as: got %q expected %q", + origin, oqs.origin, + ) + } + + // Remove our own server from the list of destinations. + destinations = filterDestinations(oqs.origin, destinations) + + log.WithFields(log.Fields{ + "destinations": destinations, "event": ev.EventID(), + }).Info("Sending event") + + oqs.queuesMutex.Lock() + defer oqs.queuesMutex.Unlock() + for _, destination := range destinations { + oq := oqs.queues[destination] + if oq == nil { + oq = &destinationQueue{ + origin: oqs.origin, + destination: destination, + client: oqs.client, + } + oqs.queues[destination] = oq + } + oq.sendEvent(ev) + } + return nil +} + +// filterDestinations removes our own server from the list of destinations. +// Otherwise we could end up trying to talk to ourselves. +func filterDestinations(origin gomatrixserverlib.ServerName, destinations []gomatrixserverlib.ServerName) []gomatrixserverlib.ServerName { + var result []gomatrixserverlib.ServerName + for _, destination := range destinations { + if destination == origin { + continue + } + result = append(result, destination) + } + return result +} diff --git a/src/github.com/matrix-org/dendrite/federationsender/storage/joined_hosts_table.go b/src/github.com/matrix-org/dendrite/federationsender/storage/joined_hosts_table.go new file mode 100644 index 000000000..1b5c65ddf --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationsender/storage/joined_hosts_table.go @@ -0,0 +1,111 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "database/sql" + + "github.com/lib/pq" + "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/gomatrixserverlib" +) + +const joinedHostsSchema = ` +-- The joined_hosts table stores a list of m.room.member event ids in the +-- current state for each room where the membership is "join". +-- There will be an entry for every user that is joined to the room. +CREATE TABLE IF NOT EXISTS joined_hosts ( + -- The string ID of the room. + room_id TEXT NOT NULL, + -- The event ID of the m.room.member join event. + event_id TEXT NOT NULL, + -- The domain part of the user ID the m.room.member event is for. + server_name TEXT NOT NULL +); + +CREATE UNIQUE INDEX IF NOT EXISTS joined_hosts_event_id_idx + ON joined_hosts (event_id); + +CREATE INDEX IF NOT EXISTS joined_hosts_room_id_idx + ON joined_hosts (room_id) +` + +const insertJoinedHostsSQL = "" + + "INSERT INTO joined_hosts (room_id, event_id, server_name)" + + " VALUES ($1, $2, $3)" + +const deleteJoinedHostsSQL = "" + + "DELETE FROM joined_hosts WHERE event_id = ANY($1)" + +const selectJoinedHostsSQL = "" + + "SELECT event_id, server_name FROM joined_hosts" + + " WHERE room_id = $1" + +type joinedHostsStatements struct { + insertJoinedHostsStmt *sql.Stmt + deleteJoinedHostsStmt *sql.Stmt + selectJoinedHostsStmt *sql.Stmt +} + +func (s *joinedHostsStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(joinedHostsSchema) + if err != nil { + return + } + if s.insertJoinedHostsStmt, err = db.Prepare(insertJoinedHostsSQL); err != nil { + return + } + if s.deleteJoinedHostsStmt, err = db.Prepare(deleteJoinedHostsSQL); err != nil { + return + } + if s.selectJoinedHostsStmt, err = db.Prepare(selectJoinedHostsSQL); err != nil { + return + } + return +} + +func (s *joinedHostsStatements) insertJoinedHosts( + txn *sql.Tx, roomID, eventID string, serverName gomatrixserverlib.ServerName, +) error { + _, err := txn.Stmt(s.insertJoinedHostsStmt).Exec(roomID, eventID, serverName) + return err +} + +func (s *joinedHostsStatements) deleteJoinedHosts(txn *sql.Tx, eventIDs []string) error { + _, err := txn.Stmt(s.deleteJoinedHostsStmt).Exec(pq.StringArray(eventIDs)) + return err +} + +func (s *joinedHostsStatements) selectJoinedHosts(txn *sql.Tx, roomID string, +) ([]types.JoinedHost, error) { + rows, err := txn.Stmt(s.selectJoinedHostsStmt).Query(roomID) + if err != nil { + return nil, err + } + defer rows.Close() + + var result []types.JoinedHost + for rows.Next() { + var eventID, serverName string + if err = rows.Scan(&eventID, &serverName); err != nil { + return nil, err + } + result = append(result, types.JoinedHost{ + MemberEventID: eventID, + ServerName: gomatrixserverlib.ServerName(serverName), + }) + } + return result, nil +} diff --git a/src/github.com/matrix-org/dendrite/federationsender/storage/room_table.go b/src/github.com/matrix-org/dendrite/federationsender/storage/room_table.go new file mode 100644 index 000000000..e11ed4213 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationsender/storage/room_table.go @@ -0,0 +1,89 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "database/sql" +) + +const roomSchema = ` +CREATE TABLE IF NOT EXISTS rooms ( + -- The string ID of the room + room_id TEXT PRIMARY KEY, + -- The most recent event state by the room server. + -- We can use this to tell if our view of the room state has become + -- desynchronised. + last_event_id TEXT NOT NULL +);` + +const insertRoomSQL = "" + + "INSERT INTO rooms (room_id, last_event_id) VALUES ($1, '')" + + " ON CONFLICT DO NOTHING" + +const selectRoomForUpdateSQL = "" + + "SELECT last_event_id FROM rooms WHERE room_id = $1 FOR UPDATE" + +const updateRoomSQL = "" + + "UPDATE rooms SET last_event_id = $2 WHERE room_id = $1" + +type roomStatements struct { + insertRoomStmt *sql.Stmt + selectRoomForUpdateStmt *sql.Stmt + updateRoomStmt *sql.Stmt +} + +func (s *roomStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(roomSchema) + if err != nil { + return + } + + if s.insertRoomStmt, err = db.Prepare(insertRoomSQL); err != nil { + return + } + if s.selectRoomForUpdateStmt, err = db.Prepare(selectRoomForUpdateSQL); err != nil { + return + } + if s.updateRoomStmt, err = db.Prepare(updateRoomSQL); err != nil { + return + } + return +} + +// insertRoom inserts the room if it didn't already exist. +// If the room didn't exist then last_event_id is set to the empty string. +func (s *roomStatements) insertRoom(txn *sql.Tx, roomID string) error { + _, err := txn.Stmt(s.insertRoomStmt).Exec(roomID) + return err +} + +// selectRoomForUpdate locks the row for the room and returns the last_event_id. +// The row must already exist in the table. Callers can ensure that the row +// exists by calling insertRoom first. +func (s *roomStatements) selectRoomForUpdate(txn *sql.Tx, roomID string) (string, error) { + var lastEventID string + err := txn.Stmt(s.selectRoomForUpdateStmt).QueryRow(roomID).Scan(&lastEventID) + if err != nil { + return "", err + } + return lastEventID, nil +} + +// updateRoom updates the last_event_id for the room. selectRoomForUpdate should +// have already been called earlier within the transaction. +func (s *roomStatements) updateRoom(txn *sql.Tx, roomID, lastEventID string) error { + _, err := txn.Stmt(s.updateRoomStmt).Exec(roomID, lastEventID) + return err +} diff --git a/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go b/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go new file mode 100644 index 000000000..2f98093e4 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go @@ -0,0 +1,126 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "database/sql" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/federationsender/types" +) + +// Database stores information needed by the federation sender +type Database struct { + joinedHostsStatements + roomStatements + common.PartitionOffsetStatements + db *sql.DB +} + +// NewDatabase opens a new database +func NewDatabase(dataSourceName string) (*Database, error) { + var result Database + var err error + if result.db, err = sql.Open("postgres", dataSourceName); err != nil { + return nil, err + } + if err = result.prepare(); err != nil { + return nil, err + } + return &result, nil +} + +func (d *Database) prepare() error { + var err error + + if err = d.joinedHostsStatements.prepare(d.db); err != nil { + return err + } + + if err = d.roomStatements.prepare(d.db); err != nil { + return err + } + + if err = d.PartitionOffsetStatements.Prepare(d.db); err != nil { + return err + } + + return nil +} + +// PartitionOffsets implements common.PartitionStorer +func (d *Database) PartitionOffsets(topic string) ([]common.PartitionOffset, error) { + return d.SelectPartitionOffsets(topic) +} + +// SetPartitionOffset implements common.PartitionStorer +func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error { + return d.UpsertPartitionOffset(topic, partition, offset) +} + +// UpdateRoom updates the joined hosts for a room and returns what the joined +// hosts were before the update. +func (d *Database) UpdateRoom( + roomID, oldEventID, newEventID string, + addHosts []types.JoinedHost, + removeHosts []string, +) (joinedHosts []types.JoinedHost, err error) { + err = runTransaction(d.db, func(txn *sql.Tx) error { + if err = d.insertRoom(txn, roomID); err != nil { + return err + } + lastSentEventID, err := d.selectRoomForUpdate(txn, roomID) + if err != nil { + return err + } + if lastSentEventID != oldEventID { + return types.EventIDMismatchError{lastSentEventID, oldEventID} + } + joinedHosts, err = d.selectJoinedHosts(txn, roomID) + if err != nil { + return err + } + for _, add := range addHosts { + err = d.insertJoinedHosts(txn, roomID, add.MemberEventID, add.ServerName) + if err != nil { + return err + } + } + if err = d.deleteJoinedHosts(txn, removeHosts); err != nil { + return err + } + return d.updateRoom(txn, roomID, newEventID) + }) + return +} + +func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) { + txn, err := db.Begin() + if err != nil { + return + } + defer func() { + if r := recover(); r != nil { + txn.Rollback() + panic(r) + } else if err != nil { + txn.Rollback() + } else { + err = txn.Commit() + } + }() + err = fn(txn) + return +} diff --git a/src/github.com/matrix-org/dendrite/federationsender/types/types.go b/src/github.com/matrix-org/dendrite/federationsender/types/types.go new file mode 100644 index 000000000..05ba92f77 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationsender/types/types.go @@ -0,0 +1,45 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +import ( + "fmt" + + "github.com/matrix-org/gomatrixserverlib" +) + +// A JoinedHost is a server that is joined to a matrix room. +type JoinedHost struct { + // The MemberEventID of a m.room.member join event. + MemberEventID string + // The domain part of the state key of the m.room.member join event + ServerName gomatrixserverlib.ServerName +} + +// A EventIDMismatchError indicates that we have got out of sync with the +// room server. +type EventIDMismatchError struct { + // The event ID we have stored in our local database. + DatabaseID string + // The event ID received from the room server. + RoomServerID string +} + +func (e EventIDMismatchError) Error() string { + return fmt.Sprintf( + "mismatched last sent event ID: had %q in database got %q from room server", + e.DatabaseID, e.RoomServerID, + ) +} diff --git a/src/github.com/matrix-org/dendrite/mediaapi/writers/upload.go b/src/github.com/matrix-org/dendrite/mediaapi/writers/upload.go index 163ea7b68..275e6e9b3 100644 --- a/src/github.com/matrix-org/dendrite/mediaapi/writers/upload.go +++ b/src/github.com/matrix-org/dendrite/mediaapi/writers/upload.go @@ -20,7 +20,6 @@ import ( "net/http" "net/url" "path" - "strings" log "github.com/Sirupsen/logrus" "github.com/matrix-org/dendrite/clientapi/jsonerror" @@ -29,6 +28,7 @@ import ( "github.com/matrix-org/dendrite/mediaapi/storage" "github.com/matrix-org/dendrite/mediaapi/thumbnailer" "github.com/matrix-org/dendrite/mediaapi/types" + "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) @@ -193,14 +193,7 @@ func (r *uploadRequest) Validate(maxFileSizeBytes config.FileSizeBytes) *util.JS // It should be a struct (with pointers into a single string to avoid copying) and // we should update all refs to use UserID types rather than strings. // https://github.com/matrix-org/synapse/blob/v0.19.2/synapse/types.py#L92 - if len(r.MediaMetadata.UserID) == 0 || r.MediaMetadata.UserID[0] != '@' { - return &util.JSONResponse{ - Code: 400, - JSON: jsonerror.Unknown("user id must start with '@'"), - } - } - parts := strings.SplitN(string(r.MediaMetadata.UserID[1:]), ":", 2) - if len(parts) != 2 { + if _, _, err := gomatrixserverlib.ParseID('@', string(r.MediaMetadata.UserID)); err != nil { return &util.JSONResponse{ Code: 400, JSON: jsonerror.BadJSON("user id must be in the form @localpart:domain"), diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index ce2e7f234..dfb94a9ea 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -16,11 +16,9 @@ package storage import ( "database/sql" - "encoding/json" "fmt" // Import the postgres database driver. _ "github.com/lib/pq" - "github.com/matrix-org/dendrite/clientapi/events" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" @@ -127,13 +125,15 @@ func (d *SyncServerDatabase) updateRoomState( // ignore non state events continue } - var membership *string + var ( + membership *string + err error + ) if event.Type() == "m.room.member" { - var memberContent events.MemberContent - if err := json.Unmarshal(event.Content(), &memberContent); err != nil { + membership, err = event.Membership() + if err != nil { return err } - membership = &memberContent.Membership } if err := d.roomstate.upsertRoomState(txn, event, membership, int64(streamPos)); err != nil { return err @@ -473,11 +473,11 @@ func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.Event) []gom // with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned. func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string { if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) { - var memberContent events.MemberContent - if err := json.Unmarshal(ev.Content(), &memberContent); err != nil { + membership, err := ev.Membership() + if err != nil { return "" } - return memberContent.Membership + return *membership } return "" } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go index 1cc9c4e28..31f0fe4e1 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go @@ -15,11 +15,9 @@ package sync import ( - "encoding/json" "sync" log "github.com/Sirupsen/logrus" - "github.com/matrix-org/dendrite/clientapi/events" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" @@ -68,14 +66,14 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosit // If this is an invite, also add in the invitee to this list. if ev.Type() == "m.room.member" && ev.StateKey() != nil { userID := *ev.StateKey() - var memberContent events.MemberContent - if err := json.Unmarshal(ev.Content(), &memberContent); err != nil { + membership, err := ev.Membership() + if err != nil { log.WithError(err).WithField("event_id", ev.EventID()).Errorf( "Notifier.OnNewEvent: Failed to unmarshal member event", ) } else { // Keep the joined user map up-to-date - switch memberContent.Membership { + switch *membership { case "invite": userIDs = append(userIDs, userID) case "join": diff --git a/sync-server-config.yaml b/sync-server-config.yaml deleted file mode 100644 index cbe452b35..000000000 --- a/sync-server-config.yaml +++ /dev/null @@ -1,8 +0,0 @@ -# A list of URIs which host Kafka logs. -consumer_uris: ["localhost:9092"] - -# The name of the topic which the sync server will consume events from. -roomserver_topic: "roomserverOutput" - -# The database URI to store sync server information. -database: "postgres://dendrite:itsasecret@localhost/syncserver?sslmode=disable" diff --git a/vendor/src/github.com/matrix-org/gomatrixserverlib/event.go b/vendor/src/github.com/matrix-org/gomatrixserverlib/event.go index 11a165131..896a6cf0c 100644 --- a/vendor/src/github.com/matrix-org/gomatrixserverlib/event.go +++ b/vendor/src/github.com/matrix-org/gomatrixserverlib/event.go @@ -18,8 +18,10 @@ package gomatrixserverlib import ( "encoding/json" "fmt" - "golang.org/x/crypto/ed25519" + "strings" "time" + + "golang.org/x/crypto/ed25519" ) // A StateKeyTuple is the combination of an event type and an event state key. @@ -54,7 +56,7 @@ type EventBuilder struct { Type string `json:"type"` // The state_key of the event if the event is a state event or nil if the event is not a state event. StateKey *string `json:"state_key,omitempty"` - // The events that immediately preceeded this event in the room history. + // The events that immediately preceded this event in the room history. PrevEvents []EventReference `json:"prev_events"` // The events needed to authenticate this event. AuthEvents []EventReference `json:"auth_events"` @@ -112,7 +114,7 @@ var emptyEventReferenceList = []EventReference{} // Build a new Event. // This is used when a local event is created on this server. // Call this after filling out the necessary fields. -// This can be called mutliple times on the same builder. +// This can be called multiple times on the same builder. // A different event ID must be supplied each time this is called. func (eb *EventBuilder) Build(eventID string, now time.Time, origin ServerName, keyID KeyID, privateKey ed25519.PrivateKey) (result Event, err error) { var event struct { @@ -467,11 +469,44 @@ func (e Event) PrevEvents() []EventReference { return e.fields.PrevEvents } +// PrevEventIDs returns the event IDs of the direct ancestors of the event. +func (e Event) PrevEventIDs() []string { + result := make([]string, len(e.fields.PrevEvents)) + for i := range e.fields.PrevEvents { + result[i] = e.fields.PrevEvents[i].EventID + } + return result +} + +// Membership returns the value of the content.membership field if this event +// is an "m.room.member" event. +// Returns an error if the event is not a m.room.member event or if the content +// is not valid m.room.member content. +func (e Event) Membership() (*string, error) { + if e.fields.Type != MRoomMember { + return nil, fmt.Errorf("gomatrixserverlib: not an m.room.member event") + } + var content memberContent + if err := json.Unmarshal(e.fields.Content, &content); err != nil { + return nil, err + } + return &content.Membership, nil +} + // AuthEvents returns references to the events needed to auth the event. func (e Event) AuthEvents() []EventReference { return e.fields.AuthEvents } +// AuthEventIDs returns the event IDs of the events needed to auth the event. +func (e Event) AuthEventIDs() []string { + result := make([]string, len(e.fields.PrevEvents)) + for i := range e.fields.PrevEvents { + result[i] = e.fields.PrevEvents[i].EventID + } + return result +} + // Redacts returns the event ID of the event this event redacts. func (e Event) Redacts() string { return e.fields.Redacts @@ -534,3 +569,19 @@ func (er EventReference) MarshalJSON() ([]byte, error) { return json.Marshal(&tuple) } + +// ParseID splits a matrix ID into a local part and a server name. +func ParseID(sigil byte, id string) (local string, domain ServerName, err error) { + // IDs have the format: SIGIL LOCALPART ":" DOMAIN + // Split on the first ":" character since the domain can contain ":" + // characters. + if len(id) == 0 || id[0] != sigil { + return "", "", fmt.Errorf("gomatriserverlib: invalid ID %q doesn't start with %q", id, sigil) + } + parts := strings.SplitN(id, ":", 2) + if len(parts) != 2 { + // The ID must have a ":" character. + return "", "", fmt.Errorf("gomatrixserverlib: invalid ID %q missing ':'", id) + } + return parts[0][1:], ServerName(parts[1]), nil +}