From dc9e1e7060690530b721d15c5edf49476e0b679e Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 3 Jan 2020 11:29:51 +0000 Subject: [PATCH] Implement interfaces for syncapi storage --- syncapi/consumers/clientapi.go | 4 +- syncapi/consumers/roomserver.go | 4 +- syncapi/consumers/typingserver.go | 4 +- syncapi/routing/routing.go | 2 +- syncapi/routing/state.go | 4 +- .../{ => postgres}/account_data_table.go | 2 +- .../current_room_state_table.go | 2 +- syncapi/storage/{ => postgres}/filtering.go | 2 +- .../storage/{ => postgres}/invites_table.go | 2 +- .../output_room_events_table.go | 2 +- syncapi/storage/{ => postgres}/syncserver.go | 2 +- syncapi/storage/storage.go | 63 +++++++++++++++++++ syncapi/sync/notifier.go | 2 +- syncapi/sync/requestpool.go | 4 +- 14 files changed, 81 insertions(+), 18 deletions(-) rename syncapi/storage/{ => postgres}/account_data_table.go (99%) rename syncapi/storage/{ => postgres}/current_room_state_table.go (99%) rename syncapi/storage/{ => postgres}/filtering.go (98%) rename syncapi/storage/{ => postgres}/invites_table.go (99%) rename syncapi/storage/{ => postgres}/output_room_events_table.go (99%) rename syncapi/storage/{ => postgres}/syncserver.go (99%) create mode 100644 syncapi/storage/storage.go diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index f0db56427..ed39cd2d0 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -30,7 +30,7 @@ import ( // OutputClientDataConsumer consumes events that originated in the client API server. type OutputClientDataConsumer struct { clientAPIConsumer *common.ContinualConsumer - db *storage.SyncServerDatasource + db storage.Database notifier *sync.Notifier } @@ -39,7 +39,7 @@ func NewOutputClientDataConsumer( cfg *config.Dendrite, kafkaConsumer sarama.Consumer, n *sync.Notifier, - store *storage.SyncServerDatasource, + store storage.Database, ) *OutputClientDataConsumer { consumer := common.ContinualConsumer{ diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index e4f1ab460..cde2f5080 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -33,7 +33,7 @@ import ( // OutputRoomEventConsumer consumes events that originated in the room server. type OutputRoomEventConsumer struct { roomServerConsumer *common.ContinualConsumer - db *storage.SyncServerDatasource + db storage.Database notifier *sync.Notifier query api.RoomserverQueryAPI } @@ -43,7 +43,7 @@ func NewOutputRoomEventConsumer( cfg *config.Dendrite, kafkaConsumer sarama.Consumer, n *sync.Notifier, - store *storage.SyncServerDatasource, + store storage.Database, queryAPI api.RoomserverQueryAPI, ) *OutputRoomEventConsumer { diff --git a/syncapi/consumers/typingserver.go b/syncapi/consumers/typingserver.go index 5d998a18a..392f79872 100644 --- a/syncapi/consumers/typingserver.go +++ b/syncapi/consumers/typingserver.go @@ -30,7 +30,7 @@ import ( // OutputTypingEventConsumer consumes events that originated in the typing server. type OutputTypingEventConsumer struct { typingConsumer *common.ContinualConsumer - db *storage.SyncServerDatasource + db storage.Database notifier *sync.Notifier } @@ -40,7 +40,7 @@ func NewOutputTypingEventConsumer( cfg *config.Dendrite, kafkaConsumer sarama.Consumer, n *sync.Notifier, - store *storage.SyncServerDatasource, + store storage.Database, ) *OutputTypingEventConsumer { consumer := common.ContinualConsumer{ diff --git a/syncapi/routing/routing.go b/syncapi/routing/routing.go index 0f5019fc3..bd9389bdd 100644 --- a/syncapi/routing/routing.go +++ b/syncapi/routing/routing.go @@ -34,7 +34,7 @@ const pathPrefixR0 = "/_matrix/client/r0" // Due to Setup being used to call many other functions, a gocyclo nolint is // applied: // nolint: gocyclo -func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB *storage.SyncServerDatasource, deviceDB *devices.Database) { +func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB storage.Database, deviceDB *devices.Database) { r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter() authData := auth.Data{ diff --git a/syncapi/routing/state.go b/syncapi/routing/state.go index 87a93d194..61eaf218a 100644 --- a/syncapi/routing/state.go +++ b/syncapi/routing/state.go @@ -40,7 +40,7 @@ type stateEventInStateResp struct { // TODO: Check if the user is in the room. If not, check if the room's history // is publicly visible. Current behaviour is returning an empty array if the // user cannot see the room's history. -func OnIncomingStateRequest(req *http.Request, db *storage.SyncServerDatasource, roomID string) util.JSONResponse { +func OnIncomingStateRequest(req *http.Request, db storage.Database, roomID string) util.JSONResponse { // TODO(#287): Auth request and handle the case where the user has left (where // we should return the state at the poin they left) @@ -87,7 +87,7 @@ func OnIncomingStateRequest(req *http.Request, db *storage.SyncServerDatasource, // /rooms/{roomID}/state/{type}/{statekey} request. It will look in current // state to see if there is an event with that type and state key, if there // is then (by default) we return the content, otherwise a 404. -func OnIncomingStateTypeRequest(req *http.Request, db *storage.SyncServerDatasource, roomID string, evType, stateKey string) util.JSONResponse { +func OnIncomingStateTypeRequest(req *http.Request, db storage.Database, roomID string, evType, stateKey string) util.JSONResponse { // TODO(#287): Auth request and handle the case where the user has left (where // we should return the state at the poin they left) diff --git a/syncapi/storage/account_data_table.go b/syncapi/storage/postgres/account_data_table.go similarity index 99% rename from syncapi/storage/account_data_table.go rename to syncapi/storage/postgres/account_data_table.go index 7b4803e3d..c8ce35fdf 100644 --- a/syncapi/storage/account_data_table.go +++ b/syncapi/storage/postgres/account_data_table.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package storage +package postgres import ( "context" diff --git a/syncapi/storage/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go similarity index 99% rename from syncapi/storage/current_room_state_table.go rename to syncapi/storage/postgres/current_room_state_table.go index 1ab70879a..61baa85f3 100644 --- a/syncapi/storage/current_room_state_table.go +++ b/syncapi/storage/postgres/current_room_state_table.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package storage +package postgres import ( "context" diff --git a/syncapi/storage/filtering.go b/syncapi/storage/postgres/filtering.go similarity index 98% rename from syncapi/storage/filtering.go rename to syncapi/storage/postgres/filtering.go index 27b0b888a..dcc421362 100644 --- a/syncapi/storage/filtering.go +++ b/syncapi/storage/postgres/filtering.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package storage +package postgres import ( "strings" diff --git a/syncapi/storage/invites_table.go b/syncapi/storage/postgres/invites_table.go similarity index 99% rename from syncapi/storage/invites_table.go rename to syncapi/storage/postgres/invites_table.go index 9f52087f6..30193b911 100644 --- a/syncapi/storage/invites_table.go +++ b/syncapi/storage/postgres/invites_table.go @@ -1,4 +1,4 @@ -package storage +package postgres import ( "context" diff --git a/syncapi/storage/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go similarity index 99% rename from syncapi/storage/output_room_events_table.go rename to syncapi/storage/postgres/output_room_events_table.go index e1803a17d..0cf9d9390 100644 --- a/syncapi/storage/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package storage +package postgres import ( "context" diff --git a/syncapi/storage/syncserver.go b/syncapi/storage/postgres/syncserver.go similarity index 99% rename from syncapi/storage/syncserver.go rename to syncapi/storage/postgres/syncserver.go index cda44d2e3..3b6f6ba76 100644 --- a/syncapi/storage/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package storage +package postgres import ( "context" diff --git a/syncapi/storage/storage.go b/syncapi/storage/storage.go new file mode 100644 index 000000000..38606f0f3 --- /dev/null +++ b/syncapi/storage/storage.go @@ -0,0 +1,63 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "errors" + "net/url" + "time" + + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/syncapi/storage/postgres" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/dendrite/typingserver/cache" + "github.com/matrix-org/gomatrixserverlib" +) + +type Database interface { + common.PartitionStorer + AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) + Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error) + WriteEvent(ctx context.Context, ev *gomatrixserverlib.Event, addStateEvents []gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string, transactionID *api.TransactionID) (pduPosition int64, returnErr error) + GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.Event, error) + GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.FilterPart) (stateEvents []gomatrixserverlib.Event, err error) + SyncPosition(ctx context.Context) (types.SyncPosition, error) + IncrementalSync(ctx context.Context, device authtypes.Device, fromPos, toPos types.SyncPosition, numRecentEventsPerRoom int, wantFullState bool) (*types.Response, error) + CompleteSync(ctx context.Context, userID string, numRecentEventsPerRoom int) (*types.Response, error) + GetAccountDataInRange(ctx context.Context, userID string, oldPos, newPos int64, accountDataFilterPart *gomatrixserverlib.FilterPart) (map[string][]string, error) + UpsertAccountData(ctx context.Context, userID, roomID, dataType string) (int64, error) + AddInviteEvent(ctx context.Context, inviteEvent gomatrixserverlib.Event) (int64, error) + RetireInviteEvent(ctx context.Context, inviteEventID string) error + SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) + AddTypingUser(userID, roomID string, expireTime *time.Time) int64 + RemoveTypingUser(userID, roomID string) int64 +} + +// NewPublicRoomsServerDatabase opens a database connection. +func NewSyncServerDatasource(dataSourceName string) (Database, error) { + uri, err := url.Parse(dataSourceName) + if err != nil { + return nil, err + } + switch uri.Scheme { + case "postgres": + return postgres.NewSyncServerDatasource(dataSourceName) + default: + return nil, errors.New("unknown schema") + } +} diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go index 15d6b070c..548a17acd 100644 --- a/syncapi/sync/notifier.go +++ b/syncapi/sync/notifier.go @@ -141,7 +141,7 @@ func (n *Notifier) GetListener(req syncRequest) UserStreamListener { } // Load the membership states required to notify users correctly. -func (n *Notifier) Load(ctx context.Context, db *storage.SyncServerDatasource) error { +func (n *Notifier) Load(ctx context.Context, db storage.Database) error { roomToUsers, err := db.AllJoinedUsersInRooms(ctx) if err != nil { return err diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 94a369001..82505e681 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -31,13 +31,13 @@ import ( // RequestPool manages HTTP long-poll connections for /sync type RequestPool struct { - db *storage.SyncServerDatasource + db storage.Database accountDB *accounts.Database notifier *Notifier } // NewRequestPool makes a new RequestPool -func NewRequestPool(db *storage.SyncServerDatasource, n *Notifier, adb *accounts.Database) *RequestPool { +func NewRequestPool(db storage.Database, n *Notifier, adb *accounts.Database) *RequestPool { return &RequestPool{db, adb, n} }