mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-10 16:33:11 -06:00
Merge branch 'markjh/utility' into markjh/invites
This commit is contained in:
commit
91ecad8f51
76
dendrite-config.yaml
Normal file
76
dendrite-config.yaml
Normal file
|
|
@ -0,0 +1,76 @@
|
|||
# The config file format version
|
||||
# This is used by dendrite to tell if it understands the config format.
|
||||
# This will change if the structure of the config file changes or if the meaning
|
||||
# of an existing config key changes.
|
||||
version: 0
|
||||
|
||||
# The matrix specific config
|
||||
matrix:
|
||||
# The name of the server. This is usually the domain name, e.g 'matrix.org', 'localhost'.
|
||||
server_name: "example.com"
|
||||
# The path to the PEM formatted matrix private key.
|
||||
private_key: "/etc/dendrite/matrix_key.pem"
|
||||
# The x509 certificates used by the federation listeners for this server
|
||||
federation_certificates: ["/etc/dendrite/federation_tls.pem"]
|
||||
|
||||
# The media repository config
|
||||
media:
|
||||
# The base path to where the media files will be stored. May be relative or absolute.
|
||||
base_path: /var/dendrite/media
|
||||
|
||||
# The maximum file size in bytes that is allowed to be stored on this server.
|
||||
# Note: if max_file_size_bytes is set to 0, the size is unlimited.
|
||||
# Note: if max_file_size_bytes is not set, it will default to 10485760 (10MB)
|
||||
max_file_size_bytes: 10485760
|
||||
|
||||
# Whether to dynamically generate thumbnails on-the-fly if the requested resolution is not already generated
|
||||
# NOTE: This is a possible denial-of-service attack vector - use at your own risk
|
||||
dynamic_thumbnails: false
|
||||
|
||||
# A list of thumbnail sizes to be pre-generated for downloaded remote / uploaded content
|
||||
# method is one of crop or scale. If omitted, it will default to scale.
|
||||
# crop scales to fill the requested dimensions and crops the excess.
|
||||
# scale scales to fit the requested dimensions and one dimension may be smaller than requested.
|
||||
thumbnail_sizes:
|
||||
- width: 32
|
||||
height: 32
|
||||
method: crop
|
||||
- width: 96
|
||||
height: 96
|
||||
method: crop
|
||||
- width: 320
|
||||
height: 240
|
||||
method: scale
|
||||
- width: 640
|
||||
height: 480
|
||||
method: scale
|
||||
- width: 800
|
||||
height: 600
|
||||
method: scale
|
||||
|
||||
# The config for communicating with kafka
|
||||
kafka:
|
||||
# Where the kafka servers are running.
|
||||
addresses: ["localhost:9092"]
|
||||
# The names of the kafka topics to use.
|
||||
topics:
|
||||
input_room_event: roomserverInput
|
||||
output_room_event: roomserverOutput
|
||||
|
||||
# The postgres connection configs for connecting to the databases e.g a postgres:// URI
|
||||
database:
|
||||
account: "postgres://dendrite:itsasecret@localhost/dendrite_account?sslmode=disable"
|
||||
device: "postgres://dendrite:itsasecret@localhost/dendrite_device?sslmode=disable"
|
||||
media_api: "postgres://dendrite:itsasecret@localhost/dendrite_mediaapi?sslmode=disable"
|
||||
sync_api: "postgres://dendrite:itsasecret@localhost/dendrite_syncapi?sslmode=disable"
|
||||
room_server: "postgres://dendrite:itsasecret@localhost/dendrite_roomserver?sslmode=disable"
|
||||
server_key: "postgres://dendrite:itsasecret@localhost/dendrite_serverkey?sslmode=disable"
|
||||
|
||||
# The TCP host:port pairs to bind the internal HTTP APIs to.
|
||||
# These shouldn't be exposed to the public internet.
|
||||
listen:
|
||||
room_server: "localhost:7770"
|
||||
client_api: "localhost:7771"
|
||||
federation_api: "localhost:7772"
|
||||
sync_api: "localhost:7773"
|
||||
media_api: "localhost:7774"
|
||||
|
|
@ -1,38 +0,0 @@
|
|||
# The name of the server. This is usually the domain name, e.g 'matrix.org', 'localhost'.
|
||||
server_name: "example.com"
|
||||
|
||||
# The base path to where the media files will be stored. May be relative or absolute.
|
||||
base_path: /var/dendrite/media
|
||||
|
||||
# The maximum file size in bytes that is allowed to be stored on this server.
|
||||
# Note: if max_file_size_bytes is set to 0, the size is unlimited.
|
||||
# Note: if max_file_size_bytes is not set, it will default to 10485760 (10MB)
|
||||
max_file_size_bytes: 10485760
|
||||
|
||||
# The postgres connection config for connecting to the database e.g a postgres:// URI
|
||||
database: "postgres://dendrite:itsasecret@localhost/mediaapi?sslmode=disable"
|
||||
|
||||
# Whether to dynamically generate thumbnails on-the-fly if the requested resolution is not already generated
|
||||
# NOTE: This is a possible denial-of-service attack vector - use at your own risk
|
||||
dynamic_thumbnails: false
|
||||
|
||||
# A list of thumbnail sizes to be pre-generated for downloaded remote / uploaded content
|
||||
# method is one of crop or scale. If omitted, it will default to scale.
|
||||
# crop scales to fill the requested dimensions and crops the excess.
|
||||
# scale scales to fit the requested dimensions and one dimension may be smaller than requested.
|
||||
thumbnail_sizes:
|
||||
- width: 32
|
||||
height: 32
|
||||
method: crop
|
||||
- width: 96
|
||||
height: 96
|
||||
method: crop
|
||||
- width: 320
|
||||
height: 240
|
||||
method: scale
|
||||
- width: 640
|
||||
height: 480
|
||||
method: scale
|
||||
- width: 800
|
||||
height: 600
|
||||
method: scale
|
||||
|
|
@ -49,7 +49,7 @@ func (c *RoomserverProducer) SendEvents(events []gomatrixserverlib.Event, sendAs
|
|||
ires[i] = api.InputRoomEvent{
|
||||
Kind: api.KindNew,
|
||||
Event: event.JSON(),
|
||||
AuthEventIDs: authEventIDs(event),
|
||||
AuthEventIDs: event.AuthEventIDs(),
|
||||
SendAsServer: string(sendAsServer),
|
||||
}
|
||||
eventIDs[i] = event.EventID()
|
||||
|
|
@ -71,7 +71,7 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat
|
|||
ires[i] = api.InputRoomEvent{
|
||||
Kind: api.KindOutlier,
|
||||
Event: outlier.JSON(),
|
||||
AuthEventIDs: authEventIDs(outlier),
|
||||
AuthEventIDs: outlier.AuthEventIDs(),
|
||||
}
|
||||
eventIDs[i] = outlier.EventID()
|
||||
}
|
||||
|
|
@ -84,7 +84,7 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat
|
|||
ires[len(outliers)] = api.InputRoomEvent{
|
||||
Kind: api.KindNew,
|
||||
Event: event.JSON(),
|
||||
AuthEventIDs: authEventIDs(event),
|
||||
AuthEventIDs: event.AuthEventIDs(),
|
||||
HasState: true,
|
||||
StateEventIDs: stateEventIDs,
|
||||
}
|
||||
|
|
@ -93,14 +93,6 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat
|
|||
return c.SendInputRoomEvents(ires, eventIDs)
|
||||
}
|
||||
|
||||
// TODO Make this a method on gomatrixserverlib.Event
|
||||
func authEventIDs(event gomatrixserverlib.Event) (ids []string) {
|
||||
for _, ref := range event.AuthEvents() {
|
||||
ids = append(ids, ref.EventID)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// SendInputRoomEvents writes the given input room events to the roomserver input log. The length of both
|
||||
// arrays must match, and each element must correspond to the same event.
|
||||
func (c *RoomserverProducer) SendInputRoomEvents(ires []api.InputRoomEvent, eventIDs []string) error {
|
||||
|
|
|
|||
|
|
@ -16,6 +16,8 @@ package readers
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
|
|
@ -23,8 +25,6 @@ import (
|
|||
"github.com/matrix-org/gomatrix"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// DirectoryRoom looks up a room alias
|
||||
|
|
@ -35,7 +35,7 @@ func DirectoryRoom(
|
|||
federation *gomatrixserverlib.FederationClient,
|
||||
cfg *config.Dendrite,
|
||||
) util.JSONResponse {
|
||||
domain, err := domainFromID(roomAlias)
|
||||
_, domain, err := gomatrixserverlib.ParseID('#', roomAlias)
|
||||
if err != nil {
|
||||
return util.JSONResponse{
|
||||
Code: 400,
|
||||
|
|
@ -69,19 +69,3 @@ func DirectoryRoom(
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// domainFromID returns everything after the first ":" character to extract
|
||||
// the domain part of a matrix ID.
|
||||
// TODO: duplicated from gomatrixserverlib.
|
||||
func domainFromID(id string) (gomatrixserverlib.ServerName, error) {
|
||||
// IDs have the format: SIGIL LOCALPART ":" DOMAIN
|
||||
// Split on the first ":" character since the domain can contain ":"
|
||||
// characters.
|
||||
parts := strings.SplitN(id, ":", 2)
|
||||
if len(parts) != 2 {
|
||||
// The ID must have a ":" character.
|
||||
return "", fmt.Errorf("invalid ID: %q", id)
|
||||
}
|
||||
// Return everything after the first ":" character.
|
||||
return gomatrixserverlib.ServerName(parts[1]), nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -60,14 +60,7 @@ func (r createRoomRequest) Validate() *util.JSONResponse {
|
|||
// It should be a struct (with pointers into a single string to avoid copying) and
|
||||
// we should update all refs to use UserID types rather than strings.
|
||||
// https://github.com/matrix-org/synapse/blob/v0.19.2/synapse/types.py#L92
|
||||
if len(userID) == 0 || userID[0] != '@' {
|
||||
return &util.JSONResponse{
|
||||
Code: 400,
|
||||
JSON: jsonerror.BadJSON("user id must start with '@'"),
|
||||
}
|
||||
}
|
||||
parts := strings.SplitN(userID[1:], ":", 2)
|
||||
if len(parts) != 2 {
|
||||
if _, _, err := gomatrixserverlib.ParseID('@', userID); err != nil {
|
||||
return &util.JSONResponse{
|
||||
Code: 400,
|
||||
JSON: jsonerror.BadJSON("user id must be in the form @localpart:domain"),
|
||||
|
|
@ -192,9 +185,13 @@ func createRoom(req *http.Request, device *authtypes.Device, cfg config.Dendrite
|
|||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
response := createRoomResponse{
|
||||
RoomID: roomID,
|
||||
}
|
||||
|
||||
return util.JSONResponse{
|
||||
Code: 200,
|
||||
JSON: builtEvents,
|
||||
JSON: response,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -16,6 +16,10 @@ package writers
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
|
|
@ -25,9 +29,6 @@ import (
|
|||
"github.com/matrix-org/gomatrix"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// JoinRoomByIDOrAlias implements the "/join/{roomIDOrAlias}" API.
|
||||
|
|
@ -88,7 +89,7 @@ func (r joinRoomReq) joinRoomByID() util.JSONResponse {
|
|||
|
||||
// joinRoomByAlias joins a room using a room alias.
|
||||
func (r joinRoomReq) joinRoomByAlias(roomAlias string) util.JSONResponse {
|
||||
domain, err := domainFromID(roomAlias)
|
||||
_, domain, err := gomatrixserverlib.ParseID('#', roomAlias)
|
||||
if err != nil {
|
||||
return util.JSONResponse{
|
||||
Code: 400,
|
||||
|
|
@ -245,19 +246,3 @@ func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib
|
|||
}{roomID},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// domainFromID returns everything after the first ":" character to extract
|
||||
// the domain part of a matrix ID.
|
||||
// TODO: duplicated from gomatrixserverlib.
|
||||
func domainFromID(id string) (gomatrixserverlib.ServerName, error) {
|
||||
// IDs have the format: SIGIL LOCALPART ":" DOMAIN
|
||||
// Split on the first ":" character since the domain can contain ":"
|
||||
// characters.
|
||||
parts := strings.SplitN(id, ":", 2)
|
||||
if len(parts) != 2 {
|
||||
// The ID must have a ":" character.
|
||||
return "", fmt.Errorf("invalid ID: %q", id)
|
||||
}
|
||||
// Return everything after the first ":" character.
|
||||
return gomatrixserverlib.ServerName(parts[1]), nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,74 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"net/http"
|
||||
"os"
|
||||
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/common/config"
|
||||
"github.com/matrix-org/dendrite/federationsender/consumers"
|
||||
"github.com/matrix-org/dendrite/federationsender/queue"
|
||||
"github.com/matrix-org/dendrite/federationsender/storage"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
)
|
||||
|
||||
var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.")
|
||||
|
||||
func main() {
|
||||
common.SetupLogging(os.Getenv("LOG_DIR"))
|
||||
|
||||
flag.Parse()
|
||||
|
||||
if *configPath == "" {
|
||||
log.Fatal("--config must be supplied")
|
||||
}
|
||||
cfg, err := config.Load(*configPath)
|
||||
if err != nil {
|
||||
log.Fatalf("Invalid config file: %s", err)
|
||||
}
|
||||
|
||||
log.Info("config: ", cfg)
|
||||
|
||||
db, err := storage.NewDatabase(string(cfg.Database.FederationSender))
|
||||
if err != nil {
|
||||
log.Panicf("startup: failed to create federation sender database with data source %s : %s", cfg.Database.FederationSender, err)
|
||||
}
|
||||
|
||||
federation := gomatrixserverlib.NewFederationClient(
|
||||
cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey,
|
||||
)
|
||||
|
||||
queues := queue.NewOutgoingQueues(cfg.Matrix.ServerName, federation)
|
||||
|
||||
consumer, err := consumers.NewOutputRoomEvent(cfg, queues, db)
|
||||
if err != nil {
|
||||
log.WithError(err).Panicf("startup: failed to create room server consumer")
|
||||
}
|
||||
if err = consumer.Start(); err != nil {
|
||||
log.WithError(err).Panicf("startup: failed to start room server consumer")
|
||||
}
|
||||
|
||||
http.DefaultServeMux.Handle("/metrics", prometheus.Handler())
|
||||
|
||||
if err := http.ListenAndServe(string(cfg.Listen.FederationSender), nil); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
|
@ -30,7 +30,7 @@ import (
|
|||
|
||||
// Version is the current version of the config format.
|
||||
// This will change whenever we make breaking changes to the config format.
|
||||
const Version = "v0"
|
||||
const Version = 0
|
||||
|
||||
// Dendrite contains all the config used by a dendrite process.
|
||||
// Relative paths are resolved relative to the current working directory
|
||||
|
|
@ -41,7 +41,7 @@ type Dendrite struct {
|
|||
// to update their config file to the current version.
|
||||
// The version of the file should only be different if there has
|
||||
// been a breaking change to the config file format.
|
||||
Version string `yaml:"version"`
|
||||
Version int `yaml:"version"`
|
||||
|
||||
// The configuration required for a matrix server.
|
||||
Matrix struct {
|
||||
|
|
@ -122,16 +122,20 @@ type Dendrite struct {
|
|||
// The RoomServer database stores information about matrix rooms.
|
||||
// It is only accessed by the RoomServer.
|
||||
RoomServer DataSource `yaml:"room_server"`
|
||||
// The FederationSender database stores information used by the FederationSender
|
||||
// It is only accessed by the FederationSender.
|
||||
FederationSender DataSource `yaml:"federation_sender"`
|
||||
} `yaml:"database"`
|
||||
|
||||
// The internal addresses the components will listen on.
|
||||
// These should not be exposed externally as they expose metrics and debugging APIs.
|
||||
Listen struct {
|
||||
MediaAPI Address `yaml:"media_api"`
|
||||
ClientAPI Address `yaml:"client_api"`
|
||||
FederationAPI Address `yaml:"federation_api"`
|
||||
SyncAPI Address `yaml:"sync_api"`
|
||||
RoomServer Address `yaml:"room_server"`
|
||||
MediaAPI Address `yaml:"media_api"`
|
||||
ClientAPI Address `yaml:"client_api"`
|
||||
FederationAPI Address `yaml:"federation_api"`
|
||||
SyncAPI Address `yaml:"sync_api"`
|
||||
RoomServer Address `yaml:"room_server"`
|
||||
FederationSender Address `yaml:"federation_sender"`
|
||||
} `yaml:"listen"`
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ func TestLoadConfigRelative(t *testing.T) {
|
|||
}
|
||||
|
||||
const testConfig = `
|
||||
version: v0
|
||||
version: 0
|
||||
matrix:
|
||||
server_name: localhost
|
||||
private_key: matrix_key.pem
|
||||
|
|
|
|||
|
|
@ -118,11 +118,7 @@ type unknownRoomError struct {
|
|||
func (e unknownRoomError) Error() string { return fmt.Sprintf("unknown room %q", e.roomID) }
|
||||
|
||||
func (t *txnReq) processEvent(e gomatrixserverlib.Event) error {
|
||||
refs := e.PrevEvents()
|
||||
prevEventIDs := make([]string, len(refs))
|
||||
for i := range refs {
|
||||
prevEventIDs[i] = refs[i].EventID
|
||||
}
|
||||
prevEventIDs := e.PrevEventIDs()
|
||||
|
||||
// Fetch the state needed to authenticate the event.
|
||||
needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{e})
|
||||
|
|
|
|||
|
|
@ -0,0 +1,342 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package consumers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/common/config"
|
||||
"github.com/matrix-org/dendrite/federationsender/queue"
|
||||
"github.com/matrix-org/dendrite/federationsender/storage"
|
||||
"github.com/matrix-org/dendrite/federationsender/types"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
sarama "gopkg.in/Shopify/sarama.v1"
|
||||
)
|
||||
|
||||
// OutputRoomEvent consumes events that originated in the room server.
|
||||
type OutputRoomEvent struct {
|
||||
roomServerConsumer *common.ContinualConsumer
|
||||
db *storage.Database
|
||||
queues *queue.OutgoingQueues
|
||||
query api.RoomserverQueryAPI
|
||||
}
|
||||
|
||||
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
|
||||
func NewOutputRoomEvent(cfg *config.Dendrite, queues *queue.OutgoingQueues, store *storage.Database) (*OutputRoomEvent, error) {
|
||||
kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
roomServerURL := cfg.RoomServerURL()
|
||||
|
||||
consumer := common.ContinualConsumer{
|
||||
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
|
||||
Consumer: kafkaConsumer,
|
||||
PartitionStore: store,
|
||||
}
|
||||
s := &OutputRoomEvent{
|
||||
roomServerConsumer: &consumer,
|
||||
db: store,
|
||||
queues: queues,
|
||||
query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil),
|
||||
}
|
||||
consumer.ProcessMessage = s.onMessage
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// Start consuming from room servers
|
||||
func (s *OutputRoomEvent) Start() error {
|
||||
return s.roomServerConsumer.Start()
|
||||
}
|
||||
|
||||
// onMessage is called when the federation server receives a new event from the room server output log.
|
||||
// It is unsafe to call this with messages for the same room in multiple gorountines
|
||||
// because updates it will likely fail with a types.EventIDMismatchError when it
|
||||
// realises that it cannot update the room state using the deltas.
|
||||
func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
||||
// Parse out the event JSON
|
||||
var output api.OutputRoomEvent
|
||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||
// If the message was invalid, log it and move on to the next message in the stream
|
||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
||||
return nil
|
||||
}
|
||||
|
||||
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(output.Event, false)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("roomserver output log: event parse failure")
|
||||
return nil
|
||||
}
|
||||
log.WithFields(log.Fields{
|
||||
"event_id": ev.EventID(),
|
||||
"room_id": ev.RoomID(),
|
||||
"send_as_server": output.SendAsServer,
|
||||
}).Info("received event from roomserver")
|
||||
|
||||
if err = s.processMessage(output, ev); err != nil {
|
||||
// panic rather than continue with an inconsistent database
|
||||
log.WithFields(log.Fields{
|
||||
"event": string(ev.JSON()),
|
||||
log.ErrorKey: err,
|
||||
"add": output.AddsStateEventIDs,
|
||||
"del": output.RemovesStateEventIDs,
|
||||
}).Panicf("roomserver output log: write event failure")
|
||||
return nil
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// processMessage updates the list of currently joined hosts in the room
|
||||
// and then sends the event to the hosts that were joined before the event.
|
||||
func (s *OutputRoomEvent) processMessage(ore api.OutputRoomEvent, ev gomatrixserverlib.Event) error {
|
||||
addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ev)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
addsJoinedHosts, err := joinedHostsFromEvents(addsStateEvents)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Update our copy of the current state.
|
||||
// We keep a copy of the current state because the state at each event is
|
||||
// expressed as a delta against the current state.
|
||||
// TODO: handle EventIDMismatchError and recover the current state by talking
|
||||
// to the roomserver
|
||||
oldJoinedHosts, err := s.db.UpdateRoom(
|
||||
ev.RoomID(), ore.LastSentEventID, ev.EventID(),
|
||||
addsJoinedHosts, ore.RemovesStateEventIDs,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if ore.SendAsServer == api.DoNotSendToOtherServers {
|
||||
// Ignore event that we don't need to send anywhere.
|
||||
return nil
|
||||
}
|
||||
|
||||
// Work out which hosts were joined at the event itself.
|
||||
joinedHostsAtEvent, err := s.joinedHostsAtEvent(ore, ev, oldJoinedHosts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Send the event.
|
||||
if err = s.queues.SendEvent(
|
||||
&ev, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent,
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// joinedHostsAtEvent works out a list of matrix servers that were joined to
|
||||
// the room at the event.
|
||||
// It is important to use the state at the event for sending messages because:
|
||||
// 1) We shouldn't send messages to servers that weren't in the room.
|
||||
// 2) If a server is kicked from the rooms it should still be told about the
|
||||
// kick event,
|
||||
// Usually the list can be calculated locally, but sometimes it will need fetch
|
||||
// events from the room server.
|
||||
// Returns an error if there was a problem talking to the room server.
|
||||
func (s *OutputRoomEvent) joinedHostsAtEvent(
|
||||
ore api.OutputRoomEvent, ev gomatrixserverlib.Event, oldJoinedHosts []types.JoinedHost,
|
||||
) ([]gomatrixserverlib.ServerName, error) {
|
||||
// Combine the delta into a single delta so that the adds and removes can
|
||||
// cancel each other out. This should reduce the number of times we need
|
||||
// to fetch a state event from the room server.
|
||||
combinedAdds, combinedRemoves := combineDeltas(
|
||||
ore.AddsStateEventIDs, ore.RemovesStateEventIDs,
|
||||
ore.StateBeforeAddsEventIDs, ore.StateBeforeRemovesEventIDs,
|
||||
)
|
||||
combinedAddsEvents, err := s.lookupStateEvents(combinedAdds, ev)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
combinedAddsJoinedHosts, err := joinedHostsFromEvents(combinedAddsEvents)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
removed := map[string]bool{}
|
||||
for _, eventID := range combinedRemoves {
|
||||
removed[eventID] = true
|
||||
}
|
||||
|
||||
joined := map[gomatrixserverlib.ServerName]bool{}
|
||||
for _, joinedHost := range oldJoinedHosts {
|
||||
if removed[joinedHost.MemberEventID] {
|
||||
// This m.room.member event is part of the current state of the
|
||||
// room, but not part of the state at the event we are processing
|
||||
// Therefore we can't use it to tell whether the server was in
|
||||
// the room at the event.
|
||||
continue
|
||||
}
|
||||
joined[joinedHost.ServerName] = true
|
||||
}
|
||||
|
||||
for _, joinedHost := range combinedAddsJoinedHosts {
|
||||
// This m.room.member event was part of the state of the room at the
|
||||
// event, but isn't part of the current state of the room now.
|
||||
joined[joinedHost.ServerName] = true
|
||||
}
|
||||
|
||||
var result []gomatrixserverlib.ServerName
|
||||
for serverName, include := range joined {
|
||||
if include {
|
||||
result = append(result, serverName)
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// joinedHostsFromEvents turns a list of state events into a list of joined hosts.
|
||||
// This errors if one of the events was invalid.
|
||||
// It should be impossible for an invalid event to get this far in the pipeline.
|
||||
func joinedHostsFromEvents(evs []gomatrixserverlib.Event) ([]types.JoinedHost, error) {
|
||||
var joinedHosts []types.JoinedHost
|
||||
for _, ev := range evs {
|
||||
if ev.Type() != "m.room.member" || ev.StateKey() == nil {
|
||||
continue
|
||||
}
|
||||
membership, err := ev.Membership()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if *membership != "join" {
|
||||
continue
|
||||
}
|
||||
_, serverName, err := gomatrixserverlib.ParseID('@', *ev.StateKey())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
joinedHosts = append(joinedHosts, types.JoinedHost{
|
||||
MemberEventID: ev.EventID(), ServerName: serverName,
|
||||
})
|
||||
}
|
||||
return joinedHosts, nil
|
||||
}
|
||||
|
||||
// combineDeltas combines two deltas into a single delta.
|
||||
// Assumes that the order of operations is add(1), remove(1), add(2), remove(2).
|
||||
// Removes duplicate entries and redundant operations from each delta.
|
||||
func combineDeltas(adds1, removes1, adds2, removes2 []string) (adds, removes []string) {
|
||||
addSet := map[string]bool{}
|
||||
removeSet := map[string]bool{}
|
||||
|
||||
// combine processes each unique value in a list.
|
||||
// If the value is in the removeFrom set then it is removed from that set.
|
||||
// Otherwise it is added to the addTo set.
|
||||
combine := func(values []string, removeFrom, addTo map[string]bool) {
|
||||
processed := map[string]bool{}
|
||||
for _, value := range values {
|
||||
if processed[value] {
|
||||
continue
|
||||
}
|
||||
processed[value] = true
|
||||
if removeFrom[value] {
|
||||
delete(removeFrom, value)
|
||||
} else {
|
||||
addTo[value] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
combine(adds1, nil, addSet)
|
||||
combine(removes1, addSet, removeSet)
|
||||
combine(adds2, removeSet, addSet)
|
||||
combine(removes2, addSet, removeSet)
|
||||
|
||||
for value := range addSet {
|
||||
adds = append(adds, value)
|
||||
}
|
||||
for value := range removeSet {
|
||||
removes = append(removes, value)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// lookupStateEvents looks up the state events that are added by a new event.
|
||||
func (s *OutputRoomEvent) lookupStateEvents(
|
||||
addsStateEventIDs []string, event gomatrixserverlib.Event,
|
||||
) ([]gomatrixserverlib.Event, error) {
|
||||
// Fast path if there aren't any new state events.
|
||||
if len(addsStateEventIDs) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Fast path if the only state event added is the event itself.
|
||||
if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
|
||||
return []gomatrixserverlib.Event{event}, nil
|
||||
}
|
||||
|
||||
missing := addsStateEventIDs
|
||||
var result []gomatrixserverlib.Event
|
||||
|
||||
// Check if event itself is being added.
|
||||
for _, eventID := range missing {
|
||||
if eventID == event.EventID() {
|
||||
result = append(result, event)
|
||||
break
|
||||
}
|
||||
}
|
||||
missing = missingEventsFrom(result, addsStateEventIDs)
|
||||
|
||||
if len(missing) == 0 {
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// At this point the missing events are neither the event itself nor are
|
||||
// they present in our local database. Our only option is to fetch them
|
||||
// from the roomserver using the query API.
|
||||
eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
|
||||
var eventResp api.QueryEventsByIDResponse
|
||||
if err := s.query.QueryEventsByID(&eventReq, &eventResp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result = append(result, eventResp.Events...)
|
||||
missing = missingEventsFrom(result, addsStateEventIDs)
|
||||
|
||||
if len(missing) != 0 {
|
||||
return nil, fmt.Errorf(
|
||||
"missing %d state events IDs at event %q", len(missing), event.EventID(),
|
||||
)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func missingEventsFrom(events []gomatrixserverlib.Event, required []string) []string {
|
||||
have := map[string]bool{}
|
||||
for _, event := range events {
|
||||
have[event.EventID()] = true
|
||||
}
|
||||
var missing []string
|
||||
for _, eventID := range required {
|
||||
if !have[eventID] {
|
||||
missing = append(missing, eventID)
|
||||
}
|
||||
}
|
||||
return missing
|
||||
}
|
||||
|
|
@ -0,0 +1,53 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package consumers
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestCombineNoOp(t *testing.T) {
|
||||
inputAdd1 := []string{"a", "b", "c"}
|
||||
inputDel1 := []string{"a", "b", "d"}
|
||||
inputAdd2 := []string{"a", "d", "e"}
|
||||
inputDel2 := []string{"a", "c", "e", "e"}
|
||||
|
||||
gotAdd, gotDel := combineDeltas(inputAdd1, inputDel1, inputAdd2, inputDel2)
|
||||
|
||||
if len(gotAdd) != 0 {
|
||||
t.Errorf("wanted combined adds to be an empty list, got %#v", gotAdd)
|
||||
}
|
||||
|
||||
if len(gotDel) != 0 {
|
||||
t.Errorf("wanted combined removes to be an empty list, got %#v", gotDel)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCombineDedup(t *testing.T) {
|
||||
inputAdd1 := []string{"a", "a"}
|
||||
inputDel1 := []string{"b", "b"}
|
||||
inputAdd2 := []string{"a", "a"}
|
||||
inputDel2 := []string{"b", "b"}
|
||||
|
||||
gotAdd, gotDel := combineDeltas(inputAdd1, inputDel1, inputAdd2, inputDel2)
|
||||
|
||||
if len(gotAdd) != 1 || gotAdd[0] != "a" {
|
||||
t.Errorf("wanted combined adds to be %#v, got %#v", []string{"a"}, gotAdd)
|
||||
}
|
||||
|
||||
if len(gotDel) != 1 || gotDel[0] != "b" {
|
||||
t.Errorf("wanted combined removes to be %#v, got %#v", []string{"b"}, gotDel)
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,105 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
// destinationQueue is a queue of events for a single destination.
|
||||
// It is responsible for sending the events to the destination and
|
||||
// ensures that only one request is in flight to a given destination
|
||||
// at a time.
|
||||
type destinationQueue struct {
|
||||
client *gomatrixserverlib.FederationClient
|
||||
origin gomatrixserverlib.ServerName
|
||||
destination gomatrixserverlib.ServerName
|
||||
// The running mutex protects running, sentCounter, lastTransactionIDs and
|
||||
// pendingEvents.
|
||||
runningMutex sync.Mutex
|
||||
running bool
|
||||
sentCounter int
|
||||
lastTransactionIDs []gomatrixserverlib.TransactionID
|
||||
pendingEvents []*gomatrixserverlib.Event
|
||||
}
|
||||
|
||||
// Send event adds the event to the pending queue for the destination.
|
||||
// If the queue is empty then it starts a background goroutine to
|
||||
// start sending events to that destination.
|
||||
func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.Event) {
|
||||
oq.runningMutex.Lock()
|
||||
defer oq.runningMutex.Unlock()
|
||||
oq.pendingEvents = append(oq.pendingEvents, ev)
|
||||
if !oq.running {
|
||||
oq.running = true
|
||||
go oq.backgroundSend()
|
||||
}
|
||||
}
|
||||
|
||||
func (oq *destinationQueue) backgroundSend() {
|
||||
for {
|
||||
t := oq.next()
|
||||
if t == nil {
|
||||
// If the queue is empty then stop processing for this destination.
|
||||
// TODO: Remove this destination from the queue map.
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: handle retries.
|
||||
// TODO: blacklist uncooperative servers.
|
||||
|
||||
_, err := oq.client.SendTransaction(*t)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"destination": oq.destination,
|
||||
log.ErrorKey: err,
|
||||
}).Info("problem sending transaction")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// next creates a new transaction from the pending event queue
|
||||
// and flushes the queue.
|
||||
// Returns nil if the queue was empty.
|
||||
func (oq *destinationQueue) next() *gomatrixserverlib.Transaction {
|
||||
oq.runningMutex.Lock()
|
||||
defer oq.runningMutex.Unlock()
|
||||
if len(oq.pendingEvents) == 0 {
|
||||
oq.running = false
|
||||
return nil
|
||||
}
|
||||
var t gomatrixserverlib.Transaction
|
||||
now := gomatrixserverlib.AsTimestamp(time.Now())
|
||||
t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.sentCounter))
|
||||
t.Origin = oq.origin
|
||||
t.Destination = oq.destination
|
||||
t.OriginServerTS = now
|
||||
t.PreviousIDs = oq.lastTransactionIDs
|
||||
if t.PreviousIDs == nil {
|
||||
t.PreviousIDs = []gomatrixserverlib.TransactionID{}
|
||||
}
|
||||
oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID}
|
||||
for _, pdu := range oq.pendingEvents {
|
||||
t.PDUs = append(t.PDUs, *pdu)
|
||||
}
|
||||
oq.pendingEvents = nil
|
||||
oq.sentCounter += len(t.PDUs)
|
||||
return &t
|
||||
}
|
||||
|
|
@ -0,0 +1,95 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
// OutgoingQueues is a collection of queues for sending transactions to other
|
||||
// matrix servers
|
||||
type OutgoingQueues struct {
|
||||
origin gomatrixserverlib.ServerName
|
||||
client *gomatrixserverlib.FederationClient
|
||||
// The queuesMutex protects queues
|
||||
queuesMutex sync.Mutex
|
||||
queues map[gomatrixserverlib.ServerName]*destinationQueue
|
||||
}
|
||||
|
||||
// NewOutgoingQueues makes a new OutgoingQueues
|
||||
func NewOutgoingQueues(origin gomatrixserverlib.ServerName, client *gomatrixserverlib.FederationClient) *OutgoingQueues {
|
||||
return &OutgoingQueues{
|
||||
origin: origin,
|
||||
client: client,
|
||||
queues: map[gomatrixserverlib.ServerName]*destinationQueue{},
|
||||
}
|
||||
}
|
||||
|
||||
// SendEvent sends an event to the destinations
|
||||
func (oqs *OutgoingQueues) SendEvent(
|
||||
ev *gomatrixserverlib.Event, origin gomatrixserverlib.ServerName,
|
||||
destinations []gomatrixserverlib.ServerName,
|
||||
) error {
|
||||
if origin != oqs.origin {
|
||||
// TODO: Support virtual hosting by allowing us to send events using
|
||||
// different origin server names.
|
||||
// For now assume we are always asked to send as the single server configured
|
||||
// in the dendrite config.
|
||||
return fmt.Errorf(
|
||||
"sendevent: unexpected server to send as: got %q expected %q",
|
||||
origin, oqs.origin,
|
||||
)
|
||||
}
|
||||
|
||||
// Remove our own server from the list of destinations.
|
||||
destinations = filterDestinations(oqs.origin, destinations)
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"destinations": destinations, "event": ev.EventID(),
|
||||
}).Info("Sending event")
|
||||
|
||||
oqs.queuesMutex.Lock()
|
||||
defer oqs.queuesMutex.Unlock()
|
||||
for _, destination := range destinations {
|
||||
oq := oqs.queues[destination]
|
||||
if oq == nil {
|
||||
oq = &destinationQueue{
|
||||
origin: oqs.origin,
|
||||
destination: destination,
|
||||
client: oqs.client,
|
||||
}
|
||||
oqs.queues[destination] = oq
|
||||
}
|
||||
oq.sendEvent(ev)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// filterDestinations removes our own server from the list of destinations.
|
||||
// Otherwise we could end up trying to talk to ourselves.
|
||||
func filterDestinations(origin gomatrixserverlib.ServerName, destinations []gomatrixserverlib.ServerName) []gomatrixserverlib.ServerName {
|
||||
var result []gomatrixserverlib.ServerName
|
||||
for _, destination := range destinations {
|
||||
if destination == origin {
|
||||
continue
|
||||
}
|
||||
result = append(result, destination)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
|
@ -0,0 +1,111 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/federationsender/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
const joinedHostsSchema = `
|
||||
-- The joined_hosts table stores a list of m.room.member event ids in the
|
||||
-- current state for each room where the membership is "join".
|
||||
-- There will be an entry for every user that is joined to the room.
|
||||
CREATE TABLE IF NOT EXISTS joined_hosts (
|
||||
-- The string ID of the room.
|
||||
room_id TEXT NOT NULL,
|
||||
-- The event ID of the m.room.member join event.
|
||||
event_id TEXT NOT NULL,
|
||||
-- The domain part of the user ID the m.room.member event is for.
|
||||
server_name TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS joined_hosts_event_id_idx
|
||||
ON joined_hosts (event_id);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS joined_hosts_room_id_idx
|
||||
ON joined_hosts (room_id)
|
||||
`
|
||||
|
||||
const insertJoinedHostsSQL = "" +
|
||||
"INSERT INTO joined_hosts (room_id, event_id, server_name)" +
|
||||
" VALUES ($1, $2, $3)"
|
||||
|
||||
const deleteJoinedHostsSQL = "" +
|
||||
"DELETE FROM joined_hosts WHERE event_id = ANY($1)"
|
||||
|
||||
const selectJoinedHostsSQL = "" +
|
||||
"SELECT event_id, server_name FROM joined_hosts" +
|
||||
" WHERE room_id = $1"
|
||||
|
||||
type joinedHostsStatements struct {
|
||||
insertJoinedHostsStmt *sql.Stmt
|
||||
deleteJoinedHostsStmt *sql.Stmt
|
||||
selectJoinedHostsStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *joinedHostsStatements) prepare(db *sql.DB) (err error) {
|
||||
_, err = db.Exec(joinedHostsSchema)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if s.insertJoinedHostsStmt, err = db.Prepare(insertJoinedHostsSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.deleteJoinedHostsStmt, err = db.Prepare(deleteJoinedHostsSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectJoinedHostsStmt, err = db.Prepare(selectJoinedHostsSQL); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *joinedHostsStatements) insertJoinedHosts(
|
||||
txn *sql.Tx, roomID, eventID string, serverName gomatrixserverlib.ServerName,
|
||||
) error {
|
||||
_, err := txn.Stmt(s.insertJoinedHostsStmt).Exec(roomID, eventID, serverName)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *joinedHostsStatements) deleteJoinedHosts(txn *sql.Tx, eventIDs []string) error {
|
||||
_, err := txn.Stmt(s.deleteJoinedHostsStmt).Exec(pq.StringArray(eventIDs))
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *joinedHostsStatements) selectJoinedHosts(txn *sql.Tx, roomID string,
|
||||
) ([]types.JoinedHost, error) {
|
||||
rows, err := txn.Stmt(s.selectJoinedHostsStmt).Query(roomID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var result []types.JoinedHost
|
||||
for rows.Next() {
|
||||
var eventID, serverName string
|
||||
if err = rows.Scan(&eventID, &serverName); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, types.JoinedHost{
|
||||
MemberEventID: eventID,
|
||||
ServerName: gomatrixserverlib.ServerName(serverName),
|
||||
})
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
|
@ -0,0 +1,89 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
)
|
||||
|
||||
const roomSchema = `
|
||||
CREATE TABLE IF NOT EXISTS rooms (
|
||||
-- The string ID of the room
|
||||
room_id TEXT PRIMARY KEY,
|
||||
-- The most recent event state by the room server.
|
||||
-- We can use this to tell if our view of the room state has become
|
||||
-- desynchronised.
|
||||
last_event_id TEXT NOT NULL
|
||||
);`
|
||||
|
||||
const insertRoomSQL = "" +
|
||||
"INSERT INTO rooms (room_id, last_event_id) VALUES ($1, '')" +
|
||||
" ON CONFLICT DO NOTHING"
|
||||
|
||||
const selectRoomForUpdateSQL = "" +
|
||||
"SELECT last_event_id FROM rooms WHERE room_id = $1 FOR UPDATE"
|
||||
|
||||
const updateRoomSQL = "" +
|
||||
"UPDATE rooms SET last_event_id = $2 WHERE room_id = $1"
|
||||
|
||||
type roomStatements struct {
|
||||
insertRoomStmt *sql.Stmt
|
||||
selectRoomForUpdateStmt *sql.Stmt
|
||||
updateRoomStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *roomStatements) prepare(db *sql.DB) (err error) {
|
||||
_, err = db.Exec(roomSchema)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if s.insertRoomStmt, err = db.Prepare(insertRoomSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectRoomForUpdateStmt, err = db.Prepare(selectRoomForUpdateSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.updateRoomStmt, err = db.Prepare(updateRoomSQL); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// insertRoom inserts the room if it didn't already exist.
|
||||
// If the room didn't exist then last_event_id is set to the empty string.
|
||||
func (s *roomStatements) insertRoom(txn *sql.Tx, roomID string) error {
|
||||
_, err := txn.Stmt(s.insertRoomStmt).Exec(roomID)
|
||||
return err
|
||||
}
|
||||
|
||||
// selectRoomForUpdate locks the row for the room and returns the last_event_id.
|
||||
// The row must already exist in the table. Callers can ensure that the row
|
||||
// exists by calling insertRoom first.
|
||||
func (s *roomStatements) selectRoomForUpdate(txn *sql.Tx, roomID string) (string, error) {
|
||||
var lastEventID string
|
||||
err := txn.Stmt(s.selectRoomForUpdateStmt).QueryRow(roomID).Scan(&lastEventID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return lastEventID, nil
|
||||
}
|
||||
|
||||
// updateRoom updates the last_event_id for the room. selectRoomForUpdate should
|
||||
// have already been called earlier within the transaction.
|
||||
func (s *roomStatements) updateRoom(txn *sql.Tx, roomID, lastEventID string) error {
|
||||
_, err := txn.Stmt(s.updateRoomStmt).Exec(roomID, lastEventID)
|
||||
return err
|
||||
}
|
||||
|
|
@ -0,0 +1,126 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/federationsender/types"
|
||||
)
|
||||
|
||||
// Database stores information needed by the federation sender
|
||||
type Database struct {
|
||||
joinedHostsStatements
|
||||
roomStatements
|
||||
common.PartitionOffsetStatements
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
// NewDatabase opens a new database
|
||||
func NewDatabase(dataSourceName string) (*Database, error) {
|
||||
var result Database
|
||||
var err error
|
||||
if result.db, err = sql.Open("postgres", dataSourceName); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = result.prepare(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (d *Database) prepare() error {
|
||||
var err error
|
||||
|
||||
if err = d.joinedHostsStatements.prepare(d.db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = d.roomStatements.prepare(d.db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = d.PartitionOffsetStatements.Prepare(d.db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// PartitionOffsets implements common.PartitionStorer
|
||||
func (d *Database) PartitionOffsets(topic string) ([]common.PartitionOffset, error) {
|
||||
return d.SelectPartitionOffsets(topic)
|
||||
}
|
||||
|
||||
// SetPartitionOffset implements common.PartitionStorer
|
||||
func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error {
|
||||
return d.UpsertPartitionOffset(topic, partition, offset)
|
||||
}
|
||||
|
||||
// UpdateRoom updates the joined hosts for a room and returns what the joined
|
||||
// hosts were before the update.
|
||||
func (d *Database) UpdateRoom(
|
||||
roomID, oldEventID, newEventID string,
|
||||
addHosts []types.JoinedHost,
|
||||
removeHosts []string,
|
||||
) (joinedHosts []types.JoinedHost, err error) {
|
||||
err = runTransaction(d.db, func(txn *sql.Tx) error {
|
||||
if err = d.insertRoom(txn, roomID); err != nil {
|
||||
return err
|
||||
}
|
||||
lastSentEventID, err := d.selectRoomForUpdate(txn, roomID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if lastSentEventID != oldEventID {
|
||||
return types.EventIDMismatchError{lastSentEventID, oldEventID}
|
||||
}
|
||||
joinedHosts, err = d.selectJoinedHosts(txn, roomID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, add := range addHosts {
|
||||
err = d.insertJoinedHosts(txn, roomID, add.MemberEventID, add.ServerName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err = d.deleteJoinedHosts(txn, removeHosts); err != nil {
|
||||
return err
|
||||
}
|
||||
return d.updateRoom(txn, roomID, newEventID)
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) {
|
||||
txn, err := db.Begin()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
txn.Rollback()
|
||||
panic(r)
|
||||
} else if err != nil {
|
||||
txn.Rollback()
|
||||
} else {
|
||||
err = txn.Commit()
|
||||
}
|
||||
}()
|
||||
err = fn(txn)
|
||||
return
|
||||
}
|
||||
|
|
@ -0,0 +1,45 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package types
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
// A JoinedHost is a server that is joined to a matrix room.
|
||||
type JoinedHost struct {
|
||||
// The MemberEventID of a m.room.member join event.
|
||||
MemberEventID string
|
||||
// The domain part of the state key of the m.room.member join event
|
||||
ServerName gomatrixserverlib.ServerName
|
||||
}
|
||||
|
||||
// A EventIDMismatchError indicates that we have got out of sync with the
|
||||
// room server.
|
||||
type EventIDMismatchError struct {
|
||||
// The event ID we have stored in our local database.
|
||||
DatabaseID string
|
||||
// The event ID received from the room server.
|
||||
RoomServerID string
|
||||
}
|
||||
|
||||
func (e EventIDMismatchError) Error() string {
|
||||
return fmt.Sprintf(
|
||||
"mismatched last sent event ID: had %q in database got %q from room server",
|
||||
e.DatabaseID, e.RoomServerID,
|
||||
)
|
||||
}
|
||||
|
|
@ -20,7 +20,6 @@ import (
|
|||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
|
|
@ -29,6 +28,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/mediaapi/storage"
|
||||
"github.com/matrix-org/dendrite/mediaapi/thumbnailer"
|
||||
"github.com/matrix-org/dendrite/mediaapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
)
|
||||
|
||||
|
|
@ -193,14 +193,7 @@ func (r *uploadRequest) Validate(maxFileSizeBytes config.FileSizeBytes) *util.JS
|
|||
// It should be a struct (with pointers into a single string to avoid copying) and
|
||||
// we should update all refs to use UserID types rather than strings.
|
||||
// https://github.com/matrix-org/synapse/blob/v0.19.2/synapse/types.py#L92
|
||||
if len(r.MediaMetadata.UserID) == 0 || r.MediaMetadata.UserID[0] != '@' {
|
||||
return &util.JSONResponse{
|
||||
Code: 400,
|
||||
JSON: jsonerror.Unknown("user id must start with '@'"),
|
||||
}
|
||||
}
|
||||
parts := strings.SplitN(string(r.MediaMetadata.UserID[1:]), ":", 2)
|
||||
if len(parts) != 2 {
|
||||
if _, _, err := gomatrixserverlib.ParseID('@', string(r.MediaMetadata.UserID)); err != nil {
|
||||
return &util.JSONResponse{
|
||||
Code: 400,
|
||||
JSON: jsonerror.BadJSON("user id must be in the form @localpart:domain"),
|
||||
|
|
|
|||
|
|
@ -16,11 +16,9 @@ package storage
|
|||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
// Import the postgres database driver.
|
||||
_ "github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/clientapi/events"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
|
@ -127,13 +125,15 @@ func (d *SyncServerDatabase) updateRoomState(
|
|||
// ignore non state events
|
||||
continue
|
||||
}
|
||||
var membership *string
|
||||
var (
|
||||
membership *string
|
||||
err error
|
||||
)
|
||||
if event.Type() == "m.room.member" {
|
||||
var memberContent events.MemberContent
|
||||
if err := json.Unmarshal(event.Content(), &memberContent); err != nil {
|
||||
membership, err = event.Membership()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
membership = &memberContent.Membership
|
||||
}
|
||||
if err := d.roomstate.upsertRoomState(txn, event, membership, int64(streamPos)); err != nil {
|
||||
return err
|
||||
|
|
@ -473,11 +473,11 @@ func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.Event) []gom
|
|||
// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned.
|
||||
func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string {
|
||||
if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) {
|
||||
var memberContent events.MemberContent
|
||||
if err := json.Unmarshal(ev.Content(), &memberContent); err != nil {
|
||||
membership, err := ev.Membership()
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
return memberContent.Membership
|
||||
return *membership
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,11 +15,9 @@
|
|||
package sync
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"sync"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/matrix-org/dendrite/clientapi/events"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
|
@ -68,14 +66,14 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosit
|
|||
// If this is an invite, also add in the invitee to this list.
|
||||
if ev.Type() == "m.room.member" && ev.StateKey() != nil {
|
||||
userID := *ev.StateKey()
|
||||
var memberContent events.MemberContent
|
||||
if err := json.Unmarshal(ev.Content(), &memberContent); err != nil {
|
||||
membership, err := ev.Membership()
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("event_id", ev.EventID()).Errorf(
|
||||
"Notifier.OnNewEvent: Failed to unmarshal member event",
|
||||
)
|
||||
} else {
|
||||
// Keep the joined user map up-to-date
|
||||
switch memberContent.Membership {
|
||||
switch *membership {
|
||||
case "invite":
|
||||
userIDs = append(userIDs, userID)
|
||||
case "join":
|
||||
|
|
|
|||
|
|
@ -1,8 +0,0 @@
|
|||
# A list of URIs which host Kafka logs.
|
||||
consumer_uris: ["localhost:9092"]
|
||||
|
||||
# The name of the topic which the sync server will consume events from.
|
||||
roomserver_topic: "roomserverOutput"
|
||||
|
||||
# The database URI to store sync server information.
|
||||
database: "postgres://dendrite:itsasecret@localhost/syncserver?sslmode=disable"
|
||||
|
|
@ -18,8 +18,10 @@ package gomatrixserverlib
|
|||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"golang.org/x/crypto/ed25519"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"golang.org/x/crypto/ed25519"
|
||||
)
|
||||
|
||||
// A StateKeyTuple is the combination of an event type and an event state key.
|
||||
|
|
@ -54,7 +56,7 @@ type EventBuilder struct {
|
|||
Type string `json:"type"`
|
||||
// The state_key of the event if the event is a state event or nil if the event is not a state event.
|
||||
StateKey *string `json:"state_key,omitempty"`
|
||||
// The events that immediately preceeded this event in the room history.
|
||||
// The events that immediately preceded this event in the room history.
|
||||
PrevEvents []EventReference `json:"prev_events"`
|
||||
// The events needed to authenticate this event.
|
||||
AuthEvents []EventReference `json:"auth_events"`
|
||||
|
|
@ -112,7 +114,7 @@ var emptyEventReferenceList = []EventReference{}
|
|||
// Build a new Event.
|
||||
// This is used when a local event is created on this server.
|
||||
// Call this after filling out the necessary fields.
|
||||
// This can be called mutliple times on the same builder.
|
||||
// This can be called multiple times on the same builder.
|
||||
// A different event ID must be supplied each time this is called.
|
||||
func (eb *EventBuilder) Build(eventID string, now time.Time, origin ServerName, keyID KeyID, privateKey ed25519.PrivateKey) (result Event, err error) {
|
||||
var event struct {
|
||||
|
|
@ -467,11 +469,44 @@ func (e Event) PrevEvents() []EventReference {
|
|||
return e.fields.PrevEvents
|
||||
}
|
||||
|
||||
// PrevEventIDs returns the event IDs of the direct ancestors of the event.
|
||||
func (e Event) PrevEventIDs() []string {
|
||||
result := make([]string, len(e.fields.PrevEvents))
|
||||
for i := range e.fields.PrevEvents {
|
||||
result[i] = e.fields.PrevEvents[i].EventID
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Membership returns the value of the content.membership field if this event
|
||||
// is an "m.room.member" event.
|
||||
// Returns an error if the event is not a m.room.member event or if the content
|
||||
// is not valid m.room.member content.
|
||||
func (e Event) Membership() (*string, error) {
|
||||
if e.fields.Type != MRoomMember {
|
||||
return nil, fmt.Errorf("gomatrixserverlib: not an m.room.member event")
|
||||
}
|
||||
var content memberContent
|
||||
if err := json.Unmarshal(e.fields.Content, &content); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &content.Membership, nil
|
||||
}
|
||||
|
||||
// AuthEvents returns references to the events needed to auth the event.
|
||||
func (e Event) AuthEvents() []EventReference {
|
||||
return e.fields.AuthEvents
|
||||
}
|
||||
|
||||
// AuthEventIDs returns the event IDs of the events needed to auth the event.
|
||||
func (e Event) AuthEventIDs() []string {
|
||||
result := make([]string, len(e.fields.PrevEvents))
|
||||
for i := range e.fields.PrevEvents {
|
||||
result[i] = e.fields.PrevEvents[i].EventID
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Redacts returns the event ID of the event this event redacts.
|
||||
func (e Event) Redacts() string {
|
||||
return e.fields.Redacts
|
||||
|
|
@ -534,3 +569,19 @@ func (er EventReference) MarshalJSON() ([]byte, error) {
|
|||
|
||||
return json.Marshal(&tuple)
|
||||
}
|
||||
|
||||
// ParseID splits a matrix ID into a local part and a server name.
|
||||
func ParseID(sigil byte, id string) (local string, domain ServerName, err error) {
|
||||
// IDs have the format: SIGIL LOCALPART ":" DOMAIN
|
||||
// Split on the first ":" character since the domain can contain ":"
|
||||
// characters.
|
||||
if len(id) == 0 || id[0] != sigil {
|
||||
return "", "", fmt.Errorf("gomatriserverlib: invalid ID %q doesn't start with %q", id, sigil)
|
||||
}
|
||||
parts := strings.SplitN(id, ":", 2)
|
||||
if len(parts) != 2 {
|
||||
// The ID must have a ":" character.
|
||||
return "", "", fmt.Errorf("gomatrixserverlib: invalid ID %q missing ':'", id)
|
||||
}
|
||||
return parts[0][1:], ServerName(parts[1]), nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue