From e3bbeae5a25216322662a6a177dc50678a5a48ee Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 5 Mar 2020 14:18:29 +0000 Subject: [PATCH] Remove lib/pq for wasm for syncapi --- syncapi/storage/interface.go | 39 +++++++++++++++++ .../sqlite3/current_room_state_table.go | 9 ++-- .../sqlite3/output_room_events_table.go | 42 ++++++++++++++----- syncapi/storage/sqlite3/syncserver.go | 3 +- syncapi/storage/storage.go | 36 +--------------- syncapi/storage/storage_wasm.go | 38 +++++++++++++++++ 6 files changed, 116 insertions(+), 51 deletions(-) create mode 100644 syncapi/storage/interface.go create mode 100644 syncapi/storage/storage_wasm.go diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go new file mode 100644 index 000000000..8b031b7e7 --- /dev/null +++ b/syncapi/storage/interface.go @@ -0,0 +1,39 @@ +package storage + +import ( + "context" + "time" + + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/dendrite/typingserver/cache" + "github.com/matrix-org/gomatrixserverlib" +) + +type Database interface { + common.PartitionStorer + AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) + Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error) + WriteEvent(context.Context, *gomatrixserverlib.Event, []gomatrixserverlib.Event, []string, []string, *api.TransactionID, bool) (types.StreamPosition, error) + GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.Event, error) + GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) (stateEvents []gomatrixserverlib.Event, err error) + SyncPosition(ctx context.Context) (types.PaginationToken, error) + IncrementalSync(ctx context.Context, device authtypes.Device, fromPos, toPos types.PaginationToken, numRecentEventsPerRoom int, wantFullState bool) (*types.Response, error) + CompleteSync(ctx context.Context, userID string, numRecentEventsPerRoom int) (*types.Response, error) + GetAccountDataInRange(ctx context.Context, userID string, oldPos, newPos types.StreamPosition, accountDataFilterPart *gomatrixserverlib.EventFilter) (map[string][]string, error) + UpsertAccountData(ctx context.Context, userID, roomID, dataType string) (types.StreamPosition, error) + AddInviteEvent(ctx context.Context, inviteEvent gomatrixserverlib.Event) (types.StreamPosition, error) + RetireInviteEvent(ctx context.Context, inviteEventID string) error + SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) + AddTypingUser(userID, roomID string, expireTime *time.Time) types.StreamPosition + RemoveTypingUser(userID, roomID string) types.StreamPosition + GetEventsInRange(ctx context.Context, from, to *types.PaginationToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error) + EventPositionInTopology(ctx context.Context, eventID string) (types.StreamPosition, error) + EventsAtTopologicalPosition(ctx context.Context, roomID string, pos types.StreamPosition) ([]types.StreamEvent, error) + BackwardExtremitiesForRoom(ctx context.Context, roomID string) (backwardExtremities []string, err error) + MaxTopologicalPosition(ctx context.Context, roomID string) (types.StreamPosition, error) + StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.Event + SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) +} diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go index eb969c956..ed76177be 100644 --- a/syncapi/storage/sqlite3/current_room_state_table.go +++ b/syncapi/storage/sqlite3/current_room_state_table.go @@ -21,7 +21,6 @@ import ( "encoding/json" "strings" - "github.com/lib/pq" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" @@ -175,10 +174,10 @@ func (s *currentRoomStateStatements) selectCurrentState( ) ([]gomatrixserverlib.Event, error) { stmt := common.TxStmt(txn, s.selectCurrentStateStmt) rows, err := stmt.QueryContext(ctx, roomID, - pq.StringArray(stateFilterPart.Senders), - pq.StringArray(stateFilterPart.NotSenders), - pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)), - pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.NotTypes)), + nil, // FIXME: pq.StringArray(stateFilterPart.Senders), + nil, // FIXME: pq.StringArray(stateFilterPart.NotSenders), + nil, // FIXME: pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)), + nil, // FIXME: pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.NotTypes)), stateFilterPart.ContainsURL, stateFilterPart.Limit, ) diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 5d5c7b11b..a1cb9d2dc 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -24,7 +24,6 @@ import ( "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" - "github.com/lib/pq" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" @@ -40,8 +39,8 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( type TEXT NOT NULL, sender TEXT NOT NULL, contains_url BOOL NOT NULL, - add_state_ids TEXT[], - remove_state_ids TEXT[], + add_state_ids TEXT, -- JSON encoded string array + remove_state_ids TEXT, -- JSON encoded string array session_id BIGINT, transaction_id TEXT, exclude_from_sync BOOL NOT NULL DEFAULT FALSE @@ -176,20 +175,34 @@ func (s *outputRoomEventsStatements) selectStateInRange( streamPos types.StreamPosition eventBytes []byte excludeFromSync bool - addIDs pq.StringArray - delIDs pq.StringArray + addIDsJSON string + delIDsJSON string ) - if err := rows.Scan(&streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil { + if err := rows.Scan(&streamPos, &eventBytes, &excludeFromSync, &addIDsJSON, &delIDsJSON); err != nil { return nil, nil, err } + + var addIDs []string + var delIDs []string + + if len(addIDsJSON) > 0 { + if err := json.Unmarshal([]byte(addIDsJSON), &addIDs); err != nil { + return nil, nil, err + } + } + if len(delIDsJSON) > 0 { + if err := json.Unmarshal([]byte(delIDsJSON), &delIDs); err != nil { + return nil, nil, err + } + } // Sanity check for deleted state and whine if we see it. We don't need to do anything // since it'll just mark the event as not being needed. if len(addIDs) < len(delIDs) { log.WithFields(log.Fields{ "since": oldPos, "current": newPos, - "adds": addIDs, - "dels": delIDs, + "adds": addIDsJSON, + "dels": delIDsJSON, }).Warn("StateBetween: ignoring deleted state") } @@ -262,6 +275,15 @@ func (s *outputRoomEventsStatements) insertEvent( return } + addStateJSON, err := json.Marshal(addState) + if err != nil { + return + } + removeStateJSON, err := json.Marshal(removeState) + if err != nil { + return + } + insertStmt := common.TxStmt(txn, s.insertEventStmt) _, err = insertStmt.ExecContext( ctx, @@ -272,8 +294,8 @@ func (s *outputRoomEventsStatements) insertEvent( event.Type(), event.Sender(), containsURL, - pq.StringArray(addState), - pq.StringArray(removeState), + string(addStateJSON), + string(removeStateJSON), sessionID, txnID, excludeFromSync, diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 91425db3b..37c292995 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -29,8 +29,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/roomserver/api" - // Import the postgres database driver. - _ "github.com/lib/pq" + // Import the sqlite3 package _ "github.com/mattn/go-sqlite3" "github.com/matrix-org/dendrite/common" diff --git a/syncapi/storage/storage.go b/syncapi/storage/storage.go index c87024b29..c56db0635 100644 --- a/syncapi/storage/storage.go +++ b/syncapi/storage/storage.go @@ -12,49 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +// +build !wasm + package storage import ( - "context" "net/url" - "time" - "github.com/matrix-org/dendrite/clientapi/auth/authtypes" - "github.com/matrix-org/dendrite/common" - "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/storage/postgres" "github.com/matrix-org/dendrite/syncapi/storage/sqlite3" - "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/dendrite/typingserver/cache" - "github.com/matrix-org/gomatrixserverlib" ) -type Database interface { - common.PartitionStorer - AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) - Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error) - WriteEvent(context.Context, *gomatrixserverlib.Event, []gomatrixserverlib.Event, []string, []string, *api.TransactionID, bool) (types.StreamPosition, error) - GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.Event, error) - GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) (stateEvents []gomatrixserverlib.Event, err error) - SyncPosition(ctx context.Context) (types.PaginationToken, error) - IncrementalSync(ctx context.Context, device authtypes.Device, fromPos, toPos types.PaginationToken, numRecentEventsPerRoom int, wantFullState bool) (*types.Response, error) - CompleteSync(ctx context.Context, userID string, numRecentEventsPerRoom int) (*types.Response, error) - GetAccountDataInRange(ctx context.Context, userID string, oldPos, newPos types.StreamPosition, accountDataFilterPart *gomatrixserverlib.EventFilter) (map[string][]string, error) - UpsertAccountData(ctx context.Context, userID, roomID, dataType string) (types.StreamPosition, error) - AddInviteEvent(ctx context.Context, inviteEvent gomatrixserverlib.Event) (types.StreamPosition, error) - RetireInviteEvent(ctx context.Context, inviteEventID string) error - SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) - AddTypingUser(userID, roomID string, expireTime *time.Time) types.StreamPosition - RemoveTypingUser(userID, roomID string) types.StreamPosition - GetEventsInRange(ctx context.Context, from, to *types.PaginationToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error) - EventPositionInTopology(ctx context.Context, eventID string) (types.StreamPosition, error) - EventsAtTopologicalPosition(ctx context.Context, roomID string, pos types.StreamPosition) ([]types.StreamEvent, error) - BackwardExtremitiesForRoom(ctx context.Context, roomID string) (backwardExtremities []string, err error) - MaxTopologicalPosition(ctx context.Context, roomID string) (types.StreamPosition, error) - StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.Event - SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) -} - // NewPublicRoomsServerDatabase opens a database connection. func NewSyncServerDatasource(dataSourceName string) (Database, error) { uri, err := url.Parse(dataSourceName) diff --git a/syncapi/storage/storage_wasm.go b/syncapi/storage/storage_wasm.go new file mode 100644 index 000000000..43806a012 --- /dev/null +++ b/syncapi/storage/storage_wasm.go @@ -0,0 +1,38 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "fmt" + "net/url" + + "github.com/matrix-org/dendrite/syncapi/storage/sqlite3" +) + +// NewPublicRoomsServerDatabase opens a database connection. +func NewSyncServerDatasource(dataSourceName string) (Database, error) { + uri, err := url.Parse(dataSourceName) + if err != nil { + return nil, fmt.Errorf("Cannot use postgres implementation") + } + switch uri.Scheme { + case "postgres": + return nil, fmt.Errorf("Cannot use postgres implementation") + case "file": + return sqlite3.NewSyncServerDatasource(dataSourceName) + default: + return nil, fmt.Errorf("Cannot use postgres implementation") + } +}