Add Pushserver component with Pushers API

Co-authored-by: Tommie Gannert <tommie@gannert.se>
Co-authored-by: Dan Peleg <dan@globekeeper.com>
This commit is contained in:
Piotr Kozimor 2021-11-17 17:00:36 +01:00
parent e81ef1649b
commit 151d52ce23
17 changed files with 997 additions and 0 deletions

1
go.mod
View file

@ -35,6 +35,7 @@ require (
github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0 github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0
github.com/matrix-org/pinecone v0.0.0-20211116111603-febf3501584d github.com/matrix-org/pinecone v0.0.0-20211116111603-febf3501584d
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/matryer/is v1.4.0 // indirect
github.com/mattn/go-sqlite3 v1.14.8 github.com/mattn/go-sqlite3 v1.14.8
github.com/morikuni/aec v1.0.0 // indirect github.com/morikuni/aec v1.0.0 // indirect
github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9 github.com/neilalexander/utp v0.1.1-0.20210727203401-54ae7b1cd5f9

2
go.sum
View file

@ -1002,6 +1002,8 @@ github.com/matrix-org/pinecone v0.0.0-20211116111603-febf3501584d/go.mod h1:r6ds
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
github.com/matryer/is v1.4.0 h1:sosSmIWwkYITGrxZ25ULNDeKiMNzFSr4V/eqBQP0PeE=
github.com/matryer/is v1.4.0/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU=
github.com/mattn/go-colorable v0.0.6/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.0.6/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=

53
pushserver/api/api.go Normal file
View file

@ -0,0 +1,53 @@
package api
import (
"context"
"github.com/matrix-org/gomatrixserverlib"
)
type PushserverInternalAPI interface {
PerformPusherSet(ctx context.Context, req *PerformPusherSetRequest, res *struct{}) error
PerformPusherDeletion(ctx context.Context, req *PerformPusherDeletionRequest, res *struct{}) error
QueryPushers(ctx context.Context, req *QueryPushersRequest, res *QueryPushersResponse) error
}
type QueryPushersRequest struct {
Localpart string
}
type QueryPushersResponse struct {
Pushers []Pusher `json:"pushers"`
}
type PerformPusherSetRequest struct {
Pusher // Anonymous field because that's how clientapi unmarshals it.
Localpart string
Append bool `json:"append"`
}
type PerformPusherDeletionRequest struct {
Localpart string
SessionID int64
}
// Pusher represents a push notification subscriber
type Pusher struct {
SessionID int64 `json:"session_id,omitempty"`
PushKey string `json:"pushkey"`
PushKeyTS gomatrixserverlib.Timestamp `json:"pushkey_ts,omitempty"`
Kind PusherKind `json:"kind"`
AppID string `json:"app_id"`
AppDisplayName string `json:"app_display_name"`
DeviceDisplayName string `json:"device_display_name"`
ProfileTag string `json:"profile_tag"`
Language string `json:"lang"`
Data map[string]interface{} `json:"data"`
}
type PusherKind string
const (
EmailKind PusherKind = "email"
HTTPKind PusherKind = "http"
)

View file

@ -0,0 +1,73 @@
package internal
import (
"context"
"time"
"github.com/matrix-org/dendrite/pushserver/api"
"github.com/matrix-org/dendrite/pushserver/storage"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)
// PushserverInternalAPI implements api.PushserverInternalAPI
type PushserverInternalAPI struct {
Cfg *config.PushServer
DB storage.Database
}
func NewPushserverAPI(
cfg *config.PushServer, pushserverDB storage.Database,
) *PushserverInternalAPI {
a := &PushserverInternalAPI{
Cfg: cfg,
DB: pushserverDB,
}
return a
}
func (a *PushserverInternalAPI) PerformPusherSet(ctx context.Context, req *api.PerformPusherSetRequest, res *struct{}) error {
util.GetLogger(ctx).WithFields(logrus.Fields{
"localpart": req.Localpart,
"pushkey": req.Pusher.PushKey,
"display_name": req.Pusher.AppDisplayName,
}).Info("PerformPusherCreation")
if !req.Append {
err := a.DB.RemovePushers(ctx, req.Pusher.AppID, req.Pusher.PushKey)
if err != nil {
return err
}
}
if req.Pusher.Kind == "" {
return a.DB.RemovePusher(ctx, req.Pusher.AppID, req.Pusher.PushKey, req.Localpart)
}
if req.Pusher.PushKeyTS == 0 {
req.Pusher.PushKeyTS = gomatrixserverlib.AsTimestamp(time.Now())
}
return a.DB.CreatePusher(ctx, req.Pusher, req.Localpart)
}
func (a *PushserverInternalAPI) PerformPusherDeletion(ctx context.Context, req *api.PerformPusherDeletionRequest, res *struct{}) error {
pushers, err := a.DB.GetPushers(ctx, req.Localpart)
if err != nil {
return err
}
for i := range pushers {
logrus.Warnf("pusher session: %d, req session: %d", pushers[i].SessionID, req.SessionID)
if pushers[i].SessionID != req.SessionID {
err := a.DB.RemovePusher(ctx, pushers[i].AppID, pushers[i].PushKey, req.Localpart)
if err != nil {
return err
}
}
}
return nil
}
func (a *PushserverInternalAPI) QueryPushers(ctx context.Context, req *api.QueryPushersRequest, res *api.QueryPushersResponse) error {
var err error
res.Pushers, err = a.DB.GetPushers(ctx, req.Localpart)
return err
}

View file

@ -0,0 +1,143 @@
package internal
import (
"context"
"math/rand"
"os"
"strconv"
"testing"
"github.com/matrix-org/dendrite/pushserver/api"
"github.com/matrix-org/dendrite/pushserver/storage"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matryer/is"
)
var (
ctx = context.Background()
localpart = "foo"
testPusher = api.Pusher{
SessionID: 42984798792,
PushKey: "dc_GxbDa8El0pWKkDIM-rQ:APA91bHflmL6ycJMbLKX8VYLD-Ebft3t-SLQwIap-pDWP-evu1AWxsXxzyl1pgSZxDMn6OeznZsjXhTU0m5xz05dyJ4syX86S89uwxBwtbK-k0PHQt9wF8CgOcibm-OYZodpY5TtmknZ",
Kind: "http",
AppID: "com.example.app.ios",
AppDisplayName: "Mat Rix",
DeviceDisplayName: "iPhone 9",
ProfileTag: "xxyyzz",
Language: "pl",
Data: map[string]interface{}{
"format": "event_id_only",
"url": "https://push-gateway.location.there/_matrix/push/v1/notify",
},
}
testPusher2 = api.Pusher{
SessionID: 42984798792,
PushKey: "dc_GxbDa8El0pWKkDIM-rQ:APA91bHflmL6ycJMbLKX8VYLD-Ebft3t-SLQwIap-pDWP-evu1AWxsXxzyl1pgSZxDMn6OeznZsjXhTU0m5xz05dyJ4syX86S89uwxBwtbK-k0PHQt9wF8CgOcibm-OYZodpY5TtmknZ---",
Kind: "http",
AppID: "com.example.app.ios",
AppDisplayName: "Mat Rix",
DeviceDisplayName: "iPhone 9",
ProfileTag: "xxyyzz",
Language: "pl",
Data: map[string]interface{}{
"format": "event_id_only",
"url": "https://push-gateway.location.there/_matrix/push/v1/notify",
},
}
nilPusher = api.Pusher{
PushKey: "dc_GxbDa8El0pWKkDIM-rQ:APA91bHflmL6ycJMbLKX8VYLD-Ebft3t-SLQwIap-pDWP-evu1AWxsXxzyl1pgSZxDMn6OeznZsjXhTU0m5xz05dyJ4syX86S89uwxBwtbK-k0PHQt9wF8CgOcibm-OYZodpY5TtmknZ",
AppID: "com.example.app.ios",
}
)
func TestPerformPusherSet(t *testing.T) {
is := is.New(t)
dut := mustNewPushserverAPI(is)
pushers := mustSetPushers(is, dut, testPusher)
is.Equal(len(pushers.Pushers), 1)
pushKeyTS := pushers.Pushers[0].PushKeyTS
is.True(pushKeyTS != 0)
pushers.Pushers[0].PushKeyTS = 0
is.Equal(pushers.Pushers[0], testPusher)
pushers.Pushers[0].PushKeyTS = pushKeyTS
}
func TestPerformPusherSet_Append(t *testing.T) {
is := is.New(t)
dut := mustNewPushserverAPI(is)
mustSetPushers(is, dut, testPusher)
pushers := mustAppendPushers(is, dut, testPusher2)
is.Equal(len(pushers.Pushers), 2)
is.True(pushers.Pushers[1].PushKeyTS != 0)
pushers.Pushers[1].PushKeyTS = 0
is.Equal(pushers.Pushers[1], testPusher2)
}
func TestPerformPusherSet_Delete(t *testing.T) {
is := is.New(t)
dut := mustNewPushserverAPI(is)
mustSetPushers(is, dut, testPusher)
pushers := mustSetPushers(is, dut, nilPusher)
// pushers := mustAppendPushers(is, dut, testPusher2)
is.Equal(len(pushers.Pushers), 0)
}
func TestPerformPusherSet_AppendDelete(t *testing.T) {
is := is.New(t)
dut := mustNewPushserverAPI(is)
mustSetPushers(is, dut, testPusher)
mustAppendPushers(is, dut, testPusher2)
pushers := mustAppendPushers(is, dut, nilPusher)
is.Equal(len(pushers.Pushers), 1)
is.True(pushers.Pushers[0].PushKeyTS != 0)
pushers.Pushers[0].PushKeyTS = 0
is.Equal(pushers.Pushers[0], testPusher2)
}
func mustNewPushserverAPI(is *is.I) api.PushserverInternalAPI {
db := mustNewDatabase(is)
return &PushserverInternalAPI{
DB: db,
}
}
func mustNewDatabase(is *is.I) storage.Database {
randPostfix := strconv.Itoa(rand.Int())
dbPath := os.TempDir() + "/dendrite-" + randPostfix
dut, err := storage.Open(&config.DatabaseOptions{
ConnectionString: config.DataSource("file:" + dbPath),
})
is.NoErr(err)
return dut
}
func mustSetPushers(is *is.I, dut api.PushserverInternalAPI, p api.Pusher) *api.QueryPushersResponse {
err := dut.PerformPusherSet(ctx, &api.PerformPusherSetRequest{
Localpart: localpart,
Append: false,
Pusher: p,
}, &struct{}{})
is.NoErr(err)
var pushers api.QueryPushersResponse
err = dut.QueryPushers(ctx, &api.QueryPushersRequest{
Localpart: localpart,
}, &pushers)
is.NoErr(err)
return &pushers
}
func mustAppendPushers(is *is.I, dut api.PushserverInternalAPI, p api.Pusher) *api.QueryPushersResponse {
err := dut.PerformPusherSet(ctx, &api.PerformPusherSetRequest{
Localpart: localpart,
Append: true,
Pusher: p,
}, &struct{}{})
is.NoErr(err)
var pushers api.QueryPushersResponse
err = dut.QueryPushers(ctx, &api.QueryPushersRequest{
Localpart: localpart,
}, &pushers)
is.NoErr(err)
return &pushers
}

View file

@ -0,0 +1,70 @@
package inthttp
import (
"context"
"errors"
"net/http"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/pushserver/api"
"github.com/opentracing/opentracing-go"
)
type httpPushserverInternalAPI struct {
pushserverURL string
httpClient *http.Client
}
const (
QueryNotificationsPath = "/pushserver/queryNotifications"
PerformPusherSetPath = "/pushserver/performPusherSet"
PerformPusherDeletionPath = "/pushserver/performPusherDeletion"
QueryPushersPath = "/pushserver/queryPushers"
PerformPushRulesPutPath = "/pushserver/performPushRulesPut"
QueryPushRulesPath = "/pushserver/queryPushRules"
)
// NewPushserverClient creates a PushserverInternalAPI implemented by talking to a HTTP POST API.
// If httpClient is nil an error is returned
func NewPushserverClient(
pushserverURL string,
httpClient *http.Client,
) (api.PushserverInternalAPI, error) {
if httpClient == nil {
return nil, errors.New("NewPushserverClient: httpClient is <nil>")
}
return &httpPushserverInternalAPI{
pushserverURL: pushserverURL,
httpClient: httpClient,
}, nil
}
func (h *httpPushserverInternalAPI) PerformPusherSet(
ctx context.Context,
request *api.PerformPusherSetRequest,
response *struct{},
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformPusherSet")
defer span.Finish()
apiURL := h.pushserverURL + PerformPusherSetPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
func (h *httpPushserverInternalAPI) PerformPusherDeletion(ctx context.Context, req *api.PerformPusherDeletionRequest, res *struct{}) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformPusherDeletion")
defer span.Finish()
apiURL := h.pushserverURL + PerformPusherDeletionPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
}
func (h *httpPushserverInternalAPI) QueryPushers(ctx context.Context, req *api.QueryPushersRequest, res *api.QueryPushersResponse) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryPushers")
defer span.Finish()
apiURL := h.pushserverURL + QueryPushersPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
}

View file

@ -0,0 +1,44 @@
package inthttp
import (
"encoding/json"
"net/http"
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/pushserver/api"
"github.com/matrix-org/util"
)
// AddRoutes adds the PushserverInternalAPI handlers to the http.ServeMux.
// nolint: gocyclo
func AddRoutes(r api.PushserverInternalAPI, internalAPIMux *mux.Router) {
internalAPIMux.Handle(PerformPusherSetPath,
httputil.MakeInternalAPI("performPusherSet", func(req *http.Request) util.JSONResponse {
request := api.PerformPusherSetRequest{}
response := struct{}{}
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := r.PerformPusherSet(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(PerformPusherDeletionPath,
httputil.MakeInternalAPI("performPusherDeletion", func(req *http.Request) util.JSONResponse {
request := api.PerformPusherDeletionRequest{}
response := struct{}{}
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := r.PerformPusherDeletion(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
}

38
pushserver/pushserver.go Normal file
View file

@ -0,0 +1,38 @@
package pushserver
import (
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/pushserver/api"
"github.com/matrix-org/dendrite/pushserver/internal"
"github.com/matrix-org/dendrite/pushserver/inthttp"
"github.com/matrix-org/dendrite/pushserver/storage"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup"
"github.com/sirupsen/logrus"
)
// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions
// on the given input API.
func AddInternalRoutes(router *mux.Router, intAPI api.PushserverInternalAPI) {
inthttp.AddRoutes(intAPI, router)
}
// NewInternalAPI returns a concerete implementation of the internal API. Callers
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
func NewInternalAPI(
base *setup.BaseDendrite,
rsAPI roomserverAPI.RoomserverInternalAPI,
) api.PushserverInternalAPI {
cfg := &base.Cfg.PushServer
db, err := storage.Open(&cfg.Database)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to push server db")
}
psAPI := internal.NewPushserverAPI(
cfg, db,
)
return psAPI
}

View file

@ -0,0 +1,16 @@
package storage
import (
"context"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/pushserver/api"
)
type Database interface {
internal.PartitionStorer
CreatePusher(ctx context.Context, pusher api.Pusher, localpart string) error
GetPushers(ctx context.Context, localpart string) ([]api.Pusher, error)
RemovePusher(ctx context.Context, appId, pushkey, localpart string) error
RemovePushers(ctx context.Context, appId, pushkey string) error
}

View file

@ -0,0 +1,62 @@
package postgres
import (
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/pushserver/storage/shared"
"github.com/matrix-org/dendrite/setup/config"
)
type Database struct {
shared.Database
sqlutil.PartitionOffsetStatements
}
func Open(dbProperties *config.DatabaseOptions) (*Database, error) {
var d Database
var err error
if d.DB, err = sqlutil.Open(dbProperties); err != nil {
return nil, fmt.Errorf("sqlutil.Open: %w", err)
}
d.Writer = sqlutil.NewDummyWriter()
if err = d.PartitionOffsetStatements.Prepare(d.DB, d.Writer, "pushserver"); err != nil {
return nil, err
}
if err = createNotificationsTable(d.DB); err != nil {
return nil, err
}
if err = shared.CreatePushersTable(d.DB); err != nil {
return nil, err
}
if err = d.Database.Prepare(); err != nil {
return nil, err
}
return &d, nil
}
func createNotificationsTable(db *sql.DB) error {
_, err := db.Exec(notificationsSchema)
return err
}
const notificationsSchema = `
CREATE TABLE IF NOT EXISTS pushserver_notifications (
id BIGSERIAL PRIMARY KEY,
localpart TEXT NOT NULL,
room_id TEXT NOT NULL,
event_id TEXT NOT NULL,
ts_ms BIGINT NOT NULL,
highlight BOOLEAN NOT NULL,
notification_json TEXT NOT NULL,
read BOOLEAN NOT NULL DEFAULT FALSE
);
CREATE INDEX IF NOT EXISTS notification_localpart_room_id_event_id_idx ON pushserver_notifications(localpart, room_id, event_id);
CREATE INDEX IF NOT EXISTS notification_localpart_room_id_id_idx ON pushserver_notifications(localpart, room_id, id);
CREATE INDEX IF NOT EXISTS notification_localpart_id_idx ON pushserver_notifications(localpart, id);
`

View file

@ -0,0 +1,157 @@
// Copyright 2021 Dan Peleg <dan@globekeeper.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package shared
import (
"context"
"database/sql"
"encoding/json"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/pushserver/api"
"github.com/matrix-org/dendrite/pushserver/storage/tables"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
// See https://matrix.org/docs/spec/client_server/r0.6.1#get-matrix-client-r0-pushers
const pushersSchema = `
CREATE TABLE IF NOT EXISTS pushserver_pushers (
id SERIAL PRIMARY KEY,
-- The Matrix user ID localpart for this pusher
localpart TEXT NOT NULL,
session_id BIGINT DEFAULT NULL,
profile_tag TEXT,
kind TEXT NOT NULL,
app_id TEXT NOT NULL,
app_display_name TEXT NOT NULL,
device_display_name TEXT NOT NULL,
pushkey TEXT NOT NULL,
pushkey_ts_ms BIGINT NOT NULL DEFAULT 0,
lang TEXT NOT NULL,
data TEXT NOT NULL
);
-- For faster deleting by app_id, pushkey pair.
CREATE INDEX IF NOT EXISTS pusher_app_id_pushkey_idx ON pushserver_pushers(app_id, pushkey);
-- For faster retrieving by localpart.
CREATE INDEX IF NOT EXISTS pusher_localpart_idx ON pushserver_pushers(localpart);
-- Pushkey must be unique for a given user and app.
CREATE UNIQUE INDEX IF NOT EXISTS pusher_app_id_pushkey_localpart_idx ON pushserver_pushers(app_id, pushkey, localpart);
`
const insertPusherSQL = "" +
"INSERT INTO pushserver_pushers (localpart, session_id, pushkey, pushkey_ts_ms, kind, app_id, app_display_name, device_display_name, profile_tag, lang, data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)"
const selectPushersSQL = "" +
"SELECT session_id, pushkey, pushkey_ts_ms, kind, app_id, app_display_name, device_display_name, profile_tag, lang, data FROM pushserver_pushers WHERE localpart = $1"
const deletePusherSQL = "" +
"DELETE FROM pushserver_pushers WHERE app_id = $1 AND pushkey = $2 AND localpart = $3"
const deletePushersByAppIdAndPushKeySQL = "" +
"DELETE FROM pushserver_pushers WHERE app_id = $1 AND pushkey = $2"
type pushersStatements struct {
insertPusherStmt *sql.Stmt
selectPushersStmt *sql.Stmt
deletePusherStmt *sql.Stmt
deletePushersByAppIdAndPushKeyStmt *sql.Stmt
}
func CreatePushersTable(db *sql.DB) error {
_, err := db.Exec(pushersSchema)
return err
}
func preparePushersTable(db *sql.DB) (tables.Pusher, error) {
s := &pushersStatements{}
return s, sqlutil.StatementList{
{&s.insertPusherStmt, insertPusherSQL},
{&s.selectPushersStmt, selectPushersSQL},
{&s.deletePusherStmt, deletePusherSQL},
{&s.deletePushersByAppIdAndPushKeyStmt, deletePushersByAppIdAndPushKeySQL},
}.Prepare(db)
}
// insertPusher creates a new pusher.
// Returns an error if the user already has a pusher with the given pusher pushkey.
// Returns nil error success.
func (s *pushersStatements) InsertPusher(
ctx context.Context, session_id int64,
pushkey string, pushkeyTS gomatrixserverlib.Timestamp, kind api.PusherKind, appid, appdisplayname, devicedisplayname, profiletag, lang, data, localpart string,
) error {
_, err := s.insertPusherStmt.ExecContext(ctx, localpart, session_id, pushkey, pushkeyTS, kind, appid, appdisplayname, devicedisplayname, profiletag, lang, data)
logrus.Debugf("Created pusher %d", session_id)
return err
}
func (s *pushersStatements) SelectPushers(
ctx context.Context, localpart string,
) ([]api.Pusher, error) {
pushers := []api.Pusher{}
rows, err := s.selectPushersStmt.QueryContext(ctx, localpart)
if err != nil {
return pushers, err
}
defer internal.CloseAndLogIfError(ctx, rows, "SelectPushers: rows.close() failed")
for rows.Next() {
var pusher api.Pusher
var data []byte
err = rows.Scan(
&pusher.SessionID,
&pusher.PushKey,
&pusher.PushKeyTS,
&pusher.Kind,
&pusher.AppID,
&pusher.AppDisplayName,
&pusher.DeviceDisplayName,
&pusher.ProfileTag,
&pusher.Language,
&data)
if err != nil {
return pushers, err
}
err := json.Unmarshal(data, &pusher.Data)
if err != nil {
return pushers, err
}
pushers = append(pushers, pusher)
}
logrus.Debugf("Database returned %d pushers", len(pushers))
return pushers, rows.Err()
}
// deletePusher removes a single pusher by pushkey and user localpart.
func (s *pushersStatements) DeletePusher(
ctx context.Context, appid, pushkey, localpart string,
) error {
_, err := s.deletePusherStmt.ExecContext(ctx, appid, pushkey, localpart)
return err
}
func (s *pushersStatements) DeletePushers(
ctx context.Context, appid, pushkey string,
) error {
_, err := s.deletePushersByAppIdAndPushKeyStmt.ExecContext(ctx, appid, pushkey)
return err
}

View file

@ -0,0 +1,79 @@
package shared
import (
"context"
"database/sql"
"encoding/json"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/pushserver/api"
"github.com/matrix-org/dendrite/pushserver/storage/tables"
)
type Database struct {
DB *sql.DB
Writer sqlutil.Writer
pushers tables.Pusher
}
func (d *Database) Prepare() (err error) {
d.pushers, err = preparePushersTable(d.DB)
return
}
func (d *Database) CreatePusher(
ctx context.Context, p api.Pusher, localpart string,
) error {
data, err := json.Marshal(p.Data)
if err != nil {
return err
}
return d.Writer.Do(nil, nil, func(_ *sql.Tx) error {
return d.pushers.InsertPusher(
ctx,
p.SessionID,
p.PushKey,
p.PushKeyTS,
p.Kind,
p.AppID,
p.AppDisplayName,
p.DeviceDisplayName,
p.ProfileTag,
p.Language,
string(data),
localpart)
})
}
// GetPushers returns the pushers matching the given localpart.
func (d *Database) GetPushers(
ctx context.Context, localpart string,
) ([]api.Pusher, error) {
return d.pushers.SelectPushers(ctx, localpart)
}
// RemovePusher deletes one pusher
// Invoked when `append` is true and `kind` is null in
// https://matrix.org/docs/spec/client_server/r0.6.1#post-matrix-client-r0-pushers-set
func (d *Database) RemovePusher(
ctx context.Context, appid, pushkey, localpart string,
) error {
return d.Writer.Do(nil, nil, func(_ *sql.Tx) error {
err := d.pushers.DeletePusher(ctx, appid, pushkey, localpart)
if err == sql.ErrNoRows {
return nil
}
return err
})
}
// RemovePushers deletes all pushers that match given App Id and Push Key pair.
// Invoked when `append` parameter is false in
// https://matrix.org/docs/spec/client_server/r0.6.1#post-matrix-client-r0-pushers-set
func (d *Database) RemovePushers(
ctx context.Context, appid, pushkey string,
) error {
return d.Writer.Do(nil, nil, func(_ *sql.Tx) error {
return d.pushers.DeletePushers(ctx, appid, pushkey)
})
}

View file

@ -0,0 +1,57 @@
package sqlite3
import (
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/pushserver/storage/shared"
"github.com/matrix-org/dendrite/setup/config"
)
type Database struct {
shared.Database
sqlutil.PartitionOffsetStatements
}
func Open(dbProperties *config.DatabaseOptions) (*Database, error) {
var d Database
var err error
if d.DB, err = sqlutil.Open(dbProperties); err != nil {
return nil, fmt.Errorf("sqlutil.Open: %w", err)
}
d.Writer = sqlutil.NewExclusiveWriter()
if err = d.PartitionOffsetStatements.Prepare(d.DB, d.Writer, "pushserver"); err != nil {
return nil, err
}
if err = createNotificationsTable(d.DB); err != nil {
return nil, err
}
if err = shared.CreatePushersTable(d.DB); err != nil {
return nil, err
}
if err = d.Database.Prepare(); err != nil {
return nil, err
}
return &d, nil
}
func createNotificationsTable(db *sql.DB) error {
_, err := db.Exec(notificationsSchema)
return err
}
const notificationsSchema = `
CREATE TABLE IF NOT EXISTS pushserver_notifications (
id INTEGER PRIMARY KEY,
localpart TEXT NOT NULL,
room_id TEXT NOT NULL,
event_id TEXT NOT NULL,
ts_ms BIGINT NOT NULL,
highlight BOOLEAN NOT NULL,
notification_json TEXT NOT NULL,
read BOOLEAN NOT NULL DEFAULT FALSE
);`

View file

@ -0,0 +1,24 @@
//go:build !wasm
// +build !wasm
package storage
import (
"fmt"
"github.com/matrix-org/dendrite/pushserver/storage/postgres"
"github.com/matrix-org/dendrite/pushserver/storage/sqlite3"
"github.com/matrix-org/dendrite/setup/config"
)
// Open opens a database connection.
func Open(dbProperties *config.DatabaseOptions) (Database, error) {
switch {
case dbProperties.ConnectionString.IsSQLite():
return sqlite3.Open(dbProperties)
case dbProperties.ConnectionString.IsPostgres():
return postgres.Open(dbProperties)
default:
return nil, fmt.Errorf("unexpected database type")
}
}

View file

@ -0,0 +1,133 @@
package storage
import (
"context"
"math/rand"
"os"
"strconv"
"testing"
"github.com/matrix-org/dendrite/pushserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matryer/is"
)
var testCtx = context.Background()
var testPushers = []api.Pusher{
{
SessionID: 42984798792,
PushKey: "dc_GxbDa8El0pWKkDIM-rQ:APA91bHflmL6ycJMbLKX8VYLD-Ebft3t-SLQwIap-pDWP-evu1AWxsXxzyl1pgSZxDMn6OeznZsjXhTU0m5xz05dyJ4syX86S89uwxBwtbK-k0PHQt9wF8CgOcibm-OYZodpY5TtmknZ",
Kind: "http",
AppID: "com.example.app.ios",
AppDisplayName: "Mat Rix",
DeviceDisplayName: "iPhone 9",
ProfileTag: "xxyyzz",
Language: "pl",
Data: map[string]interface{}{
"format": "event_id_only",
"url": "https://push-gateway.location.there/_matrix/push/v1/notify",
},
},
{
SessionID: 4298479873432,
PushKey: "dnjekDa8El0pWKkDIM-rQ:APA91bHflmL6ycJMbLKX8VYLD-Ebft3t-SLQwIap-pDWP-evu1AWxsXxzyl1pgSZxDMn6OeznZsjXhTU0m5xz05dyJ4syX86S89uwxBwtbK-k0PHQt9wF8CgOcibm-OYZodpY5TtmknZ",
Kind: "http",
AppID: "com.example.app.ios",
AppDisplayName: "Riot",
DeviceDisplayName: "Android 11",
ProfileTag: "aabbcc",
Language: "en",
Data: map[string]interface{}{
"format": "event_id_only",
"url": "https://push-gateway.location.there/_matrix/push/v1/notify",
},
},
{
SessionID: 4298479873432,
PushKey: "dc_GxbDa8El0pWKkDIM-rQ:APA91bHflmL6ycJMbLKX8VYLD-Ebft3t-SLQwIap-pDWP-evu1AWxsXxzyl1pgSZxDMn6OeznZsjXhTU0m5xz05dyJ4syX86S89uwxBwtbK-k0PHQt9wF8CgOcibm-OYZodpY5TtmknZ",
Kind: "http",
AppID: "com.example.app.ios",
AppDisplayName: "Riot",
DeviceDisplayName: "Android 11",
ProfileTag: "aabbcc",
Language: "en",
Data: map[string]interface{}{
"format": "event_id_only",
"url": "https://push-gateway.location.there/_matrix/push/v1/notify",
},
},
}
var testUsers = []string{
"admin",
"admin",
"admin0",
}
func mustNewDatabaseWithTestPushers(is *is.I) Database {
randPostfix := strconv.Itoa(rand.Int())
dbPath := os.TempDir() + "/dendrite-" + randPostfix
dut, err := Open(&config.DatabaseOptions{
ConnectionString: config.DataSource("file:" + dbPath),
})
is.NoErr(err)
for i, testPusher := range testPushers {
err = dut.CreatePusher(testCtx, testPusher, testUsers[i])
is.NoErr(err)
}
return dut
}
func TestCreatePusher(t *testing.T) {
is := is.New(t)
mustNewDatabaseWithTestPushers(is)
}
func TestSelectPushers(t *testing.T) {
is := is.New(t)
dut := mustNewDatabaseWithTestPushers(is)
pushers, err := dut.GetPushers(testCtx, "admin")
is.NoErr(err)
is.Equal(len(pushers), 2)
is.Equal(pushers[0], testPushers[0])
is.Equal(pushers[1], testPushers[1])
// for i := range testPushers {
// }
}
func TestDeletePusher(t *testing.T) {
is := is.New(t)
dut := mustNewDatabaseWithTestPushers(is)
err := dut.RemovePusher(
testCtx,
"com.example.app.ios",
"dc_GxbDa8El0pWKkDIM-rQ:APA91bHflmL6ycJMbLKX8VYLD-Ebft3t-SLQwIap-pDWP-evu1AWxsXxzyl1pgSZxDMn6OeznZsjXhTU0m5xz05dyJ4syX86S89uwxBwtbK-k0PHQt9wF8CgOcibm-OYZodpY5TtmknZ",
"admin")
is.NoErr(err)
pushers, err := dut.GetPushers(testCtx, "admin")
is.NoErr(err)
is.Equal(len(pushers), 1)
is.Equal(pushers[0], testPushers[1])
pushers, err = dut.GetPushers(testCtx, "admin0")
is.NoErr(err)
is.Equal(len(pushers), 1)
is.Equal(pushers[0], testPushers[2])
}
func TestDeletePushers(t *testing.T) {
is := is.New(t)
dut := mustNewDatabaseWithTestPushers(is)
err := dut.RemovePushers(
testCtx,
"com.example.app.ios",
"dc_GxbDa8El0pWKkDIM-rQ:APA91bHflmL6ycJMbLKX8VYLD-Ebft3t-SLQwIap-pDWP-evu1AWxsXxzyl1pgSZxDMn6OeznZsjXhTU0m5xz05dyJ4syX86S89uwxBwtbK-k0PHQt9wF8CgOcibm-OYZodpY5TtmknZ")
is.NoErr(err)
pushers, err := dut.GetPushers(testCtx, "admin")
is.NoErr(err)
is.Equal(len(pushers), 1)
is.Equal(pushers[0], testPushers[1])
pushers, err = dut.GetPushers(testCtx, "admin0")
is.NoErr(err)
is.Equal(len(pushers), 0)
}

View file

@ -0,0 +1,20 @@
package storage
import (
"fmt"
"github.com/matrix-org/dendrite/pushserver/storage/sqlite3"
"github.com/matrix-org/dendrite/setup/config"
)
// NewDatabase opens a new database
func Open(dbProperties *config.DatabaseOptions) (Database, error) {
switch {
case dbProperties.ConnectionString.IsSQLite():
return sqlite3.Open(dbProperties)
case dbProperties.ConnectionString.IsPostgres():
return nil, fmt.Errorf("can't use Postgres implementation")
default:
return nil, fmt.Errorf("unexpected database type")
}
}

View file

@ -0,0 +1,25 @@
package tables
import (
"context"
"github.com/matrix-org/dendrite/pushserver/api"
"github.com/matrix-org/gomatrixserverlib"
)
type Pusher interface {
InsertPusher(
ctx context.Context, session_id int64,
pushkey string, pushkeyTS gomatrixserverlib.Timestamp, kind api.PusherKind,
appid, appdisplayname, devicedisplayname, profiletag, lang, data, localpart string,
) error
SelectPushers(
ctx context.Context, localpart string,
) ([]api.Pusher, error)
DeletePusher(
ctx context.Context, appid, pushkey, localpart string,
) error
DeletePushers(
ctx context.Context, appid, pushkey string,
) error
}