From 55aab58eeb069db0c5fb635e5024b9d84fbc09a9 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 3 Jan 2020 10:32:33 +0000 Subject: [PATCH] Implement interfaces for federationsender storage --- federationsender/consumers/roomserver.go | 4 +- federationsender/consumers/typingserver.go | 4 +- .../{ => postgres}/joined_hosts_table.go | 2 +- .../storage/{ => postgres}/room_table.go | 2 +- federationsender/storage/postgres/storage.go | 121 ++++++++++++++++++ federationsender/storage/storage.go | 120 +++-------------- 6 files changed, 142 insertions(+), 111 deletions(-) rename federationsender/storage/{ => postgres}/joined_hosts_table.go (99%) rename federationsender/storage/{ => postgres}/room_table.go (99%) create mode 100644 federationsender/storage/postgres/storage.go diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index 3ba978b1d..4568f44dc 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -33,7 +33,7 @@ import ( // OutputRoomEventConsumer consumes events that originated in the room server. type OutputRoomEventConsumer struct { roomServerConsumer *common.ContinualConsumer - db *storage.Database + db storage.Database queues *queue.OutgoingQueues query api.RoomserverQueryAPI } @@ -43,7 +43,7 @@ func NewOutputRoomEventConsumer( cfg *config.Dendrite, kafkaConsumer sarama.Consumer, queues *queue.OutgoingQueues, - store *storage.Database, + store storage.Database, queryAPI api.RoomserverQueryAPI, ) *OutputRoomEventConsumer { consumer := common.ContinualConsumer{ diff --git a/federationsender/consumers/typingserver.go b/federationsender/consumers/typingserver.go index c4cd0e599..590fcb257 100644 --- a/federationsender/consumers/typingserver.go +++ b/federationsender/consumers/typingserver.go @@ -29,7 +29,7 @@ import ( // OutputTypingEventConsumer consumes events that originate in typing server. type OutputTypingEventConsumer struct { consumer *common.ContinualConsumer - db *storage.Database + db storage.Database queues *queue.OutgoingQueues ServerName gomatrixserverlib.ServerName } @@ -39,7 +39,7 @@ func NewOutputTypingEventConsumer( cfg *config.Dendrite, kafkaConsumer sarama.Consumer, queues *queue.OutgoingQueues, - store *storage.Database, + store storage.Database, ) *OutputTypingEventConsumer { consumer := common.ContinualConsumer{ Topic: string(cfg.Kafka.Topics.OutputTypingEvent), diff --git a/federationsender/storage/joined_hosts_table.go b/federationsender/storage/postgres/joined_hosts_table.go similarity index 99% rename from federationsender/storage/joined_hosts_table.go rename to federationsender/storage/postgres/joined_hosts_table.go index 5d652a1a1..692461fb3 100644 --- a/federationsender/storage/joined_hosts_table.go +++ b/federationsender/storage/postgres/joined_hosts_table.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package storage +package postgres import ( "context" diff --git a/federationsender/storage/room_table.go b/federationsender/storage/postgres/room_table.go similarity index 99% rename from federationsender/storage/room_table.go rename to federationsender/storage/postgres/room_table.go index bb52b7076..e09ba2edd 100644 --- a/federationsender/storage/room_table.go +++ b/federationsender/storage/postgres/room_table.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package storage +package postgres import ( "context" diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go new file mode 100644 index 000000000..6f659e127 --- /dev/null +++ b/federationsender/storage/postgres/storage.go @@ -0,0 +1,121 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/federationsender/types" +) + +// Database stores information needed by the federation sender +type Database struct { + joinedHostsStatements + roomStatements + common.PartitionOffsetStatements + db *sql.DB +} + +// NewDatabase opens a new database +func NewDatabase(dataSourceName string) (*Database, error) { + var result Database + var err error + if result.db, err = sql.Open("postgres", dataSourceName); err != nil { + return nil, err + } + if err = result.prepare(); err != nil { + return nil, err + } + return &result, nil +} + +func (d *Database) prepare() error { + var err error + + if err = d.joinedHostsStatements.prepare(d.db); err != nil { + return err + } + + if err = d.roomStatements.prepare(d.db); err != nil { + return err + } + + return d.PartitionOffsetStatements.Prepare(d.db, "federationsender") +} + +// UpdateRoom updates the joined hosts for a room and returns what the joined +// hosts were before the update, or nil if this was a duplicate message. +// This is called when we receive a message from kafka, so we pass in +// oldEventID and newEventID to check that we haven't missed any messages or +// this isn't a duplicate message. +func (d *Database) UpdateRoom( + ctx context.Context, + roomID, oldEventID, newEventID string, + addHosts []types.JoinedHost, + removeHosts []string, +) (joinedHosts []types.JoinedHost, err error) { + err = common.WithTransaction(d.db, func(txn *sql.Tx) error { + err = d.insertRoom(ctx, txn, roomID) + if err != nil { + return err + } + + lastSentEventID, err := d.selectRoomForUpdate(ctx, txn, roomID) + if err != nil { + return err + } + + if lastSentEventID == newEventID { + // We've handled this message before, so let's just ignore it. + // We can only get a duplicate for the last message we processed, + // so its enough just to compare the newEventID with lastSentEventID + return nil + } + + if lastSentEventID != oldEventID { + return types.EventIDMismatchError{ + DatabaseID: lastSentEventID, RoomServerID: oldEventID, + } + } + + joinedHosts, err = d.selectJoinedHostsWithTx(ctx, txn, roomID) + if err != nil { + return err + } + + for _, add := range addHosts { + err = d.insertJoinedHosts(ctx, txn, roomID, add.MemberEventID, add.ServerName) + if err != nil { + return err + } + } + if err = d.deleteJoinedHosts(ctx, txn, removeHosts); err != nil { + return err + } + return d.updateRoom(ctx, txn, roomID, newEventID) + }) + return +} + +// GetJoinedHosts returns the currently joined hosts for room, +// as known to federationserver. +// Returns an error if something goes wrong. +func (d *Database) GetJoinedHosts( + ctx context.Context, roomID string, +) ([]types.JoinedHost, error) { + return d.selectJoinedHosts(ctx, roomID) +} diff --git a/federationsender/storage/storage.go b/federationsender/storage/storage.go index 3a0f87752..bafb5226a 100644 --- a/federationsender/storage/storage.go +++ b/federationsender/storage/storage.go @@ -1,121 +1,31 @@ -// Copyright 2017 Vector Creations Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package storage import ( "context" - "database/sql" + "errors" + "net/url" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/federationsender/storage/postgres" "github.com/matrix-org/dendrite/federationsender/types" ) -// Database stores information needed by the federation sender -type Database struct { - joinedHostsStatements - roomStatements - common.PartitionOffsetStatements - db *sql.DB +type Database interface { + common.PartitionStorer + UpdateRoom(ctx context.Context, roomID, oldEventID, newEventID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error) + GetJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error) } // NewDatabase opens a new database -func NewDatabase(dataSourceName string) (*Database, error) { - var result Database - var err error - if result.db, err = sql.Open("postgres", dataSourceName); err != nil { +func NewDatabase(dataSourceName string) (Database, error) { + uri, err := url.Parse(dataSourceName) + if err != nil { return nil, err } - if err = result.prepare(); err != nil { - return nil, err + switch uri.Scheme { + case "postgres": + return postgres.NewDatabase(dataSourceName) + default: + return nil, errors.New("unknown schema") } - return &result, nil -} - -func (d *Database) prepare() error { - var err error - - if err = d.joinedHostsStatements.prepare(d.db); err != nil { - return err - } - - if err = d.roomStatements.prepare(d.db); err != nil { - return err - } - - return d.PartitionOffsetStatements.Prepare(d.db, "federationsender") -} - -// UpdateRoom updates the joined hosts for a room and returns what the joined -// hosts were before the update, or nil if this was a duplicate message. -// This is called when we receive a message from kafka, so we pass in -// oldEventID and newEventID to check that we haven't missed any messages or -// this isn't a duplicate message. -func (d *Database) UpdateRoom( - ctx context.Context, - roomID, oldEventID, newEventID string, - addHosts []types.JoinedHost, - removeHosts []string, -) (joinedHosts []types.JoinedHost, err error) { - err = common.WithTransaction(d.db, func(txn *sql.Tx) error { - err = d.insertRoom(ctx, txn, roomID) - if err != nil { - return err - } - - lastSentEventID, err := d.selectRoomForUpdate(ctx, txn, roomID) - if err != nil { - return err - } - - if lastSentEventID == newEventID { - // We've handled this message before, so let's just ignore it. - // We can only get a duplicate for the last message we processed, - // so its enough just to compare the newEventID with lastSentEventID - return nil - } - - if lastSentEventID != oldEventID { - return types.EventIDMismatchError{ - DatabaseID: lastSentEventID, RoomServerID: oldEventID, - } - } - - joinedHosts, err = d.selectJoinedHostsWithTx(ctx, txn, roomID) - if err != nil { - return err - } - - for _, add := range addHosts { - err = d.insertJoinedHosts(ctx, txn, roomID, add.MemberEventID, add.ServerName) - if err != nil { - return err - } - } - if err = d.deleteJoinedHosts(ctx, txn, removeHosts); err != nil { - return err - } - return d.updateRoom(ctx, txn, roomID, newEventID) - }) - return -} - -// GetJoinedHosts returns the currently joined hosts for room, -// as known to federationserver. -// Returns an error if something goes wrong. -func (d *Database) GetJoinedHosts( - ctx context.Context, roomID string, -) ([]types.JoinedHost, error) { - return d.selectJoinedHosts(ctx, roomID) }