From 151d52ce23678cbfe746fc365e9dda539e3b9c19 Mon Sep 17 00:00:00 2001 From: Piotr Kozimor Date: Wed, 17 Nov 2021 17:00:36 +0100 Subject: [PATCH] Add Pushserver component with Pushers API Co-authored-by: Tommie Gannert Co-authored-by: Dan Peleg --- go.mod | 1 + go.sum | 2 + pushserver/api/api.go | 53 ++++++++ pushserver/internal/api.go | 73 ++++++++++ pushserver/internal/api_test.go | 143 ++++++++++++++++++++ pushserver/inthttp/client.go | 70 ++++++++++ pushserver/inthttp/server.go | 44 ++++++ pushserver/pushserver.go | 38 ++++++ pushserver/storage/interface.go | 16 +++ pushserver/storage/postgres/storage.go | 62 +++++++++ pushserver/storage/shared/pusher_table.go | 157 ++++++++++++++++++++++ pushserver/storage/shared/storage.go | 79 +++++++++++ pushserver/storage/sqlite3/storage.go | 57 ++++++++ pushserver/storage/storage.go | 24 ++++ pushserver/storage/storage_test.go | 133 ++++++++++++++++++ pushserver/storage/storage_wasm.go | 20 +++ pushserver/storage/tables/interface.go | 25 ++++ 17 files changed, 997 insertions(+) create mode 100644 pushserver/api/api.go create mode 100644 pushserver/internal/api.go create mode 100644 pushserver/internal/api_test.go create mode 100644 pushserver/inthttp/client.go create mode 100644 pushserver/inthttp/server.go create mode 100644 pushserver/pushserver.go create mode 100644 pushserver/storage/interface.go create mode 100644 pushserver/storage/postgres/storage.go create mode 100644 pushserver/storage/shared/pusher_table.go create mode 100644 pushserver/storage/shared/storage.go create mode 100644 pushserver/storage/sqlite3/storage.go create mode 100644 pushserver/storage/storage.go create mode 100644 pushserver/storage/storage_test.go create mode 100644 pushserver/storage/storage_wasm.go create mode 100644 pushserver/storage/tables/interface.go diff --git a/go.mod b/go.mod index 9b970c334..0a15e04f1 100644 --- a/go.mod +++ b/go.mod @@ -35,6 +35,7 @@ require ( github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0 github.com/matrix-org/pinecone v0.0.0-20211116111603-febf3501584d github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 + github.com/matryer/is v1.4.0 // indirect github.com/mattn/go-sqlite3 v1.14.8 github.com/morikuni/aec v1.0.0 // indirect github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9 diff --git a/go.sum b/go.sum index 2c19e76f0..9f6f930d2 100644 --- a/go.sum +++ b/go.sum @@ -1002,6 +1002,8 @@ github.com/matrix-org/pinecone v0.0.0-20211116111603-febf3501584d/go.mod h1:r6ds github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= +github.com/matryer/is v1.4.0 h1:sosSmIWwkYITGrxZ25ULNDeKiMNzFSr4V/eqBQP0PeE= +github.com/matryer/is v1.4.0/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU= github.com/mattn/go-colorable v0.0.6/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= diff --git a/pushserver/api/api.go b/pushserver/api/api.go new file mode 100644 index 000000000..b4aa8235d --- /dev/null +++ b/pushserver/api/api.go @@ -0,0 +1,53 @@ +package api + +import ( + "context" + + "github.com/matrix-org/gomatrixserverlib" +) + +type PushserverInternalAPI interface { + PerformPusherSet(ctx context.Context, req *PerformPusherSetRequest, res *struct{}) error + PerformPusherDeletion(ctx context.Context, req *PerformPusherDeletionRequest, res *struct{}) error + QueryPushers(ctx context.Context, req *QueryPushersRequest, res *QueryPushersResponse) error +} + +type QueryPushersRequest struct { + Localpart string +} + +type QueryPushersResponse struct { + Pushers []Pusher `json:"pushers"` +} + +type PerformPusherSetRequest struct { + Pusher // Anonymous field because that's how clientapi unmarshals it. + Localpart string + Append bool `json:"append"` +} + +type PerformPusherDeletionRequest struct { + Localpart string + SessionID int64 +} + +// Pusher represents a push notification subscriber +type Pusher struct { + SessionID int64 `json:"session_id,omitempty"` + PushKey string `json:"pushkey"` + PushKeyTS gomatrixserverlib.Timestamp `json:"pushkey_ts,omitempty"` + Kind PusherKind `json:"kind"` + AppID string `json:"app_id"` + AppDisplayName string `json:"app_display_name"` + DeviceDisplayName string `json:"device_display_name"` + ProfileTag string `json:"profile_tag"` + Language string `json:"lang"` + Data map[string]interface{} `json:"data"` +} + +type PusherKind string + +const ( + EmailKind PusherKind = "email" + HTTPKind PusherKind = "http" +) diff --git a/pushserver/internal/api.go b/pushserver/internal/api.go new file mode 100644 index 000000000..183d2a54e --- /dev/null +++ b/pushserver/internal/api.go @@ -0,0 +1,73 @@ +package internal + +import ( + "context" + "time" + + "github.com/matrix-org/dendrite/pushserver/api" + "github.com/matrix-org/dendrite/pushserver/storage" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "github.com/sirupsen/logrus" +) + +// PushserverInternalAPI implements api.PushserverInternalAPI +type PushserverInternalAPI struct { + Cfg *config.PushServer + DB storage.Database +} + +func NewPushserverAPI( + cfg *config.PushServer, pushserverDB storage.Database, +) *PushserverInternalAPI { + a := &PushserverInternalAPI{ + Cfg: cfg, + DB: pushserverDB, + } + return a +} + +func (a *PushserverInternalAPI) PerformPusherSet(ctx context.Context, req *api.PerformPusherSetRequest, res *struct{}) error { + util.GetLogger(ctx).WithFields(logrus.Fields{ + "localpart": req.Localpart, + "pushkey": req.Pusher.PushKey, + "display_name": req.Pusher.AppDisplayName, + }).Info("PerformPusherCreation") + if !req.Append { + err := a.DB.RemovePushers(ctx, req.Pusher.AppID, req.Pusher.PushKey) + if err != nil { + return err + } + } + if req.Pusher.Kind == "" { + return a.DB.RemovePusher(ctx, req.Pusher.AppID, req.Pusher.PushKey, req.Localpart) + } + if req.Pusher.PushKeyTS == 0 { + req.Pusher.PushKeyTS = gomatrixserverlib.AsTimestamp(time.Now()) + } + return a.DB.CreatePusher(ctx, req.Pusher, req.Localpart) +} + +func (a *PushserverInternalAPI) PerformPusherDeletion(ctx context.Context, req *api.PerformPusherDeletionRequest, res *struct{}) error { + pushers, err := a.DB.GetPushers(ctx, req.Localpart) + if err != nil { + return err + } + for i := range pushers { + logrus.Warnf("pusher session: %d, req session: %d", pushers[i].SessionID, req.SessionID) + if pushers[i].SessionID != req.SessionID { + err := a.DB.RemovePusher(ctx, pushers[i].AppID, pushers[i].PushKey, req.Localpart) + if err != nil { + return err + } + } + } + return nil +} + +func (a *PushserverInternalAPI) QueryPushers(ctx context.Context, req *api.QueryPushersRequest, res *api.QueryPushersResponse) error { + var err error + res.Pushers, err = a.DB.GetPushers(ctx, req.Localpart) + return err +} diff --git a/pushserver/internal/api_test.go b/pushserver/internal/api_test.go new file mode 100644 index 000000000..aedb40905 --- /dev/null +++ b/pushserver/internal/api_test.go @@ -0,0 +1,143 @@ +package internal + +import ( + "context" + "math/rand" + "os" + "strconv" + "testing" + + "github.com/matrix-org/dendrite/pushserver/api" + "github.com/matrix-org/dendrite/pushserver/storage" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matryer/is" +) + +var ( + ctx = context.Background() + localpart = "foo" + testPusher = api.Pusher{ + SessionID: 42984798792, + PushKey: "dc_GxbDa8El0pWKkDIM-rQ:APA91bHflmL6ycJMbLKX8VYLD-Ebft3t-SLQwIap-pDWP-evu1AWxsXxzyl1pgSZxDMn6OeznZsjXhTU0m5xz05dyJ4syX86S89uwxBwtbK-k0PHQt9wF8CgOcibm-OYZodpY5TtmknZ", + Kind: "http", + AppID: "com.example.app.ios", + AppDisplayName: "Mat Rix", + DeviceDisplayName: "iPhone 9", + ProfileTag: "xxyyzz", + Language: "pl", + Data: map[string]interface{}{ + "format": "event_id_only", + "url": "https://push-gateway.location.there/_matrix/push/v1/notify", + }, + } + testPusher2 = api.Pusher{ + SessionID: 42984798792, + PushKey: "dc_GxbDa8El0pWKkDIM-rQ:APA91bHflmL6ycJMbLKX8VYLD-Ebft3t-SLQwIap-pDWP-evu1AWxsXxzyl1pgSZxDMn6OeznZsjXhTU0m5xz05dyJ4syX86S89uwxBwtbK-k0PHQt9wF8CgOcibm-OYZodpY5TtmknZ---", + Kind: "http", + AppID: "com.example.app.ios", + AppDisplayName: "Mat Rix", + DeviceDisplayName: "iPhone 9", + ProfileTag: "xxyyzz", + Language: "pl", + Data: map[string]interface{}{ + "format": "event_id_only", + "url": "https://push-gateway.location.there/_matrix/push/v1/notify", + }, + } + nilPusher = api.Pusher{ + PushKey: "dc_GxbDa8El0pWKkDIM-rQ:APA91bHflmL6ycJMbLKX8VYLD-Ebft3t-SLQwIap-pDWP-evu1AWxsXxzyl1pgSZxDMn6OeznZsjXhTU0m5xz05dyJ4syX86S89uwxBwtbK-k0PHQt9wF8CgOcibm-OYZodpY5TtmknZ", + AppID: "com.example.app.ios", + } +) + +func TestPerformPusherSet(t *testing.T) { + is := is.New(t) + dut := mustNewPushserverAPI(is) + pushers := mustSetPushers(is, dut, testPusher) + is.Equal(len(pushers.Pushers), 1) + pushKeyTS := pushers.Pushers[0].PushKeyTS + is.True(pushKeyTS != 0) + pushers.Pushers[0].PushKeyTS = 0 + is.Equal(pushers.Pushers[0], testPusher) + pushers.Pushers[0].PushKeyTS = pushKeyTS + +} + +func TestPerformPusherSet_Append(t *testing.T) { + is := is.New(t) + dut := mustNewPushserverAPI(is) + mustSetPushers(is, dut, testPusher) + pushers := mustAppendPushers(is, dut, testPusher2) + is.Equal(len(pushers.Pushers), 2) + is.True(pushers.Pushers[1].PushKeyTS != 0) + pushers.Pushers[1].PushKeyTS = 0 + is.Equal(pushers.Pushers[1], testPusher2) +} + +func TestPerformPusherSet_Delete(t *testing.T) { + is := is.New(t) + dut := mustNewPushserverAPI(is) + mustSetPushers(is, dut, testPusher) + pushers := mustSetPushers(is, dut, nilPusher) + // pushers := mustAppendPushers(is, dut, testPusher2) + is.Equal(len(pushers.Pushers), 0) +} + +func TestPerformPusherSet_AppendDelete(t *testing.T) { + is := is.New(t) + dut := mustNewPushserverAPI(is) + mustSetPushers(is, dut, testPusher) + mustAppendPushers(is, dut, testPusher2) + pushers := mustAppendPushers(is, dut, nilPusher) + is.Equal(len(pushers.Pushers), 1) + is.True(pushers.Pushers[0].PushKeyTS != 0) + pushers.Pushers[0].PushKeyTS = 0 + is.Equal(pushers.Pushers[0], testPusher2) +} + +func mustNewPushserverAPI(is *is.I) api.PushserverInternalAPI { + db := mustNewDatabase(is) + return &PushserverInternalAPI{ + DB: db, + } +} + +func mustNewDatabase(is *is.I) storage.Database { + randPostfix := strconv.Itoa(rand.Int()) + dbPath := os.TempDir() + "/dendrite-" + randPostfix + dut, err := storage.Open(&config.DatabaseOptions{ + ConnectionString: config.DataSource("file:" + dbPath), + }) + is.NoErr(err) + return dut +} + +func mustSetPushers(is *is.I, dut api.PushserverInternalAPI, p api.Pusher) *api.QueryPushersResponse { + err := dut.PerformPusherSet(ctx, &api.PerformPusherSetRequest{ + Localpart: localpart, + Append: false, + Pusher: p, + }, &struct{}{}) + is.NoErr(err) + var pushers api.QueryPushersResponse + err = dut.QueryPushers(ctx, &api.QueryPushersRequest{ + Localpart: localpart, + }, &pushers) + is.NoErr(err) + return &pushers +} + +func mustAppendPushers(is *is.I, dut api.PushserverInternalAPI, p api.Pusher) *api.QueryPushersResponse { + err := dut.PerformPusherSet(ctx, &api.PerformPusherSetRequest{ + Localpart: localpart, + Append: true, + Pusher: p, + }, &struct{}{}) + is.NoErr(err) + var pushers api.QueryPushersResponse + err = dut.QueryPushers(ctx, &api.QueryPushersRequest{ + Localpart: localpart, + }, &pushers) + is.NoErr(err) + return &pushers +} diff --git a/pushserver/inthttp/client.go b/pushserver/inthttp/client.go new file mode 100644 index 000000000..47e05b100 --- /dev/null +++ b/pushserver/inthttp/client.go @@ -0,0 +1,70 @@ +package inthttp + +import ( + "context" + "errors" + "net/http" + + "github.com/matrix-org/dendrite/internal/httputil" + "github.com/matrix-org/dendrite/pushserver/api" + "github.com/opentracing/opentracing-go" +) + +type httpPushserverInternalAPI struct { + pushserverURL string + httpClient *http.Client +} + +const ( + QueryNotificationsPath = "/pushserver/queryNotifications" + + PerformPusherSetPath = "/pushserver/performPusherSet" + PerformPusherDeletionPath = "/pushserver/performPusherDeletion" + QueryPushersPath = "/pushserver/queryPushers" + + PerformPushRulesPutPath = "/pushserver/performPushRulesPut" + QueryPushRulesPath = "/pushserver/queryPushRules" +) + +// NewPushserverClient creates a PushserverInternalAPI implemented by talking to a HTTP POST API. +// If httpClient is nil an error is returned +func NewPushserverClient( + pushserverURL string, + httpClient *http.Client, +) (api.PushserverInternalAPI, error) { + if httpClient == nil { + return nil, errors.New("NewPushserverClient: httpClient is ") + } + return &httpPushserverInternalAPI{ + pushserverURL: pushserverURL, + httpClient: httpClient, + }, nil +} + +func (h *httpPushserverInternalAPI) PerformPusherSet( + ctx context.Context, + request *api.PerformPusherSetRequest, + response *struct{}, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "PerformPusherSet") + defer span.Finish() + + apiURL := h.pushserverURL + PerformPusherSetPath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} + +func (h *httpPushserverInternalAPI) PerformPusherDeletion(ctx context.Context, req *api.PerformPusherDeletionRequest, res *struct{}) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "PerformPusherDeletion") + defer span.Finish() + + apiURL := h.pushserverURL + PerformPusherDeletionPath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res) +} + +func (h *httpPushserverInternalAPI) QueryPushers(ctx context.Context, req *api.QueryPushersRequest, res *api.QueryPushersResponse) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "QueryPushers") + defer span.Finish() + + apiURL := h.pushserverURL + QueryPushersPath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res) +} diff --git a/pushserver/inthttp/server.go b/pushserver/inthttp/server.go new file mode 100644 index 000000000..876d71914 --- /dev/null +++ b/pushserver/inthttp/server.go @@ -0,0 +1,44 @@ +package inthttp + +import ( + "encoding/json" + "net/http" + + "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/internal/httputil" + "github.com/matrix-org/dendrite/pushserver/api" + "github.com/matrix-org/util" +) + +// AddRoutes adds the PushserverInternalAPI handlers to the http.ServeMux. +// nolint: gocyclo +func AddRoutes(r api.PushserverInternalAPI, internalAPIMux *mux.Router) { + + internalAPIMux.Handle(PerformPusherSetPath, + httputil.MakeInternalAPI("performPusherSet", func(req *http.Request) util.JSONResponse { + request := api.PerformPusherSetRequest{} + response := struct{}{} + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := r.PerformPusherSet(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) + internalAPIMux.Handle(PerformPusherDeletionPath, + httputil.MakeInternalAPI("performPusherDeletion", func(req *http.Request) util.JSONResponse { + request := api.PerformPusherDeletionRequest{} + response := struct{}{} + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := r.PerformPusherDeletion(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) + +} diff --git a/pushserver/pushserver.go b/pushserver/pushserver.go new file mode 100644 index 000000000..9dc68d5d3 --- /dev/null +++ b/pushserver/pushserver.go @@ -0,0 +1,38 @@ +package pushserver + +import ( + "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/pushserver/api" + "github.com/matrix-org/dendrite/pushserver/internal" + "github.com/matrix-org/dendrite/pushserver/inthttp" + "github.com/matrix-org/dendrite/pushserver/storage" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup" + "github.com/sirupsen/logrus" +) + +// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions +// on the given input API. +func AddInternalRoutes(router *mux.Router, intAPI api.PushserverInternalAPI) { + inthttp.AddRoutes(intAPI, router) +} + +// NewInternalAPI returns a concerete implementation of the internal API. Callers +// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. +func NewInternalAPI( + base *setup.BaseDendrite, + rsAPI roomserverAPI.RoomserverInternalAPI, +) api.PushserverInternalAPI { + cfg := &base.Cfg.PushServer + + db, err := storage.Open(&cfg.Database) + if err != nil { + logrus.WithError(err).Panicf("failed to connect to push server db") + } + + psAPI := internal.NewPushserverAPI( + cfg, db, + ) + + return psAPI +} diff --git a/pushserver/storage/interface.go b/pushserver/storage/interface.go new file mode 100644 index 000000000..dffebf4f1 --- /dev/null +++ b/pushserver/storage/interface.go @@ -0,0 +1,16 @@ +package storage + +import ( + "context" + + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/pushserver/api" +) + +type Database interface { + internal.PartitionStorer + CreatePusher(ctx context.Context, pusher api.Pusher, localpart string) error + GetPushers(ctx context.Context, localpart string) ([]api.Pusher, error) + RemovePusher(ctx context.Context, appId, pushkey, localpart string) error + RemovePushers(ctx context.Context, appId, pushkey string) error +} diff --git a/pushserver/storage/postgres/storage.go b/pushserver/storage/postgres/storage.go new file mode 100644 index 000000000..40c99bfb5 --- /dev/null +++ b/pushserver/storage/postgres/storage.go @@ -0,0 +1,62 @@ +package postgres + +import ( + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/pushserver/storage/shared" + "github.com/matrix-org/dendrite/setup/config" +) + +type Database struct { + shared.Database + sqlutil.PartitionOffsetStatements +} + +func Open(dbProperties *config.DatabaseOptions) (*Database, error) { + var d Database + var err error + if d.DB, err = sqlutil.Open(dbProperties); err != nil { + return nil, fmt.Errorf("sqlutil.Open: %w", err) + } + d.Writer = sqlutil.NewDummyWriter() + + if err = d.PartitionOffsetStatements.Prepare(d.DB, d.Writer, "pushserver"); err != nil { + return nil, err + } + + if err = createNotificationsTable(d.DB); err != nil { + return nil, err + } + if err = shared.CreatePushersTable(d.DB); err != nil { + return nil, err + } + if err = d.Database.Prepare(); err != nil { + return nil, err + } + + return &d, nil +} + +func createNotificationsTable(db *sql.DB) error { + _, err := db.Exec(notificationsSchema) + return err +} + +const notificationsSchema = ` +CREATE TABLE IF NOT EXISTS pushserver_notifications ( + id BIGSERIAL PRIMARY KEY, + localpart TEXT NOT NULL, + room_id TEXT NOT NULL, + event_id TEXT NOT NULL, + ts_ms BIGINT NOT NULL, + highlight BOOLEAN NOT NULL, + notification_json TEXT NOT NULL, + read BOOLEAN NOT NULL DEFAULT FALSE +); + +CREATE INDEX IF NOT EXISTS notification_localpart_room_id_event_id_idx ON pushserver_notifications(localpart, room_id, event_id); +CREATE INDEX IF NOT EXISTS notification_localpart_room_id_id_idx ON pushserver_notifications(localpart, room_id, id); +CREATE INDEX IF NOT EXISTS notification_localpart_id_idx ON pushserver_notifications(localpart, id); +` diff --git a/pushserver/storage/shared/pusher_table.go b/pushserver/storage/shared/pusher_table.go new file mode 100644 index 000000000..747e03bac --- /dev/null +++ b/pushserver/storage/shared/pusher_table.go @@ -0,0 +1,157 @@ +// Copyright 2021 Dan Peleg +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package shared + +import ( + "context" + "database/sql" + "encoding/json" + + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/pushserver/api" + "github.com/matrix-org/dendrite/pushserver/storage/tables" + "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" +) + +// See https://matrix.org/docs/spec/client_server/r0.6.1#get-matrix-client-r0-pushers +const pushersSchema = ` +CREATE TABLE IF NOT EXISTS pushserver_pushers ( + id SERIAL PRIMARY KEY, + -- The Matrix user ID localpart for this pusher + localpart TEXT NOT NULL, + session_id BIGINT DEFAULT NULL, + profile_tag TEXT, + kind TEXT NOT NULL, + app_id TEXT NOT NULL, + app_display_name TEXT NOT NULL, + device_display_name TEXT NOT NULL, + pushkey TEXT NOT NULL, + pushkey_ts_ms BIGINT NOT NULL DEFAULT 0, + lang TEXT NOT NULL, + data TEXT NOT NULL +); + +-- For faster deleting by app_id, pushkey pair. +CREATE INDEX IF NOT EXISTS pusher_app_id_pushkey_idx ON pushserver_pushers(app_id, pushkey); + +-- For faster retrieving by localpart. +CREATE INDEX IF NOT EXISTS pusher_localpart_idx ON pushserver_pushers(localpart); + +-- Pushkey must be unique for a given user and app. +CREATE UNIQUE INDEX IF NOT EXISTS pusher_app_id_pushkey_localpart_idx ON pushserver_pushers(app_id, pushkey, localpart); +` + +const insertPusherSQL = "" + + "INSERT INTO pushserver_pushers (localpart, session_id, pushkey, pushkey_ts_ms, kind, app_id, app_display_name, device_display_name, profile_tag, lang, data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)" + +const selectPushersSQL = "" + + "SELECT session_id, pushkey, pushkey_ts_ms, kind, app_id, app_display_name, device_display_name, profile_tag, lang, data FROM pushserver_pushers WHERE localpart = $1" + +const deletePusherSQL = "" + + "DELETE FROM pushserver_pushers WHERE app_id = $1 AND pushkey = $2 AND localpart = $3" + +const deletePushersByAppIdAndPushKeySQL = "" + + "DELETE FROM pushserver_pushers WHERE app_id = $1 AND pushkey = $2" + +type pushersStatements struct { + insertPusherStmt *sql.Stmt + selectPushersStmt *sql.Stmt + deletePusherStmt *sql.Stmt + deletePushersByAppIdAndPushKeyStmt *sql.Stmt +} + +func CreatePushersTable(db *sql.DB) error { + _, err := db.Exec(pushersSchema) + return err +} + +func preparePushersTable(db *sql.DB) (tables.Pusher, error) { + s := &pushersStatements{} + + return s, sqlutil.StatementList{ + {&s.insertPusherStmt, insertPusherSQL}, + {&s.selectPushersStmt, selectPushersSQL}, + {&s.deletePusherStmt, deletePusherSQL}, + {&s.deletePushersByAppIdAndPushKeyStmt, deletePushersByAppIdAndPushKeySQL}, + }.Prepare(db) +} + +// insertPusher creates a new pusher. +// Returns an error if the user already has a pusher with the given pusher pushkey. +// Returns nil error success. +func (s *pushersStatements) InsertPusher( + ctx context.Context, session_id int64, + pushkey string, pushkeyTS gomatrixserverlib.Timestamp, kind api.PusherKind, appid, appdisplayname, devicedisplayname, profiletag, lang, data, localpart string, +) error { + _, err := s.insertPusherStmt.ExecContext(ctx, localpart, session_id, pushkey, pushkeyTS, kind, appid, appdisplayname, devicedisplayname, profiletag, lang, data) + logrus.Debugf("Created pusher %d", session_id) + return err +} + +func (s *pushersStatements) SelectPushers( + ctx context.Context, localpart string, +) ([]api.Pusher, error) { + pushers := []api.Pusher{} + rows, err := s.selectPushersStmt.QueryContext(ctx, localpart) + + if err != nil { + return pushers, err + } + defer internal.CloseAndLogIfError(ctx, rows, "SelectPushers: rows.close() failed") + + for rows.Next() { + var pusher api.Pusher + var data []byte + err = rows.Scan( + &pusher.SessionID, + &pusher.PushKey, + &pusher.PushKeyTS, + &pusher.Kind, + &pusher.AppID, + &pusher.AppDisplayName, + &pusher.DeviceDisplayName, + &pusher.ProfileTag, + &pusher.Language, + &data) + if err != nil { + return pushers, err + } + err := json.Unmarshal(data, &pusher.Data) + if err != nil { + return pushers, err + } + pushers = append(pushers, pusher) + } + + logrus.Debugf("Database returned %d pushers", len(pushers)) + return pushers, rows.Err() +} + +// deletePusher removes a single pusher by pushkey and user localpart. +func (s *pushersStatements) DeletePusher( + ctx context.Context, appid, pushkey, localpart string, +) error { + _, err := s.deletePusherStmt.ExecContext(ctx, appid, pushkey, localpart) + return err +} + +func (s *pushersStatements) DeletePushers( + ctx context.Context, appid, pushkey string, +) error { + _, err := s.deletePushersByAppIdAndPushKeyStmt.ExecContext(ctx, appid, pushkey) + return err +} diff --git a/pushserver/storage/shared/storage.go b/pushserver/storage/shared/storage.go new file mode 100644 index 000000000..3d11ed641 --- /dev/null +++ b/pushserver/storage/shared/storage.go @@ -0,0 +1,79 @@ +package shared + +import ( + "context" + "database/sql" + "encoding/json" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/pushserver/api" + "github.com/matrix-org/dendrite/pushserver/storage/tables" +) + +type Database struct { + DB *sql.DB + Writer sqlutil.Writer + pushers tables.Pusher +} + +func (d *Database) Prepare() (err error) { + d.pushers, err = preparePushersTable(d.DB) + return +} + +func (d *Database) CreatePusher( + ctx context.Context, p api.Pusher, localpart string, +) error { + data, err := json.Marshal(p.Data) + if err != nil { + return err + } + return d.Writer.Do(nil, nil, func(_ *sql.Tx) error { + return d.pushers.InsertPusher( + ctx, + p.SessionID, + p.PushKey, + p.PushKeyTS, + p.Kind, + p.AppID, + p.AppDisplayName, + p.DeviceDisplayName, + p.ProfileTag, + p.Language, + string(data), + localpart) + }) +} + +// GetPushers returns the pushers matching the given localpart. +func (d *Database) GetPushers( + ctx context.Context, localpart string, +) ([]api.Pusher, error) { + return d.pushers.SelectPushers(ctx, localpart) +} + +// RemovePusher deletes one pusher +// Invoked when `append` is true and `kind` is null in +// https://matrix.org/docs/spec/client_server/r0.6.1#post-matrix-client-r0-pushers-set +func (d *Database) RemovePusher( + ctx context.Context, appid, pushkey, localpart string, +) error { + return d.Writer.Do(nil, nil, func(_ *sql.Tx) error { + err := d.pushers.DeletePusher(ctx, appid, pushkey, localpart) + if err == sql.ErrNoRows { + return nil + } + return err + }) +} + +// RemovePushers deletes all pushers that match given App Id and Push Key pair. +// Invoked when `append` parameter is false in +// https://matrix.org/docs/spec/client_server/r0.6.1#post-matrix-client-r0-pushers-set +func (d *Database) RemovePushers( + ctx context.Context, appid, pushkey string, +) error { + return d.Writer.Do(nil, nil, func(_ *sql.Tx) error { + return d.pushers.DeletePushers(ctx, appid, pushkey) + }) +} diff --git a/pushserver/storage/sqlite3/storage.go b/pushserver/storage/sqlite3/storage.go new file mode 100644 index 000000000..d1137d822 --- /dev/null +++ b/pushserver/storage/sqlite3/storage.go @@ -0,0 +1,57 @@ +package sqlite3 + +import ( + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/pushserver/storage/shared" + "github.com/matrix-org/dendrite/setup/config" +) + +type Database struct { + shared.Database + sqlutil.PartitionOffsetStatements +} + +func Open(dbProperties *config.DatabaseOptions) (*Database, error) { + var d Database + var err error + if d.DB, err = sqlutil.Open(dbProperties); err != nil { + return nil, fmt.Errorf("sqlutil.Open: %w", err) + } + d.Writer = sqlutil.NewExclusiveWriter() + + if err = d.PartitionOffsetStatements.Prepare(d.DB, d.Writer, "pushserver"); err != nil { + return nil, err + } + + if err = createNotificationsTable(d.DB); err != nil { + return nil, err + } + if err = shared.CreatePushersTable(d.DB); err != nil { + return nil, err + } + if err = d.Database.Prepare(); err != nil { + return nil, err + } + + return &d, nil +} + +func createNotificationsTable(db *sql.DB) error { + _, err := db.Exec(notificationsSchema) + return err +} + +const notificationsSchema = ` +CREATE TABLE IF NOT EXISTS pushserver_notifications ( + id INTEGER PRIMARY KEY, + localpart TEXT NOT NULL, + room_id TEXT NOT NULL, + event_id TEXT NOT NULL, + ts_ms BIGINT NOT NULL, + highlight BOOLEAN NOT NULL, + notification_json TEXT NOT NULL, + read BOOLEAN NOT NULL DEFAULT FALSE +);` diff --git a/pushserver/storage/storage.go b/pushserver/storage/storage.go new file mode 100644 index 000000000..cb049db57 --- /dev/null +++ b/pushserver/storage/storage.go @@ -0,0 +1,24 @@ +//go:build !wasm +// +build !wasm + +package storage + +import ( + "fmt" + + "github.com/matrix-org/dendrite/pushserver/storage/postgres" + "github.com/matrix-org/dendrite/pushserver/storage/sqlite3" + "github.com/matrix-org/dendrite/setup/config" +) + +// Open opens a database connection. +func Open(dbProperties *config.DatabaseOptions) (Database, error) { + switch { + case dbProperties.ConnectionString.IsSQLite(): + return sqlite3.Open(dbProperties) + case dbProperties.ConnectionString.IsPostgres(): + return postgres.Open(dbProperties) + default: + return nil, fmt.Errorf("unexpected database type") + } +} diff --git a/pushserver/storage/storage_test.go b/pushserver/storage/storage_test.go new file mode 100644 index 000000000..cf3a1b441 --- /dev/null +++ b/pushserver/storage/storage_test.go @@ -0,0 +1,133 @@ +package storage + +import ( + "context" + "math/rand" + "os" + "strconv" + "testing" + + "github.com/matrix-org/dendrite/pushserver/api" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matryer/is" +) + +var testCtx = context.Background() + +var testPushers = []api.Pusher{ + { + SessionID: 42984798792, + PushKey: "dc_GxbDa8El0pWKkDIM-rQ:APA91bHflmL6ycJMbLKX8VYLD-Ebft3t-SLQwIap-pDWP-evu1AWxsXxzyl1pgSZxDMn6OeznZsjXhTU0m5xz05dyJ4syX86S89uwxBwtbK-k0PHQt9wF8CgOcibm-OYZodpY5TtmknZ", + Kind: "http", + AppID: "com.example.app.ios", + AppDisplayName: "Mat Rix", + DeviceDisplayName: "iPhone 9", + ProfileTag: "xxyyzz", + Language: "pl", + Data: map[string]interface{}{ + "format": "event_id_only", + "url": "https://push-gateway.location.there/_matrix/push/v1/notify", + }, + }, + { + SessionID: 4298479873432, + PushKey: "dnjekDa8El0pWKkDIM-rQ:APA91bHflmL6ycJMbLKX8VYLD-Ebft3t-SLQwIap-pDWP-evu1AWxsXxzyl1pgSZxDMn6OeznZsjXhTU0m5xz05dyJ4syX86S89uwxBwtbK-k0PHQt9wF8CgOcibm-OYZodpY5TtmknZ", + Kind: "http", + AppID: "com.example.app.ios", + AppDisplayName: "Riot", + DeviceDisplayName: "Android 11", + ProfileTag: "aabbcc", + Language: "en", + Data: map[string]interface{}{ + "format": "event_id_only", + "url": "https://push-gateway.location.there/_matrix/push/v1/notify", + }, + }, + { + SessionID: 4298479873432, + PushKey: "dc_GxbDa8El0pWKkDIM-rQ:APA91bHflmL6ycJMbLKX8VYLD-Ebft3t-SLQwIap-pDWP-evu1AWxsXxzyl1pgSZxDMn6OeznZsjXhTU0m5xz05dyJ4syX86S89uwxBwtbK-k0PHQt9wF8CgOcibm-OYZodpY5TtmknZ", + Kind: "http", + AppID: "com.example.app.ios", + AppDisplayName: "Riot", + DeviceDisplayName: "Android 11", + ProfileTag: "aabbcc", + Language: "en", + Data: map[string]interface{}{ + "format": "event_id_only", + "url": "https://push-gateway.location.there/_matrix/push/v1/notify", + }, + }, +} + +var testUsers = []string{ + "admin", + "admin", + "admin0", +} + +func mustNewDatabaseWithTestPushers(is *is.I) Database { + randPostfix := strconv.Itoa(rand.Int()) + dbPath := os.TempDir() + "/dendrite-" + randPostfix + dut, err := Open(&config.DatabaseOptions{ + ConnectionString: config.DataSource("file:" + dbPath), + }) + is.NoErr(err) + for i, testPusher := range testPushers { + err = dut.CreatePusher(testCtx, testPusher, testUsers[i]) + is.NoErr(err) + } + return dut +} + +func TestCreatePusher(t *testing.T) { + is := is.New(t) + mustNewDatabaseWithTestPushers(is) +} + +func TestSelectPushers(t *testing.T) { + is := is.New(t) + dut := mustNewDatabaseWithTestPushers(is) + pushers, err := dut.GetPushers(testCtx, "admin") + is.NoErr(err) + is.Equal(len(pushers), 2) + is.Equal(pushers[0], testPushers[0]) + is.Equal(pushers[1], testPushers[1]) + // for i := range testPushers { + // } +} + +func TestDeletePusher(t *testing.T) { + is := is.New(t) + dut := mustNewDatabaseWithTestPushers(is) + err := dut.RemovePusher( + testCtx, + "com.example.app.ios", + "dc_GxbDa8El0pWKkDIM-rQ:APA91bHflmL6ycJMbLKX8VYLD-Ebft3t-SLQwIap-pDWP-evu1AWxsXxzyl1pgSZxDMn6OeznZsjXhTU0m5xz05dyJ4syX86S89uwxBwtbK-k0PHQt9wF8CgOcibm-OYZodpY5TtmknZ", + "admin") + is.NoErr(err) + pushers, err := dut.GetPushers(testCtx, "admin") + is.NoErr(err) + is.Equal(len(pushers), 1) + is.Equal(pushers[0], testPushers[1]) + pushers, err = dut.GetPushers(testCtx, "admin0") + is.NoErr(err) + is.Equal(len(pushers), 1) + is.Equal(pushers[0], testPushers[2]) +} + +func TestDeletePushers(t *testing.T) { + is := is.New(t) + dut := mustNewDatabaseWithTestPushers(is) + err := dut.RemovePushers( + testCtx, + "com.example.app.ios", + "dc_GxbDa8El0pWKkDIM-rQ:APA91bHflmL6ycJMbLKX8VYLD-Ebft3t-SLQwIap-pDWP-evu1AWxsXxzyl1pgSZxDMn6OeznZsjXhTU0m5xz05dyJ4syX86S89uwxBwtbK-k0PHQt9wF8CgOcibm-OYZodpY5TtmknZ") + is.NoErr(err) + pushers, err := dut.GetPushers(testCtx, "admin") + is.NoErr(err) + is.Equal(len(pushers), 1) + is.Equal(pushers[0], testPushers[1]) + pushers, err = dut.GetPushers(testCtx, "admin0") + is.NoErr(err) + is.Equal(len(pushers), 0) +} diff --git a/pushserver/storage/storage_wasm.go b/pushserver/storage/storage_wasm.go new file mode 100644 index 000000000..f3ed8673f --- /dev/null +++ b/pushserver/storage/storage_wasm.go @@ -0,0 +1,20 @@ +package storage + +import ( + "fmt" + + "github.com/matrix-org/dendrite/pushserver/storage/sqlite3" + "github.com/matrix-org/dendrite/setup/config" +) + +// NewDatabase opens a new database +func Open(dbProperties *config.DatabaseOptions) (Database, error) { + switch { + case dbProperties.ConnectionString.IsSQLite(): + return sqlite3.Open(dbProperties) + case dbProperties.ConnectionString.IsPostgres(): + return nil, fmt.Errorf("can't use Postgres implementation") + default: + return nil, fmt.Errorf("unexpected database type") + } +} diff --git a/pushserver/storage/tables/interface.go b/pushserver/storage/tables/interface.go new file mode 100644 index 000000000..89656abbf --- /dev/null +++ b/pushserver/storage/tables/interface.go @@ -0,0 +1,25 @@ +package tables + +import ( + "context" + + "github.com/matrix-org/dendrite/pushserver/api" + "github.com/matrix-org/gomatrixserverlib" +) + +type Pusher interface { + InsertPusher( + ctx context.Context, session_id int64, + pushkey string, pushkeyTS gomatrixserverlib.Timestamp, kind api.PusherKind, + appid, appdisplayname, devicedisplayname, profiletag, lang, data, localpart string, + ) error + SelectPushers( + ctx context.Context, localpart string, + ) ([]api.Pusher, error) + DeletePusher( + ctx context.Context, appid, pushkey, localpart string, + ) error + DeletePushers( + ctx context.Context, appid, pushkey string, + ) error +}