Merge branch 'master' into kegan/readme

This commit is contained in:
Kegan Dougal 2020-07-14 10:52:27 +01:00
commit d53485d272
42 changed files with 1282 additions and 322 deletions

View file

@ -3,6 +3,7 @@ package gobind
import ( import (
"context" "context"
"crypto/tls" "crypto/tls"
"encoding/hex"
"fmt" "fmt"
"net" "net"
"net/http" "net/http"
@ -16,6 +17,7 @@ import (
"github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/federationsender"
"github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/setup" "github.com/matrix-org/dendrite/internal/setup"
@ -23,12 +25,18 @@ import (
"github.com/matrix-org/dendrite/userapi" "github.com/matrix-org/dendrite/userapi"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"go.uber.org/atomic"
) )
type DendriteMonolith struct { type DendriteMonolith struct {
logger logrus.Logger
YggdrasilNode *yggconn.Node YggdrasilNode *yggconn.Node
StorageDirectory string StorageDirectory string
listener net.Listener listener net.Listener
httpServer *http.Server
httpListening atomic.Bool
yggListening atomic.Bool
} }
func (m *DendriteMonolith) BaseURL() string { func (m *DendriteMonolith) BaseURL() string {
@ -56,9 +64,10 @@ func (m *DendriteMonolith) DisconnectMulticastPeers() {
} }
func (m *DendriteMonolith) Start() { func (m *DendriteMonolith) Start() {
logger := logrus.Logger{ m.logger = logrus.Logger{
Out: BindLogger{}, Out: BindLogger{},
} }
m.logger.SetOutput(BindLogger{})
logrus.SetOutput(BindLogger{}) logrus.SetOutput(BindLogger{})
var err error var err error
@ -93,6 +102,8 @@ func (m *DendriteMonolith) Start() {
cfg.Database.AppService = config.DataSource(fmt.Sprintf("file:%s/dendrite-appservice.db", m.StorageDirectory)) cfg.Database.AppService = config.DataSource(fmt.Sprintf("file:%s/dendrite-appservice.db", m.StorageDirectory))
cfg.Database.CurrentState = config.DataSource(fmt.Sprintf("file:%s/dendrite-currentstate.db", m.StorageDirectory)) cfg.Database.CurrentState = config.DataSource(fmt.Sprintf("file:%s/dendrite-currentstate.db", m.StorageDirectory))
cfg.Database.Naffka = config.DataSource(fmt.Sprintf("file:%s/dendrite-naffka.db", m.StorageDirectory)) cfg.Database.Naffka = config.DataSource(fmt.Sprintf("file:%s/dendrite-naffka.db", m.StorageDirectory))
cfg.Media.BasePath = config.Path(fmt.Sprintf("%s/tmp", m.StorageDirectory))
cfg.Media.AbsBasePath = config.Path(fmt.Sprintf("%s/tmp", m.StorageDirectory))
if err = cfg.Derive(); err != nil { if err = cfg.Derive(); err != nil {
panic(err) panic(err)
} }
@ -158,12 +169,39 @@ func (m *DendriteMonolith) Start() {
base.UseHTTPAPIs, base.UseHTTPAPIs,
) )
ygg.NewSession = func(serverName gomatrixserverlib.ServerName) {
logrus.Infof("Found new session %q", serverName)
time.Sleep(time.Second * 3)
req := &api.PerformServersAliveRequest{
Servers: []gomatrixserverlib.ServerName{serverName},
}
res := &api.PerformServersAliveResponse{}
if err := fsAPI.PerformServersAlive(context.TODO(), req, res); err != nil {
logrus.WithError(err).Warn("Failed to notify server alive due to new session")
}
}
ygg.NotifyLinkNew(func(_ crypto.BoxPubKey, sigPubKey crypto.SigPubKey, linkType, remote string) {
serverName := hex.EncodeToString(sigPubKey[:])
logrus.Infof("Found new peer %q", serverName)
time.Sleep(time.Second * 3)
req := &api.PerformServersAliveRequest{
Servers: []gomatrixserverlib.ServerName{
gomatrixserverlib.ServerName(serverName),
},
}
res := &api.PerformServersAliveResponse{}
if err := fsAPI.PerformServersAlive(context.TODO(), req, res); err != nil {
logrus.WithError(err).Warn("Failed to notify server alive due to new session")
}
})
// Build both ends of a HTTP multiplex. // Build both ends of a HTTP multiplex.
httpServer := &http.Server{ m.httpServer = &http.Server{
Addr: ":0", Addr: ":0",
TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){}, TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
ReadTimeout: 15 * time.Second, ReadTimeout: 30 * time.Second,
WriteTimeout: 45 * time.Second, WriteTimeout: 30 * time.Second,
IdleTimeout: 60 * time.Second, IdleTimeout: 60 * time.Second,
BaseContext: func(_ net.Listener) context.Context { BaseContext: func(_ net.Listener) context.Context {
return context.Background() return context.Background()
@ -171,19 +209,33 @@ func (m *DendriteMonolith) Start() {
Handler: base.BaseMux, Handler: base.BaseMux,
} }
go func() { m.Resume()
logger.Info("Listening on ", ygg.DerivedServerName())
logger.Fatal(httpServer.Serve(ygg))
}()
go func() {
logger.Info("Listening on ", m.BaseURL())
logger.Fatal(httpServer.Serve(m.listener))
}()
} }
func (m *DendriteMonolith) Stop() { func (m *DendriteMonolith) Resume() {
if err := m.listener.Close(); err != nil { logrus.Info("Resuming monolith")
logrus.Warn("Error stopping listener:", err) if listener, err := net.Listen("tcp", "localhost:65432"); err == nil {
m.listener = listener
}
if m.yggListening.CAS(false, true) {
go func() {
m.logger.Info("Listening on ", m.YggdrasilNode.DerivedServerName())
m.logger.Fatal(m.httpServer.Serve(m.YggdrasilNode))
m.yggListening.Store(false)
}()
}
if m.httpListening.CAS(false, true) {
go func() {
m.logger.Info("Listening on ", m.BaseURL())
m.logger.Fatal(m.httpServer.Serve(m.listener))
m.httpListening.Store(false)
}()
}
}
func (m *DendriteMonolith) Suspend() {
m.logger.Info("Suspending monolith")
if err := m.httpServer.Close(); err != nil {
m.logger.Warn("Error stopping HTTP server:", err)
} }
m.YggdrasilNode.Stop()
} }

View file

@ -0,0 +1,75 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package auth
import (
"context"
"net/http"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/userutil"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/util"
)
type GetAccountByPassword func(ctx context.Context, localpart, password string) (*api.Account, error)
type PasswordRequest struct {
Login
Password string `json:"password"`
}
// LoginTypePassword implements https://matrix.org/docs/spec/client_server/r0.6.1#password-based
type LoginTypePassword struct {
GetAccountByPassword GetAccountByPassword
Config *config.Dendrite
}
func (t *LoginTypePassword) Name() string {
return "m.login.password"
}
func (t *LoginTypePassword) Request() interface{} {
return &PasswordRequest{}
}
func (t *LoginTypePassword) Login(ctx context.Context, req interface{}) (*Login, *util.JSONResponse) {
r := req.(*PasswordRequest)
username := r.Username()
if username == "" {
return nil, &util.JSONResponse{
Code: http.StatusUnauthorized,
JSON: jsonerror.BadJSON("'user' must be supplied."),
}
}
localpart, err := userutil.ParseUsernameParam(username, &t.Config.Matrix.ServerName)
if err != nil {
return nil, &util.JSONResponse{
Code: http.StatusUnauthorized,
JSON: jsonerror.InvalidUsername(err.Error()),
}
}
_, err = t.GetAccountByPassword(ctx, localpart, r.Password)
if err != nil {
// Technically we could tell them if the user does not exist by checking if err == sql.ErrNoRows
// but that would leak the existence of the user.
return nil, &util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("username or password was incorrect, or the account does not exist"),
}
}
return &r.Login, nil
}

View file

@ -0,0 +1,248 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package auth
import (
"context"
"encoding/json"
"net/http"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
)
// Type represents an auth type
// https://matrix.org/docs/spec/client_server/r0.6.1#authentication-types
type Type interface {
// Name returns the name of the auth type e.g `m.login.password`
Name() string
// Request returns a pointer to a new request body struct to unmarshal into.
Request() interface{}
// Login with the auth type, returning an error response on failure.
// Not all types support login, only m.login.password and m.login.token
// See https://matrix.org/docs/spec/client_server/r0.6.1#post-matrix-client-r0-login
// `req` is guaranteed to be the type returned from Request()
// This function will be called when doing login and when doing 'sudo' style
// actions e.g deleting devices. The response must be a 401 as per:
// "If the homeserver decides that an attempt on a stage was unsuccessful, but the
// client may make a second attempt, it returns the same HTTP status 401 response as above,
// with the addition of the standard errcode and error fields describing the error."
Login(ctx context.Context, req interface{}) (login *Login, errRes *util.JSONResponse)
// TODO: Extend to support Register() flow
// Register(ctx context.Context, sessionID string, req interface{})
}
// LoginIdentifier represents identifier types
// https://matrix.org/docs/spec/client_server/r0.6.1#identifier-types
type LoginIdentifier struct {
Type string `json:"type"`
// when type = m.id.user
User string `json:"user"`
// when type = m.id.thirdparty
Medium string `json:"medium"`
Address string `json:"address"`
}
// Login represents the shared fields used in all forms of login/sudo endpoints.
type Login struct {
Type string `json:"type"`
Identifier LoginIdentifier `json:"identifier"`
User string `json:"user"` // deprecated in favour of identifier
Medium string `json:"medium"` // deprecated in favour of identifier
Address string `json:"address"` // deprecated in favour of identifier
// Both DeviceID and InitialDisplayName can be omitted, or empty strings ("")
// Thus a pointer is needed to differentiate between the two
InitialDisplayName *string `json:"initial_device_display_name"`
DeviceID *string `json:"device_id"`
}
// Username returns the user localpart/user_id in this request, if it exists.
func (r *Login) Username() string {
if r.Identifier.Type == "m.id.user" {
return r.Identifier.User
}
// deprecated but without it Riot iOS won't log in
return r.User
}
// ThirdPartyID returns the 3PID medium and address for this login, if it exists.
func (r *Login) ThirdPartyID() (medium, address string) {
if r.Identifier.Type == "m.id.thirdparty" {
return r.Identifier.Medium, r.Identifier.Address
}
// deprecated
if r.Medium == "email" {
return "email", r.Address
}
return "", ""
}
type userInteractiveFlow struct {
Stages []string `json:"stages"`
}
// UserInteractive checks that the user is who they claim to be, via a UI auth.
// This is used for things like device deletion and password reset where
// the user already has a valid access token, but we want to double-check
// that it isn't stolen by re-authenticating them.
type UserInteractive struct {
Flows []userInteractiveFlow
// Map of login type to implementation
Types map[string]Type
// Map of session ID to completed login types, will need to be extended in future
Sessions map[string][]string
}
func NewUserInteractive(getAccByPass GetAccountByPassword, cfg *config.Dendrite) *UserInteractive {
typePassword := &LoginTypePassword{
GetAccountByPassword: getAccByPass,
Config: cfg,
}
// TODO: Add SSO login
return &UserInteractive{
Flows: []userInteractiveFlow{
{
Stages: []string{typePassword.Name()},
},
},
Types: map[string]Type{
typePassword.Name(): typePassword,
},
Sessions: make(map[string][]string),
}
}
func (u *UserInteractive) IsSingleStageFlow(authType string) bool {
for _, f := range u.Flows {
if len(f.Stages) == 1 && f.Stages[0] == authType {
return true
}
}
return false
}
func (u *UserInteractive) AddCompletedStage(sessionID, authType string) {
// TODO: Handle multi-stage flows
delete(u.Sessions, sessionID)
}
// Challenge returns an HTTP 401 with the supported flows for authenticating
func (u *UserInteractive) Challenge(sessionID string) *util.JSONResponse {
return &util.JSONResponse{
Code: 401,
JSON: struct {
Flows []userInteractiveFlow `json:"flows"`
Session string `json:"session"`
// TODO: Return any additional `params`
Params map[string]interface{} `json:"params"`
}{
u.Flows,
sessionID,
make(map[string]interface{}),
},
}
}
// NewSession returns a challenge with a new session ID and remembers the session ID
func (u *UserInteractive) NewSession() *util.JSONResponse {
sessionID, err := GenerateAccessToken()
if err != nil {
logrus.WithError(err).Error("failed to generate session ID")
res := jsonerror.InternalServerError()
return &res
}
u.Sessions[sessionID] = []string{}
return u.Challenge(sessionID)
}
// ResponseWithChallenge mixes together a JSON body (e.g an error with errcode/message) with the
// standard challenge response.
func (u *UserInteractive) ResponseWithChallenge(sessionID string, response interface{}) *util.JSONResponse {
mixedObjects := make(map[string]interface{})
b, err := json.Marshal(response)
if err != nil {
ise := jsonerror.InternalServerError()
return &ise
}
_ = json.Unmarshal(b, &mixedObjects)
challenge := u.Challenge(sessionID)
b, err = json.Marshal(challenge.JSON)
if err != nil {
ise := jsonerror.InternalServerError()
return &ise
}
_ = json.Unmarshal(b, &mixedObjects)
return &util.JSONResponse{
Code: 401,
JSON: mixedObjects,
}
}
// Verify returns an error/challenge response to send to the client, or nil if the user is authenticated.
// `bodyBytes` is the HTTP request body which must contain an `auth` key.
// Returns the login that was verified for additional checks if required.
func (u *UserInteractive) Verify(ctx context.Context, bodyBytes []byte, device *api.Device) (*Login, *util.JSONResponse) {
// TODO: rate limit
// "A client should first make a request with no auth parameter. The homeserver returns an HTTP 401 response, with a JSON body"
// https://matrix.org/docs/spec/client_server/r0.6.1#user-interactive-api-in-the-rest-api
hasResponse := gjson.GetBytes(bodyBytes, "auth").Exists()
if !hasResponse {
return nil, u.NewSession()
}
// extract the type so we know which login type to use
authType := gjson.GetBytes(bodyBytes, "auth.type").Str
loginType, ok := u.Types[authType]
if !ok {
return nil, &util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.BadJSON("unknown auth.type: " + authType),
}
}
// retrieve the session
sessionID := gjson.GetBytes(bodyBytes, "auth.session").Str
if _, ok = u.Sessions[sessionID]; !ok {
// if the login type is part of a single stage flow then allow them to omit the session ID
if !u.IsSingleStageFlow(authType) {
return nil, &util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.Unknown("missing or unknown auth.session"),
}
}
}
r := loginType.Request()
if err := json.Unmarshal([]byte(gjson.GetBytes(bodyBytes, "auth").Raw), r); err != nil {
return nil, &util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.BadJSON("The request body could not be decoded into valid JSON. " + err.Error()),
}
}
login, resErr := loginType.Login(ctx, r)
if resErr == nil {
u.AddCompletedStage(sessionID, authType)
// TODO: Check if there's more stages to go and return an error
return login, nil
}
return nil, u.ResponseWithChallenge(sessionID, resErr.JSON)
}

View file

@ -0,0 +1,174 @@
package auth
import (
"context"
"encoding/json"
"fmt"
"testing"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
var (
ctx = context.Background()
serverName = gomatrixserverlib.ServerName("example.com")
// space separated localpart+password -> account
lookup = make(map[string]*api.Account)
device = &api.Device{
AccessToken: "flibble",
DisplayName: "My Device",
ID: "device_id_goes_here",
}
)
func getAccountByPassword(ctx context.Context, localpart, plaintextPassword string) (*api.Account, error) {
acc, ok := lookup[localpart+" "+plaintextPassword]
if !ok {
return nil, fmt.Errorf("unknown user/password")
}
return acc, nil
}
func setup() *UserInteractive {
cfg := &config.Dendrite{}
cfg.Matrix.ServerName = serverName
return NewUserInteractive(getAccountByPassword, cfg)
}
func TestUserInteractiveChallenge(t *testing.T) {
uia := setup()
// no auth key results in a challenge
_, errRes := uia.Verify(ctx, []byte(`{}`), device)
if errRes == nil {
t.Fatalf("Verify succeeded with {} but expected failure")
}
if errRes.Code != 401 {
t.Errorf("Expected HTTP 401, got %d", errRes.Code)
}
}
func TestUserInteractivePasswordLogin(t *testing.T) {
uia := setup()
// valid password login succeeds when an account exists
lookup["alice herpassword"] = &api.Account{
Localpart: "alice",
ServerName: serverName,
UserID: fmt.Sprintf("@alice:%s", serverName),
}
// valid password requests
testCases := []json.RawMessage{
// deprecated form
[]byte(`{
"auth": {
"type": "m.login.password",
"user": "alice",
"password": "herpassword"
}
}`),
// new form
[]byte(`{
"auth": {
"type": "m.login.password",
"identifier": {
"type": "m.id.user",
"user": "alice"
},
"password": "herpassword"
}
}`),
}
for _, tc := range testCases {
_, errRes := uia.Verify(ctx, tc, device)
if errRes != nil {
t.Errorf("Verify failed but expected success for request: %s - got %+v", string(tc), errRes)
}
}
}
func TestUserInteractivePasswordBadLogin(t *testing.T) {
uia := setup()
// password login fails when an account exists but is specced wrong
lookup["bob hispassword"] = &api.Account{
Localpart: "bob",
ServerName: serverName,
UserID: fmt.Sprintf("@bob:%s", serverName),
}
// invalid password requests
testCases := []struct {
body json.RawMessage
wantRes util.JSONResponse
}{
{
// fields not in an auth dict
body: []byte(`{
"type": "m.login.password",
"user": "bob",
"password": "hispassword"
}`),
wantRes: util.JSONResponse{
Code: 401,
},
},
{
// wrong type
body: []byte(`{
"auth": {
"type": "m.login.not_password",
"identifier": {
"type": "m.id.user",
"user": "bob"
},
"password": "hispassword"
}
}`),
wantRes: util.JSONResponse{
Code: 400,
},
},
{
// identifier type is wrong
body: []byte(`{
"auth": {
"type": "m.login.password",
"identifier": {
"type": "m.id.thirdparty",
"user": "bob"
},
"password": "hispassword"
}
}`),
wantRes: util.JSONResponse{
Code: 401,
},
},
{
// wrong password
body: []byte(`{
"auth": {
"type": "m.login.password",
"identifier": {
"type": "m.id.user",
"user": "bob"
},
"password": "not_his_password"
}
}`),
wantRes: util.JSONResponse{
Code: 401,
},
},
}
for _, tc := range testCases {
_, errRes := uia.Verify(ctx, tc.body, device)
if errRes == nil {
t.Errorf("Verify succeeded but expected failure for request: %s", string(tc.body))
continue
}
if errRes.Code != tc.wantRes.Code {
t.Errorf("got code %d want code %d for request: %s", errRes.Code, tc.wantRes.Code, string(tc.body))
}
}
}

View file

@ -17,8 +17,10 @@ package routing
import ( import (
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"io/ioutil"
"net/http" "net/http"
"github.com/matrix-org/dendrite/clientapi/auth"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/devices" "github.com/matrix-org/dendrite/userapi/storage/devices"
@ -72,7 +74,8 @@ func GetDeviceByID(
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
JSON: deviceJSON{ JSON: deviceJSON{
DeviceID: dev.ID, DeviceID: dev.ID,
DisplayName: dev.DisplayName,
}, },
} }
} }
@ -99,7 +102,8 @@ func GetDevicesByLocalpart(
for _, dev := range deviceList { for _, dev := range deviceList {
res.Devices = append(res.Devices, deviceJSON{ res.Devices = append(res.Devices, deviceJSON{
DeviceID: dev.ID, DeviceID: dev.ID,
DisplayName: dev.DisplayName,
}) })
} }
@ -161,20 +165,40 @@ func UpdateDeviceByID(
// DeleteDeviceById handles DELETE requests to /devices/{deviceId} // DeleteDeviceById handles DELETE requests to /devices/{deviceId}
func DeleteDeviceById( func DeleteDeviceById(
req *http.Request, deviceDB devices.Database, device *api.Device, req *http.Request, userInteractiveAuth *auth.UserInteractive, deviceDB devices.Database, device *api.Device,
deviceID string, deviceID string,
) util.JSONResponse { ) util.JSONResponse {
ctx := req.Context()
defer req.Body.Close() // nolint:errcheck
bodyBytes, err := ioutil.ReadAll(req.Body)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.BadJSON("The request body could not be read: " + err.Error()),
}
}
login, errRes := userInteractiveAuth.Verify(ctx, bodyBytes, device)
if errRes != nil {
return *errRes
}
localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID) localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID)
if err != nil { if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("gomatrixserverlib.SplitID failed") util.GetLogger(ctx).WithError(err).Error("gomatrixserverlib.SplitID failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
ctx := req.Context()
defer req.Body.Close() // nolint: errcheck // make sure that the access token being used matches the login creds used for user interactive auth, else
// 1 compromised access token could be used to logout all devices.
if login.Username() != localpart && login.Username() != device.UserID {
return util.JSONResponse{
Code: 403,
JSON: jsonerror.Forbidden("Cannot delete another user's device"),
}
}
if err := deviceDB.RemoveDevice(ctx, deviceID, localpart); err != nil { if err := deviceDB.RemoveDevice(ctx, deviceID, localpart); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("deviceDB.RemoveDevice failed") util.GetLogger(ctx).WithError(err).Error("deviceDB.RemoveDevice failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }

View file

@ -31,3 +31,10 @@ func QueryKeys(
}, },
} }
} }
func UploadKeys(req *http.Request) util.JSONResponse {
return util.JSONResponse{
Code: 200,
JSON: struct{}{},
}
}

View file

@ -15,46 +15,20 @@
package routing package routing
import ( import (
"net/http"
"context" "context"
"net/http"
"github.com/matrix-org/dendrite/clientapi/auth" "github.com/matrix-org/dendrite/clientapi/auth"
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/userutil" "github.com/matrix-org/dendrite/clientapi/userutil"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/accounts" "github.com/matrix-org/dendrite/userapi/storage/accounts"
"github.com/matrix-org/dendrite/userapi/storage/devices" "github.com/matrix-org/dendrite/userapi/storage/devices"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
) )
type loginFlows struct {
Flows []flow `json:"flows"`
}
type flow struct {
Type string `json:"type"`
Stages []string `json:"stages"`
}
type loginIdentifier struct {
Type string `json:"type"`
User string `json:"user"`
}
type passwordRequest struct {
Identifier loginIdentifier `json:"identifier"`
User string `json:"user"` // deprecated in favour of identifier
Password string `json:"password"`
// Both DeviceID and InitialDisplayName can be omitted, or empty strings ("")
// Thus a pointer is needed to differentiate between the two
InitialDisplayName *string `json:"initial_device_display_name"`
DeviceID *string `json:"device_id"`
}
type loginResponse struct { type loginResponse struct {
UserID string `json:"user_id"` UserID string `json:"user_id"`
AccessToken string `json:"access_token"` AccessToken string `json:"access_token"`
@ -62,9 +36,21 @@ type loginResponse struct {
DeviceID string `json:"device_id"` DeviceID string `json:"device_id"`
} }
func passwordLogin() loginFlows { type flows struct {
f := loginFlows{} Flows []flow `json:"flows"`
s := flow{"m.login.password", []string{"m.login.password"}} }
type flow struct {
Type string `json:"type"`
Stages []string `json:"stages"`
}
func passwordLogin() flows {
f := flows{}
s := flow{
Type: "m.login.password",
Stages: []string{"m.login.password"},
}
f.Flows = append(f.Flows, s) f.Flows = append(f.Flows, s)
return f return f
} }
@ -74,69 +60,28 @@ func Login(
req *http.Request, accountDB accounts.Database, deviceDB devices.Database, req *http.Request, accountDB accounts.Database, deviceDB devices.Database,
cfg *config.Dendrite, cfg *config.Dendrite,
) util.JSONResponse { ) util.JSONResponse {
if req.Method == http.MethodGet { // TODO: support other forms of login other than password, depending on config options if req.Method == http.MethodGet {
// TODO: support other forms of login other than password, depending on config options
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
JSON: passwordLogin(), JSON: passwordLogin(),
} }
} else if req.Method == http.MethodPost { } else if req.Method == http.MethodPost {
var r passwordRequest typePassword := auth.LoginTypePassword{
var acc *api.Account GetAccountByPassword: accountDB.GetAccountByPassword,
var errJSON *util.JSONResponse Config: cfg,
resErr := httputil.UnmarshalJSONRequest(req, &r) }
r := typePassword.Request()
resErr := httputil.UnmarshalJSONRequest(req, r)
if resErr != nil { if resErr != nil {
return *resErr return *resErr
} }
switch r.Identifier.Type { login, authErr := typePassword.Login(req.Context(), r)
case "m.id.user": if authErr != nil {
if r.Identifier.User == "" { return *authErr
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.BadJSON("'user' must be supplied."),
}
}
acc, errJSON = r.processUsernamePasswordLoginRequest(req, accountDB, cfg, r.Identifier.User)
if errJSON != nil {
return *errJSON
}
default:
// TODO: The below behaviour is deprecated but without it Riot iOS won't log in
if r.User != "" {
acc, errJSON = r.processUsernamePasswordLoginRequest(req, accountDB, cfg, r.User)
if errJSON != nil {
return *errJSON
}
} else {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.BadJSON("login identifier '" + r.Identifier.Type + "' not supported"),
}
}
}
token, err := auth.GenerateAccessToken()
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("auth.GenerateAccessToken failed")
return jsonerror.InternalServerError()
}
dev, err := getDevice(req.Context(), r, deviceDB, acc, token)
if err != nil {
return util.JSONResponse{
Code: http.StatusInternalServerError,
JSON: jsonerror.Unknown("failed to create device: " + err.Error()),
}
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: loginResponse{
UserID: dev.UserID,
AccessToken: dev.AccessToken,
HomeServer: cfg.Matrix.ServerName,
DeviceID: dev.ID,
},
} }
// make a device/access token
return completeAuth(req.Context(), cfg.Matrix.ServerName, deviceDB, login)
} }
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusMethodNotAllowed, Code: http.StatusMethodNotAllowed,
@ -144,45 +89,38 @@ func Login(
} }
} }
// getDevice returns a new or existing device func completeAuth(
func getDevice( ctx context.Context, serverName gomatrixserverlib.ServerName, deviceDB devices.Database, login *auth.Login,
ctx context.Context, ) util.JSONResponse {
r passwordRequest, token, err := auth.GenerateAccessToken()
deviceDB devices.Database, if err != nil {
acc *api.Account, util.GetLogger(ctx).WithError(err).Error("auth.GenerateAccessToken failed")
token string, return jsonerror.InternalServerError()
) (dev *api.Device, err error) { }
dev, err = deviceDB.CreateDevice(
ctx, acc.Localpart, r.DeviceID, token, r.InitialDisplayName, localpart, err := userutil.ParseUsernameParam(login.Username(), &serverName)
if err != nil {
util.GetLogger(ctx).WithError(err).Error("auth.ParseUsernameParam failed")
return jsonerror.InternalServerError()
}
dev, err := deviceDB.CreateDevice(
ctx, localpart, login.DeviceID, token, login.InitialDisplayName,
) )
return
}
func (r *passwordRequest) processUsernamePasswordLoginRequest(
req *http.Request, accountDB accounts.Database,
cfg *config.Dendrite, username string,
) (acc *api.Account, errJSON *util.JSONResponse) {
util.GetLogger(req.Context()).WithField("user", username).Info("Processing login request")
localpart, err := userutil.ParseUsernameParam(username, &cfg.Matrix.ServerName)
if err != nil { if err != nil {
errJSON = &util.JSONResponse{ return util.JSONResponse{
Code: http.StatusBadRequest, Code: http.StatusInternalServerError,
JSON: jsonerror.InvalidUsername(err.Error()), JSON: jsonerror.Unknown("failed to create device: " + err.Error()),
} }
return
} }
acc, err = accountDB.GetAccountByPassword(req.Context(), localpart, r.Password) return util.JSONResponse{
if err != nil { Code: http.StatusOK,
// Technically we could tell them if the user does not exist by checking if err == sql.ErrNoRows JSON: loginResponse{
// but that would leak the existence of the user. UserID: dev.UserID,
errJSON = &util.JSONResponse{ AccessToken: dev.AccessToken,
Code: http.StatusForbidden, HomeServer: serverName,
JSON: jsonerror.Forbidden("username or password was incorrect, or the account does not exist"), DeviceID: dev.ID,
} },
return
} }
return
} }

View file

@ -22,6 +22,7 @@ import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api" appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi/api" "github.com/matrix-org/dendrite/clientapi/api"
"github.com/matrix-org/dendrite/clientapi/auth"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/producers"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
@ -63,6 +64,7 @@ func Setup(
stateAPI currentstateAPI.CurrentStateInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI,
extRoomsProvider api.ExtraPublicRoomsProvider, extRoomsProvider api.ExtraPublicRoomsProvider,
) { ) {
userInteractiveAuth := auth.NewUserInteractive(accountDB.GetAccountByPassword, cfg)
publicAPIMux.Handle("/client/versions", publicAPIMux.Handle("/client/versions",
httputil.MakeExternalAPI("versions", func(req *http.Request) util.JSONResponse { httputil.MakeExternalAPI("versions", func(req *http.Request) util.JSONResponse {
@ -629,7 +631,7 @@ func Setup(
if err != nil { if err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return DeleteDeviceById(req, deviceDB, device, vars["deviceID"]) return DeleteDeviceById(req, userInteractiveAuth, deviceDB, device, vars["deviceID"])
}), }),
).Methods(http.MethodDelete, http.MethodOptions) ).Methods(http.MethodDelete, http.MethodOptions)
@ -693,4 +695,17 @@ func Setup(
return GetCapabilities(req, rsAPI) return GetCapabilities(req, rsAPI)
}), }),
).Methods(http.MethodGet) ).Methods(http.MethodGet)
r0mux.Handle("/keys/query",
httputil.MakeAuthAPI("queryKeys", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return QueryKeys(req)
}),
).Methods(http.MethodPost, http.MethodOptions)
// Supplying a device ID is deprecated.
r0mux.Handle("/keys/upload/{deviceID}",
httputil.MakeAuthAPI("keys_upload", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return UploadKeys(req)
}),
).Methods(http.MethodPost, http.MethodOptions)
} }

View file

@ -0,0 +1,22 @@
# Yggdrasil Demo
This is the Dendrite Yggdrasil demo! It's easy to get started - all you need is Go 1.13 or later.
To run the homeserver, start at the root of the Dendrite repository and run:
```
go run ./cmd/dendrite-demo-yggdrasil
```
The following command line arguments are accepted:
* `-peer tcp://a.b.c.d:e` to specify a static Yggdrasil peer to connect to - you will need to supply this if you do not have another Yggdrasil node on your network
* `-port 12345` to specify a port to listen on for client connections
If you need to find an internet peer, take a look at [this list](https://publicpeers.neilalexander.dev/).
Then point your favourite Matrix client to the homeserver URL`http://localhost:8008` (or whichever `-port` you specified), create an account and log in.
If your peering connection is operational then you should see a `Connected TCP:` line in the log output. If not then try a different peer.
Once logged in, you should be able to open the room directory or join a room by its ID.

View file

@ -17,6 +17,7 @@ package main
import ( import (
"context" "context"
"crypto/tls" "crypto/tls"
"encoding/hex"
"flag" "flag"
"fmt" "fmt"
"net" "net"
@ -32,6 +33,7 @@ import (
"github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/federationsender"
"github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/httputil"
@ -39,6 +41,7 @@ import (
"github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/userapi" "github.com/matrix-org/dendrite/userapi"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -60,7 +63,9 @@ func main() {
} }
ygg.SetMulticastEnabled(true) ygg.SetMulticastEnabled(true)
if instancePeer != nil && *instancePeer != "" { if instancePeer != nil && *instancePeer != "" {
ygg.SetStaticPeer(*instancePeer) if err = ygg.SetStaticPeer(*instancePeer); err != nil {
logrus.WithError(err).Error("Failed to set static peer")
}
} }
cfg := &config.Dendrite{} cfg := &config.Dendrite{}
@ -150,6 +155,31 @@ func main() {
base.UseHTTPAPIs, base.UseHTTPAPIs,
) )
ygg.NewSession = func(serverName gomatrixserverlib.ServerName) {
logrus.Infof("Found new session %q", serverName)
req := &api.PerformServersAliveRequest{
Servers: []gomatrixserverlib.ServerName{serverName},
}
res := &api.PerformServersAliveResponse{}
if err := fsAPI.PerformServersAlive(context.TODO(), req, res); err != nil {
logrus.WithError(err).Warn("Failed to notify server alive due to new session")
}
}
ygg.NotifyLinkNew(func(_ crypto.BoxPubKey, sigPubKey crypto.SigPubKey, linkType, remote string) {
serverName := hex.EncodeToString(sigPubKey[:])
logrus.Infof("Found new peer %q", serverName)
req := &api.PerformServersAliveRequest{
Servers: []gomatrixserverlib.ServerName{
gomatrixserverlib.ServerName(serverName),
},
}
res := &api.PerformServersAliveResponse{}
if err := fsAPI.PerformServersAlive(context.TODO(), req, res); err != nil {
logrus.WithError(err).Warn("Failed to notify server alive due to new session")
}
})
// Build both ends of a HTTP multiplex. // Build both ends of a HTTP multiplex.
httpServer := &http.Server{ httpServer := &http.Server{
Addr: ":0", Addr: ":0",

View file

@ -46,7 +46,8 @@ func (n *Node) CreateClient(
tr.RegisterProtocol( tr.RegisterProtocol(
"matrix", &yggroundtripper{ "matrix", &yggroundtripper{
inner: &http.Transport{ inner: &http.Transport{
ResponseHeaderTimeout: 15 * time.Second, TLSHandshakeTimeout: 20 * time.Second,
ResponseHeaderTimeout: 10 * time.Second,
IdleConnTimeout: 60 * time.Second, IdleConnTimeout: 60 * time.Second,
DialContext: n.yggdialerctx, DialContext: n.yggdialerctx,
}, },
@ -62,7 +63,8 @@ func (n *Node) CreateFederationClient(
tr.RegisterProtocol( tr.RegisterProtocol(
"matrix", &yggroundtripper{ "matrix", &yggroundtripper{
inner: &http.Transport{ inner: &http.Transport{
ResponseHeaderTimeout: 15 * time.Second, TLSHandshakeTimeout: 20 * time.Second,
ResponseHeaderTimeout: 10 * time.Second,
IdleConnTimeout: 60 * time.Second, IdleConnTimeout: 60 * time.Second,
DialContext: n.yggdialerctx, DialContext: n.yggdialerctx,
}, },

View file

@ -35,6 +35,7 @@ import (
yggdrasiladmin "github.com/yggdrasil-network/yggdrasil-go/src/admin" yggdrasiladmin "github.com/yggdrasil-network/yggdrasil-go/src/admin"
yggdrasilconfig "github.com/yggdrasil-network/yggdrasil-go/src/config" yggdrasilconfig "github.com/yggdrasil-network/yggdrasil-go/src/config"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
yggdrasilmulticast "github.com/yggdrasil-network/yggdrasil-go/src/multicast" yggdrasilmulticast "github.com/yggdrasil-network/yggdrasil-go/src/multicast"
"github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
@ -54,6 +55,7 @@ type Node struct {
quicConfig *quic.Config quicConfig *quic.Config
sessions sync.Map // string -> quic.Session sessions sync.Map // string -> quic.Session
incoming chan QUICStream incoming chan QUICStream
NewSession func(remote gomatrixserverlib.ServerName)
} }
func (n *Node) BuildName() string { func (n *Node) BuildName() string {
@ -136,8 +138,8 @@ func Setup(instanceName, storageDirectory string) (*Node, error) {
MaxIncomingStreams: 0, MaxIncomingStreams: 0,
MaxIncomingUniStreams: 0, MaxIncomingUniStreams: 0,
KeepAlive: true, KeepAlive: true,
MaxIdleTimeout: time.Second * 900, MaxIdleTimeout: time.Minute * 15,
HandshakeTimeout: time.Second * 30, HandshakeTimeout: time.Second * 15,
} }
n.log.Println("Public curve25519:", n.core.EncryptionPublicKey()) n.log.Println("Public curve25519:", n.core.EncryptionPublicKey())
@ -188,7 +190,9 @@ func (n *Node) PeerCount() int {
} }
func (n *Node) KnownNodes() []gomatrixserverlib.ServerName { func (n *Node) KnownNodes() []gomatrixserverlib.ServerName {
nodemap := map[string]struct{}{} nodemap := map[string]struct{}{
"b5ae50589e50991dd9dd7d59c5c5f7a4521e8da5b603b7f57076272abc58b374": struct{}{},
}
for _, peer := range n.core.GetSwitchPeers() { for _, peer := range n.core.GetSwitchPeers() {
nodemap[hex.EncodeToString(peer.SigningKey[:])] = struct{}{} nodemap[hex.EncodeToString(peer.SigningKey[:])] = struct{}{}
} }
@ -262,3 +266,11 @@ func (n *Node) SetStaticPeer(uri string) error {
} }
return nil return nil
} }
func (n *Node) NotifyLinkNew(f func(boxPubKey crypto.BoxPubKey, sigPubKey crypto.SigPubKey, linkType, remote string)) {
n.core.NotifyLinkNew(f)
}
func (n *Node) NotifyLinkGone(f func(boxPubKey crypto.BoxPubKey, sigPubKey crypto.SigPubKey, linkType, remote string)) {
n.core.NotifyLinkGone(f)
}

View file

@ -29,6 +29,7 @@ import (
"time" "time"
"github.com/lucas-clemente/quic-go" "github.com/lucas-clemente/quic-go"
"github.com/matrix-org/gomatrixserverlib"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
) )
@ -56,6 +57,12 @@ func (n *Node) listenFromYgg() {
func (n *Node) listenFromQUIC(session quic.Session) { func (n *Node) listenFromQUIC(session quic.Session) {
n.sessions.Store(session.RemoteAddr().String(), session) n.sessions.Store(session.RemoteAddr().String(), session)
defer n.sessions.Delete(session.RemoteAddr()) defer n.sessions.Delete(session.RemoteAddr())
if n.NewSession != nil {
if len(session.ConnectionState().PeerCertificates) == 1 {
subjectName := session.ConnectionState().PeerCertificates[0].Subject.CommonName
go n.NewSession(gomatrixserverlib.ServerName(subjectName))
}
}
for { for {
st, err := session.AcceptStream(context.TODO()) st, err := session.AcceptStream(context.TODO())
if err != nil { if err != nil {

View file

@ -24,9 +24,9 @@ func main() {
base := setup.NewBaseDendrite(cfg, "KeyServer", true) base := setup.NewBaseDendrite(cfg, "KeyServer", true)
defer base.Close() // nolint: errcheck defer base.Close() // nolint: errcheck
userAPI := base.UserAPIClient() intAPI := keyserver.NewInternalAPI()
keyserver.AddPublicRoutes(base.PublicAPIMux, base.Cfg, userAPI) keyserver.AddInternalRoutes(base.InternalAPIMux, intAPI)
base.SetupAndServeHTTP(string(base.Cfg.Bind.KeyServer), string(base.Cfg.Listen.KeyServer)) base.SetupAndServeHTTP(string(base.Cfg.Bind.KeyServer), string(base.Cfg.Listen.KeyServer))

View file

@ -27,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/setup" "github.com/matrix-org/dendrite/internal/setup"
"github.com/matrix-org/dendrite/keyserver"
"github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/serverkeyapi" "github.com/matrix-org/dendrite/serverkeyapi"
@ -118,6 +119,7 @@ func main() {
rsImpl.SetFederationSenderAPI(fsAPI) rsImpl.SetFederationSenderAPI(fsAPI)
stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer) stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer)
keyAPI := keyserver.NewInternalAPI()
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Cfg, Config: base.Cfg,
@ -136,6 +138,7 @@ func main() {
ServerKeyAPI: serverKeyAPI, ServerKeyAPI: serverKeyAPI,
StateAPI: stateAPI, StateAPI: stateAPI,
UserAPI: userAPI, UserAPI: userAPI,
KeyAPI: keyAPI,
} }
monolith.AddAllPublicRoutes(base.PublicAPIMux) monolith.AddAllPublicRoutes(base.PublicAPIMux)

View file

@ -23,7 +23,6 @@ import (
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
) )
type Database struct { type Database struct {
@ -45,10 +44,7 @@ func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, reda
return err return err
} }
if len(events) != 1 { if len(events) != 1 {
// this should never happen but is non-fatal // this will happen for all non-state events
util.GetLogger(ctx).WithField("redacted_event_id", redactedEventID).WithField("redaction_event_id", redactedBecause.EventID()).Warnf(
"RedactEvent: missing redacted event",
)
return nil return nil
} }
redactionEvent := redactedBecause.Unwrap() redactionEvent := redactedBecause.Unwrap()

View file

@ -32,6 +32,7 @@ import (
) )
const maxPDUsPerTransaction = 50 const maxPDUsPerTransaction = 50
const queueIdleTimeout = time.Second * 30
// destinationQueue is a queue of events for a single destination. // destinationQueue is a queue of events for a single destination.
// It is responsible for sending the events to the destination and // It is responsible for sending the events to the destination and
@ -52,7 +53,6 @@ type destinationQueue struct {
transactionIDMutex sync.Mutex // protects transactionID transactionIDMutex sync.Mutex // protects transactionID
transactionID gomatrixserverlib.TransactionID // last transaction ID transactionID gomatrixserverlib.TransactionID // last transaction ID
transactionCount atomic.Int32 // how many events in this transaction so far transactionCount atomic.Int32 // how many events in this transaction so far
pendingPDUs atomic.Int64 // how many PDUs are waiting to be sent
pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend
pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend
notifyPDUs chan bool // interrupts idle wait for PDUs notifyPDUs chan bool // interrupts idle wait for PDUs
@ -68,7 +68,6 @@ func (oq *destinationQueue) sendEvent(nid int64) {
log.Infof("%s is blacklisted; dropping event", oq.destination) log.Infof("%s is blacklisted; dropping event", oq.destination)
return return
} }
oq.wakeQueueIfNeeded()
// Create a transaction ID. We'll either do this if we don't have // Create a transaction ID. We'll either do this if we don't have
// one made up yet, or if we've exceeded the number of maximum // one made up yet, or if we've exceeded the number of maximum
// events allowed in a single tranaction. We'll reset the counter // events allowed in a single tranaction. We'll reset the counter
@ -95,11 +94,13 @@ func (oq *destinationQueue) sendEvent(nid int64) {
// We've successfully added a PDU to the transaction so increase // We've successfully added a PDU to the transaction so increase
// the counter. // the counter.
oq.transactionCount.Add(1) oq.transactionCount.Add(1)
// Signal that we've sent a new PDU. This will cause the queue to // Wake up the queue if it's asleep.
// wake up if it's asleep. The return to the Add function will only oq.wakeQueueIfNeeded()
// be 1 if the previous value was 0, e.g. nothing was waiting before. // If we're blocking on waiting PDUs then tell the queue that we
if oq.pendingPDUs.Add(1) == 1 { // have work to do.
oq.notifyPDUs <- true select {
case oq.notifyPDUs <- true:
default:
} }
} }
@ -138,26 +139,33 @@ func (oq *destinationQueue) wakeQueueIfNeeded() {
} }
// If we aren't running then wake up the queue. // If we aren't running then wake up the queue.
if !oq.running.Load() { if !oq.running.Load() {
// Look up how many events are pending in this queue. We need // Start the queue.
// to do this so that the queue thinks it has work to do.
count, err := oq.db.GetPendingPDUCount(
context.TODO(),
oq.destination,
)
if err == nil {
oq.pendingPDUs.Store(count)
log.Printf("Destination queue %q has %d pending PDUs", oq.destination, count)
} else {
log.WithError(err).Errorf("Can't get pending PDU count for %q destination queue", oq.destination)
}
if count > 0 {
oq.notifyPDUs <- true
}
// Then start the queue.
go oq.backgroundSend() go oq.backgroundSend()
} }
} }
// waitForPDUs returns a channel for pending PDUs, which will be
// used in backgroundSend select. It returns a closed channel if
// there is something pending right now, or an open channel if
// we're waiting for something.
func (oq *destinationQueue) waitForPDUs() chan bool {
pendingPDUs, err := oq.db.GetPendingPDUCount(context.TODO(), oq.destination)
if err != nil {
log.WithError(err).Errorf("Failed to get pending PDU count on queue %q", oq.destination)
}
// If there are PDUs pending right now then we'll return a closed
// channel. This will mean that the backgroundSend will not block.
if pendingPDUs > 0 {
ch := make(chan bool, 1)
close(ch)
return ch
}
// If there are no PDUs pending right now then instead we'll return
// the notify channel, so that backgroundSend can pick up normal
// notifications from sendEvent.
return oq.notifyPDUs
}
// backgroundSend is the worker goroutine for sending events. // backgroundSend is the worker goroutine for sending events.
// nolint:gocyclo // nolint:gocyclo
func (oq *destinationQueue) backgroundSend() { func (oq *destinationQueue) backgroundSend() {
@ -169,12 +177,15 @@ func (oq *destinationQueue) backgroundSend() {
defer oq.running.Store(false) defer oq.running.Store(false)
for { for {
pendingPDUs := false
// If we have nothing to do then wait either for incoming events, or // If we have nothing to do then wait either for incoming events, or
// until we hit an idle timeout. // until we hit an idle timeout.
select { select {
case <-oq.notifyPDUs: case <-oq.waitForPDUs():
// We were woken up because there are new PDUs waiting in the // We were woken up because there are new PDUs waiting in the
// database. // database.
pendingPDUs = true
case edu := <-oq.incomingEDUs: case edu := <-oq.incomingEDUs:
// EDUs are handled in-memory for now. We will try to keep // EDUs are handled in-memory for now. We will try to keep
// the ordering intact. // the ordering intact.
@ -204,10 +215,11 @@ func (oq *destinationQueue) backgroundSend() {
for len(oq.incomingInvites) > 0 { for len(oq.incomingInvites) > 0 {
oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites) oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites)
} }
case <-time.After(time.Second * 30): case <-time.After(queueIdleTimeout):
// The worker is idle so stop the goroutine. It'll get // The worker is idle so stop the goroutine. It'll get
// restarted automatically the next time we have an event to // restarted automatically the next time we have an event to
// send. // send.
log.Infof("Queue %q has been idle for %s, going to sleep", oq.destination, queueIdleTimeout)
return return
} }
@ -220,12 +232,13 @@ func (oq *destinationQueue) backgroundSend() {
select { select {
case <-time.After(duration): case <-time.After(duration):
case <-oq.interruptBackoff: case <-oq.interruptBackoff:
log.Infof("Interrupting backoff for %q", oq.destination)
} }
oq.backingOff.Store(false) oq.backingOff.Store(false)
} }
// If we have pending PDUs or EDUs then construct a transaction. // If we have pending PDUs or EDUs then construct a transaction.
if oq.pendingPDUs.Load() > 0 || len(oq.pendingEDUs) > 0 { if pendingPDUs || len(oq.pendingEDUs) > 0 {
// Try sending the next transaction and see what happens. // Try sending the next transaction and see what happens.
transaction, terr := oq.nextTransaction(oq.pendingEDUs) transaction, terr := oq.nextTransaction(oq.pendingEDUs)
if terr != nil { if terr != nil {
@ -236,13 +249,17 @@ func (oq *destinationQueue) backgroundSend() {
// buffers at this point. The PDU clean-up is already on a defer. // buffers at this point. The PDU clean-up is already on a defer.
oq.cleanPendingEDUs() oq.cleanPendingEDUs()
oq.cleanPendingInvites() oq.cleanPendingInvites()
log.Infof("Blacklisting %q due to errors", oq.destination)
return return
} else { } else {
// We haven't been told to give up terminally yet but we still have // We haven't been told to give up terminally yet but we still have
// PDUs waiting to be sent. By sending a message into the wake chan, // PDUs waiting to be sent. By sending a message into the wake chan,
// the next loop iteration will try processing these PDUs again, // the next loop iteration will try processing these PDUs again,
// subject to the backoff. // subject to the backoff.
oq.notifyPDUs <- true select {
case oq.notifyPDUs <- true:
default:
}
} }
} else if transaction { } else if transaction {
// If we successfully sent the transaction then clear out // If we successfully sent the transaction then clear out
@ -262,6 +279,7 @@ func (oq *destinationQueue) backgroundSend() {
if giveUp := oq.statistics.Failure(); giveUp { if giveUp := oq.statistics.Failure(); giveUp {
// It's been suggested that we should give up because // It's been suggested that we should give up because
// the backoff has exceeded a maximum allowable value. // the backoff has exceeded a maximum allowable value.
log.Infof("Blacklisting %q due to errors", oq.destination)
return return
} }
} else if sent > 0 { } else if sent > 0 {
@ -273,17 +291,6 @@ func (oq *destinationQueue) backgroundSend() {
oq.cleanPendingInvites() oq.cleanPendingInvites()
} }
} }
// If something else has come along while we were busy sending
// the previous transaction then we don't want the next loop
// iteration to sleep. Send a message if someone else hasn't
// already sent a wake-up.
if oq.pendingPDUs.Load() > 0 {
select {
case oq.notifyPDUs <- true:
default:
}
}
} }
} }
@ -349,7 +356,6 @@ func (oq *destinationQueue) nextTransaction(
// If we didn't get anything from the database and there are no // If we didn't get anything from the database and there are no
// pending EDUs then there's nothing to do - stop here. // pending EDUs then there's nothing to do - stop here.
if len(pdus) == 0 && len(pendingEDUs) == 0 { if len(pdus) == 0 && len(pendingEDUs) == 0 {
log.Warnf("no pdus/edus for nextTransaction for destination %q", oq.destination)
return false, nil return false, nil
} }
@ -381,15 +387,14 @@ func (oq *destinationQueue) nextTransaction(
// TODO: we should check for 500-ish fails vs 400-ish here, // TODO: we should check for 500-ish fails vs 400-ish here,
// since we shouldn't queue things indefinitely in response // since we shouldn't queue things indefinitely in response
// to a 400-ish error // to a 400-ish error
_, err = oq.client.SendTransaction(context.TODO(), t) ctx, cancel = context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
_, err = oq.client.SendTransaction(ctx, t)
switch err.(type) { switch err.(type) {
case nil: case nil:
// No error was returned so the transaction looks to have
// been successfully sent.
oq.pendingPDUs.Sub(int64(len(t.PDUs)))
// Clean up the transaction in the database. // Clean up the transaction in the database.
if err = oq.db.CleanTransactionPDUs( if err = oq.db.CleanTransactionPDUs(
context.TODO(), context.Background(),
t.Destination, t.Destination,
t.TransactionID, t.TransactionID,
); err != nil { ); err != nil {

View file

@ -35,7 +35,9 @@ type Database struct {
queuePDUsStatements queuePDUsStatements
queueJSONStatements queueJSONStatements
sqlutil.PartitionOffsetStatements sqlutil.PartitionOffsetStatements
db *sql.DB db *sql.DB
queuePDUsWriter *sqlutil.TransactionWriter
queueJSONWriter *sqlutil.TransactionWriter
} }
// NewDatabase opens a new database // NewDatabase opens a new database
@ -74,6 +76,9 @@ func (d *Database) prepare() error {
return err return err
} }
d.queuePDUsWriter = sqlutil.NewTransactionWriter()
d.queueJSONWriter = sqlutil.NewTransactionWriter()
return d.PartitionOffsetStatements.Prepare(d.db, "federationsender") return d.PartitionOffsetStatements.Prepare(d.db, "federationsender")
} }
@ -145,12 +150,16 @@ func (d *Database) GetJoinedHosts(
// metadata entries. // metadata entries.
func (d *Database) StoreJSON( func (d *Database) StoreJSON(
ctx context.Context, js string, ctx context.Context, js string,
) (int64, error) { ) (nid int64, err error) {
nid, err := d.insertQueueJSON(ctx, nil, js) err = d.queueJSONWriter.Do(d.db, func(txn *sql.Tx) error {
if err != nil { n, e := d.insertQueueJSON(ctx, nil, js)
return 0, fmt.Errorf("d.insertQueueJSON: %w", err) if e != nil {
} return fmt.Errorf("d.insertQueueJSON: %w", e)
return nid, nil }
nid = n
return nil
})
return
} }
// AssociatePDUWithDestination creates an association that the // AssociatePDUWithDestination creates an association that the
@ -162,7 +171,7 @@ func (d *Database) AssociatePDUWithDestination(
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
nids []int64, nids []int64,
) error { ) error {
return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error { return d.queuePDUsWriter.Do(d.db, func(txn *sql.Tx) error {
for _, nid := range nids { for _, nid := range nids {
if err := d.insertQueuePDU( if err := d.insertQueuePDU(
ctx, // context ctx, // context
@ -230,36 +239,38 @@ func (d *Database) CleanTransactionPDUs(
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
transactionID gomatrixserverlib.TransactionID, transactionID gomatrixserverlib.TransactionID,
) error { ) error {
return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error { var deleteNIDs []int64
nids, err := d.selectQueuePDUs(ctx, txn, serverName, transactionID, 50) nids, err := d.selectQueuePDUs(ctx, nil, serverName, transactionID, 50)
if err != nil { if err != nil {
return fmt.Errorf("d.selectQueuePDUs: %w", err) return fmt.Errorf("d.selectQueuePDUs: %w", err)
} }
if err = d.queuePDUsWriter.Do(d.db, func(txn *sql.Tx) error {
if err = d.deleteQueueTransaction(ctx, txn, serverName, transactionID); err != nil { if err = d.deleteQueueTransaction(ctx, txn, serverName, transactionID); err != nil {
return fmt.Errorf("d.deleteQueueTransaction: %w", err) return fmt.Errorf("d.deleteQueueTransaction: %w", err)
} }
return nil
var count int64 }); err != nil {
var deleteNIDs []int64 return err
for _, nid := range nids { }
count, err = d.selectQueueReferenceJSONCount(ctx, txn, nid) var count int64
if err != nil { for _, nid := range nids {
return fmt.Errorf("d.selectQueueReferenceJSONCount: %w", err) count, err = d.selectQueueReferenceJSONCount(ctx, nil, nid)
} if err != nil {
if count == 0 { return fmt.Errorf("d.selectQueueReferenceJSONCount: %w", err)
deleteNIDs = append(deleteNIDs, nid)
}
} }
if count == 0 {
if len(deleteNIDs) > 0 { deleteNIDs = append(deleteNIDs, nid)
}
}
if len(deleteNIDs) > 0 {
err = d.queueJSONWriter.Do(d.db, func(txn *sql.Tx) error {
if err = d.deleteQueueJSON(ctx, txn, deleteNIDs); err != nil { if err = d.deleteQueueJSON(ctx, txn, deleteNIDs); err != nil {
return fmt.Errorf("d.deleteQueueJSON: %w", err) return fmt.Errorf("d.deleteQueueJSON: %w", err)
} }
} return nil
})
return nil }
}) return err
} }
// GetPendingPDUCount returns the number of PDUs waiting to be // GetPendingPDUCount returns the number of PDUs waiting to be

4
go.mod
View file

@ -21,7 +21,7 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26
github.com/matrix-org/gomatrixserverlib v0.0.0-20200707103800-7470b03f069b github.com/matrix-org/gomatrixserverlib v0.0.0-20200714093724-6c9a3db6c0ed
github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7
github.com/mattn/go-sqlite3 v2.0.2+incompatible github.com/mattn/go-sqlite3 v2.0.2+incompatible
@ -36,7 +36,7 @@ require (
github.com/uber-go/atomic v1.3.0 // indirect github.com/uber-go/atomic v1.3.0 // indirect
github.com/uber/jaeger-client-go v2.15.0+incompatible github.com/uber/jaeger-client-go v2.15.0+incompatible
github.com/uber/jaeger-lib v1.5.0 github.com/uber/jaeger-lib v1.5.0
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200707124509-16343a00055c github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200713083728-5a765b33d55b
go.uber.org/atomic v1.4.0 go.uber.org/atomic v1.4.0
golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5 golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5
gopkg.in/h2non/bimg.v1 v1.0.18 gopkg.in/h2non/bimg.v1 v1.0.18

8
go.sum
View file

@ -421,8 +421,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 h1:Yb+Wlf
github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo= github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo=
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bhrnp3Ky1qgx/fzCtCALOoGYylh2tpS9K4= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bhrnp3Ky1qgx/fzCtCALOoGYylh2tpS9K4=
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200707103800-7470b03f069b h1:g1ueoPHI5tpafw/QysVfDw43FwRTPqz8sT+MZbK54yk= github.com/matrix-org/gomatrixserverlib v0.0.0-20200714093724-6c9a3db6c0ed h1:b3PPbP+vzI68obhizd9O8/NzpiGOdj3uJXYz9S62PfE=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200707103800-7470b03f069b/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/gomatrixserverlib v0.0.0-20200714093724-6c9a3db6c0ed/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y=
github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f/go.mod h1:y0oDTjZDv5SM9a2rp3bl+CU+bvTRINQsdb7YlDql5Go= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f/go.mod h1:y0oDTjZDv5SM9a2rp3bl+CU+bvTRINQsdb7YlDql5Go=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo=
@ -652,8 +652,8 @@ github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhe
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yggdrasil-network/yggdrasil-extras v0.0.0-20200525205615-6c8a4a2e8855/go.mod h1:xQdsh08Io6nV4WRnOVTe6gI8/2iTvfLDQ0CYa5aMt+I= github.com/yggdrasil-network/yggdrasil-extras v0.0.0-20200525205615-6c8a4a2e8855/go.mod h1:xQdsh08Io6nV4WRnOVTe6gI8/2iTvfLDQ0CYa5aMt+I=
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200707124509-16343a00055c h1:tK1FySfDA5xVT5sAK/3XjUCE9LEoandmVnrg4Hj0fXk= github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200713083728-5a765b33d55b h1:py36vWqSnHIQ2DQ9gC0jbkiFd9OCTQX01PdYJ7KmaQE=
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200707124509-16343a00055c/go.mod h1:d+Nz6SPeG6kmeSPFL0cvfWfgwEql75fUnZiAONgvyBE= github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200713083728-5a765b33d55b/go.mod h1:d+Nz6SPeG6kmeSPFL0cvfWfgwEql75fUnZiAONgvyBE=
go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.1/go.mod h1:Ap50jQcDJrx6rB6VgeeFPtuPIf3wMRvRfrfYDO6+BmA= go.opencensus.io v0.22.1/go.mod h1:Ap50jQcDJrx6rB6VgeeFPtuPIf3wMRvRfrfYDO6+BmA=

View file

@ -764,7 +764,7 @@ func (config *Dendrite) FederationSenderURL() string {
return "http://" + string(config.Listen.FederationSender) return "http://" + string(config.Listen.FederationSender)
} }
// ServerKeyAPIURL returns an HTTP URL for where the federation sender is listening. // ServerKeyAPIURL returns an HTTP URL for where the server key API is listening.
func (config *Dendrite) ServerKeyAPIURL() string { func (config *Dendrite) ServerKeyAPIURL() string {
// Hard code the server key API server to talk HTTP for now. // Hard code the server key API server to talk HTTP for now.
// If we support HTTPS we need to think of a practical way to do certificate validation. // If we support HTTPS we need to think of a practical way to do certificate validation.
@ -773,6 +773,15 @@ func (config *Dendrite) ServerKeyAPIURL() string {
return "http://" + string(config.Listen.ServerKeyAPI) return "http://" + string(config.Listen.ServerKeyAPI)
} }
// KeyServerURL returns an HTTP URL for where the key server is listening.
func (config *Dendrite) KeyServerURL() string {
// Hard code the key server to talk HTTP for now.
// If we support HTTPS we need to think of a practical way to do certificate validation.
// People setting up servers shouldn't need to get a certificate valid for the public
// internet for an internal API.
return "http://" + string(config.Listen.KeyServer)
}
// SetupTracing configures the opentracing using the supplied configuration. // SetupTracing configures the opentracing using the supplied configuration.
func (config *Dendrite) SetupTracing(serviceName string) (closer io.Closer, err error) { func (config *Dendrite) SetupTracing(serviceName string) (closer io.Closer, err error) {
if !config.Tracing.Enabled { if !config.Tracing.Enabled {

View file

@ -162,5 +162,10 @@ func RedactEvent(redactionEvent, redactedEvent *gomatrixserverlib.Event) (*gomat
if err != nil { if err != nil {
return nil, err return nil, err
} }
// NOTSPEC: sytest relies on this unspecced field existing :(
err = r.SetUnsignedField("redacted_by", redactionEvent.EventID())
if err != nil {
return nil, err
}
return &r, nil return &r, nil
} }

View file

@ -44,6 +44,8 @@ import (
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
fsinthttp "github.com/matrix-org/dendrite/federationsender/inthttp" fsinthttp "github.com/matrix-org/dendrite/federationsender/inthttp"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
keyinthttp "github.com/matrix-org/dendrite/keyserver/inthttp"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
rsinthttp "github.com/matrix-org/dendrite/roomserver/inthttp" rsinthttp "github.com/matrix-org/dendrite/roomserver/inthttp"
serverKeyAPI "github.com/matrix-org/dendrite/serverkeyapi/api" serverKeyAPI "github.com/matrix-org/dendrite/serverkeyapi/api"
@ -214,6 +216,15 @@ func (b *BaseDendrite) ServerKeyAPIClient() serverKeyAPI.ServerKeyInternalAPI {
return f return f
} }
// KeyServerHTTPClient returns KeyInternalAPI for hitting the key server over HTTP
func (b *BaseDendrite) KeyServerHTTPClient() keyserverAPI.KeyInternalAPI {
f, err := keyinthttp.NewKeyServerClient(b.Cfg.KeyServerURL(), b.httpClient)
if err != nil {
logrus.WithError(err).Panic("KeyServerHTTPClient failed", b.httpClient)
}
return f
}
// CreateDeviceDB creates a new instance of the device database. Should only be // CreateDeviceDB creates a new instance of the device database. Should only be
// called once per component. // called once per component.
func (b *BaseDendrite) CreateDeviceDB() devices.Database { func (b *BaseDendrite) CreateDeviceDB() devices.Database {

View file

@ -26,7 +26,7 @@ import (
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/transactions" "github.com/matrix-org/dendrite/internal/transactions"
"github.com/matrix-org/dendrite/keyserver" keyAPI "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/mediaapi" "github.com/matrix-org/dendrite/mediaapi"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
serverKeyAPI "github.com/matrix-org/dendrite/serverkeyapi/api" serverKeyAPI "github.com/matrix-org/dendrite/serverkeyapi/api"
@ -56,6 +56,7 @@ type Monolith struct {
ServerKeyAPI serverKeyAPI.ServerKeyInternalAPI ServerKeyAPI serverKeyAPI.ServerKeyInternalAPI
UserAPI userapi.UserInternalAPI UserAPI userapi.UserInternalAPI
StateAPI currentstateAPI.CurrentStateInternalAPI StateAPI currentstateAPI.CurrentStateInternalAPI
KeyAPI keyAPI.KeyInternalAPI
// Optional // Optional
ExtPublicRoomsProvider api.ExtraPublicRoomsProvider ExtPublicRoomsProvider api.ExtraPublicRoomsProvider
@ -69,8 +70,6 @@ func (m *Monolith) AddAllPublicRoutes(publicMux *mux.Router) {
m.EDUInternalAPI, m.AppserviceAPI, m.StateAPI, transactions.New(), m.EDUInternalAPI, m.AppserviceAPI, m.StateAPI, transactions.New(),
m.FederationSenderAPI, m.UserAPI, m.ExtPublicRoomsProvider, m.FederationSenderAPI, m.UserAPI, m.ExtPublicRoomsProvider,
) )
keyserver.AddPublicRoutes(publicMux, m.Config, m.UserAPI)
federationapi.AddPublicRoutes( federationapi.AddPublicRoutes(
publicMux, m.Config, m.UserAPI, m.FedClient, publicMux, m.Config, m.UserAPI, m.FedClient,
m.KeyRing, m.RoomserverAPI, m.FederationSenderAPI, m.KeyRing, m.RoomserverAPI, m.FederationSenderAPI,

19
keyserver/README.md Normal file
View file

@ -0,0 +1,19 @@
## Key Server
This is an internal component which manages E2E keys from clients. It handles all the [Key Management APIs](https://matrix.org/docs/spec/client_server/r0.6.1#key-management-api) with the exception of `/keys/changes` which is handled by Sync API. This component is designed to shard by user ID.
Keys are uploaded and stored in this component, and key changes are emitted to a Kafka topic for downstream components such as Sync API.
### Internal APIs
- `PerformUploadKeys` stores identity keys and one-time public keys for given user(s).
- `PerformClaimKeys` acquires one-time public keys for given user(s). This may involve outbound federation calls.
- `QueryKeys` returns identity keys for given user(s). This may involve outbound federation calls. This component may then cache federated identity keys to avoid repeatedly hitting remote servers.
- A topic which emits identity keys every time there is a change (addition or deletion).
### Endpoint mappings
- Client API maps `/keys/upload` to `PerformUploadKeys`.
- Client API maps `/keys/query` to `QueryKeys`.
- Client API maps `/keys/claim` to `PerformClaimKeys`.
- Federation API maps `/user/keys/query` to `QueryKeys`.
- Federation API maps `/user/keys/claim` to `PerformClaimKeys`.
- Sync API maps `/keys/changes` to consuming from the Kafka topic.

49
keyserver/api/api.go Normal file
View file

@ -0,0 +1,49 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api
import "context"
type KeyInternalAPI interface {
PerformUploadKeys(ctx context.Context, req *PerformUploadKeysRequest, res *PerformUploadKeysResponse)
PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse)
QueryKeys(ctx context.Context, req *QueryKeysRequest, res *QueryKeysResponse)
}
// KeyError is returned if there was a problem performing/querying the server
type KeyError struct {
Error string
}
type PerformUploadKeysRequest struct {
}
type PerformUploadKeysResponse struct {
Error *KeyError
}
type PerformClaimKeysRequest struct {
}
type PerformClaimKeysResponse struct {
Error *KeyError
}
type QueryKeysRequest struct {
}
type QueryKeysResponse struct {
Error *KeyError
}

View file

@ -0,0 +1,19 @@
package internal
import (
"context"
"github.com/matrix-org/dendrite/keyserver/api"
)
type KeyInternalAPI struct{}
func (a *KeyInternalAPI) PerformUploadKeys(ctx context.Context, req *api.PerformUploadKeysRequest, res *api.PerformUploadKeysResponse) {
}
func (a *KeyInternalAPI) PerformClaimKeys(ctx context.Context, req *api.PerformClaimKeysRequest, res *api.PerformClaimKeysResponse) {
}
func (a *KeyInternalAPI) QueryKeys(ctx context.Context, req *api.QueryKeysRequest, res *api.QueryKeysResponse) {
}

103
keyserver/inthttp/client.go Normal file
View file

@ -0,0 +1,103 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package inthttp
import (
"context"
"errors"
"net/http"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/opentracing/opentracing-go"
)
// HTTP paths for the internal HTTP APIs
const (
PerformUploadKeysPath = "/keyserver/performUploadKeys"
PerformClaimKeysPath = "/keyserver/performClaimKeys"
QueryKeysPath = "/keyserver/queryKeys"
)
// NewKeyServerClient creates a KeyInternalAPI implemented by talking to a HTTP POST API.
// If httpClient is nil an error is returned
func NewKeyServerClient(
apiURL string,
httpClient *http.Client,
) (api.KeyInternalAPI, error) {
if httpClient == nil {
return nil, errors.New("NewKeyServerClient: httpClient is <nil>")
}
return &httpKeyInternalAPI{
apiURL: apiURL,
httpClient: httpClient,
}, nil
}
type httpKeyInternalAPI struct {
apiURL string
httpClient *http.Client
}
func (h *httpKeyInternalAPI) PerformClaimKeys(
ctx context.Context,
request *api.PerformClaimKeysRequest,
response *api.PerformClaimKeysResponse,
) {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformClaimKeys")
defer span.Finish()
apiURL := h.apiURL + PerformClaimKeysPath
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
if err != nil {
response.Error = &api.KeyError{
Error: err.Error(),
}
}
}
func (h *httpKeyInternalAPI) PerformUploadKeys(
ctx context.Context,
request *api.PerformUploadKeysRequest,
response *api.PerformUploadKeysResponse,
) {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformUploadKeys")
defer span.Finish()
apiURL := h.apiURL + PerformUploadKeysPath
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
if err != nil {
response.Error = &api.KeyError{
Error: err.Error(),
}
}
}
func (h *httpKeyInternalAPI) QueryKeys(
ctx context.Context,
request *api.QueryKeysRequest,
response *api.QueryKeysResponse,
) {
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryKeys")
defer span.Finish()
apiURL := h.apiURL + QueryKeysPath
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
if err != nil {
response.Error = &api.KeyError{
Error: err.Error(),
}
}
}

View file

@ -0,0 +1,61 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package inthttp
import (
"encoding/json"
"net/http"
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/util"
)
func AddRoutes(internalAPIMux *mux.Router, s api.KeyInternalAPI) {
internalAPIMux.Handle(PerformClaimKeysPath,
httputil.MakeInternalAPI("performClaimKeys", func(req *http.Request) util.JSONResponse {
request := api.PerformClaimKeysRequest{}
response := api.PerformClaimKeysResponse{}
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
s.PerformClaimKeys(req.Context(), &request, &response)
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(PerformUploadKeysPath,
httputil.MakeInternalAPI("performUploadKeys", func(req *http.Request) util.JSONResponse {
request := api.PerformUploadKeysRequest{}
response := api.PerformUploadKeysResponse{}
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
s.PerformUploadKeys(req.Context(), &request, &response)
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(QueryKeysPath,
httputil.MakeInternalAPI("queryKeys", func(req *http.Request) util.JSONResponse {
request := api.QueryKeysRequest{}
response := api.QueryKeysResponse{}
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
s.QueryKeys(req.Context(), &request, &response)
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
}

View file

@ -16,14 +16,19 @@ package keyserver
import ( import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/routing" "github.com/matrix-org/dendrite/keyserver/internal"
userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/keyserver/inthttp"
) )
// AddPublicRoutes registers HTTP handlers for CS API calls // AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions
func AddPublicRoutes( // on the given input API.
router *mux.Router, cfg *config.Dendrite, userAPI userapi.UserInternalAPI, func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) {
) { inthttp.AddRoutes(router, intAPI)
routing.Setup(router, cfg, userAPI) }
// NewInternalAPI returns a concerete implementation of the internal API. Callers
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
func NewInternalAPI() api.KeyInternalAPI {
return &internal.KeyInternalAPI{}
} }

View file

@ -1,54 +0,0 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package routing
import (
"net/http"
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/httputil"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/util"
)
const pathPrefixR0 = "/client/r0"
// Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client
// to clients which need to make outbound HTTP requests.
//
// Due to Setup being used to call many other functions, a gocyclo nolint is
// applied:
// nolint: gocyclo
func Setup(
publicAPIMux *mux.Router, cfg *config.Dendrite, userAPI userapi.UserInternalAPI,
) {
r0mux := publicAPIMux.PathPrefix(pathPrefixR0).Subrouter()
r0mux.Handle("/keys/query",
httputil.MakeAuthAPI("queryKeys", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return QueryKeys(req)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/keys/upload/{keyID}",
httputil.MakeAuthAPI("keys_upload", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return util.JSONResponse{
Code: 200,
JSON: map[string]interface{}{},
}
}),
).Methods(http.MethodPost, http.MethodOptions)
}

View file

@ -53,12 +53,16 @@ func Setup(
activeThumbnailGeneration := &types.ActiveThumbnailGeneration{ activeThumbnailGeneration := &types.ActiveThumbnailGeneration{
PathToResult: map[string]*types.ThumbnailGenerationResult{}, PathToResult: map[string]*types.ThumbnailGenerationResult{},
} }
r0mux.Handle("/upload", httputil.MakeAuthAPI(
uploadHandler := httputil.MakeAuthAPI(
"upload", userAPI, "upload", userAPI,
func(req *http.Request, _ *userapi.Device) util.JSONResponse { func(req *http.Request, _ *userapi.Device) util.JSONResponse {
return Upload(req, cfg, db, activeThumbnailGeneration) return Upload(req, cfg, db, activeThumbnailGeneration)
}, },
)).Methods(http.MethodPost, http.MethodOptions) )
r0mux.Handle("/upload", uploadHandler).Methods(http.MethodPost, http.MethodOptions)
v1mux.Handle("/upload", uploadHandler).Methods(http.MethodPost, http.MethodOptions)
activeRemoteRequests := &types.ActiveRemoteRequests{ activeRemoteRequests := &types.ActiveRemoteRequests{
MXCToResult: map[string]*types.RemoteRequestResult{}, MXCToResult: map[string]*types.RemoteRequestResult{},

View file

@ -29,6 +29,22 @@ const (
// OutputTypeRetireInviteEvent indicates that the event is an OutputRetireInviteEvent // OutputTypeRetireInviteEvent indicates that the event is an OutputRetireInviteEvent
OutputTypeRetireInviteEvent OutputType = "retire_invite_event" OutputTypeRetireInviteEvent OutputType = "retire_invite_event"
// OutputTypeRedactedEvent indicates that the event is an OutputRedactedEvent // OutputTypeRedactedEvent indicates that the event is an OutputRedactedEvent
//
// This event is emitted when a redaction has been 'validated' (meaning both the redaction and the event to redact are known).
// Redaction validation happens when the roomserver receives either:
// - A redaction for which we have the event to redact.
// - Any event for which we have a redaction.
// When the roomserver receives an event, it will check against the redactions table to see if there is a matching redaction
// for the event. If there is, it will mark the redaction as validated and emit this event. In the common case of a redaction
// happening after receiving the event to redact, the roomserver will emit a OutputTypeNewRoomEvent of m.room.redaction
// immediately followed by a OutputTypeRedactedEvent. In the uncommon case of receiving the redaction BEFORE the event to redact,
// the roomserver will emit a OutputTypeNewRoomEvent of the event to redact immediately followed by a OutputTypeRedactedEvent.
//
// In order to honour redactions correctly, downstream components must ignore m.room.redaction events emitted via OutputTypeNewRoomEvent.
// When downstream components receive an OutputTypeRedactedEvent they must:
// - Pull out the event to redact from the database. They should have this because the redaction is validated.
// - Redact the event and set the corresponding `unsigned` fields to indicate it as redacted.
// - Replace the event in the database.
OutputTypeRedactedEvent OutputType = "redacted_event" OutputTypeRedactedEvent OutputType = "redacted_event"
) )

View file

@ -20,6 +20,7 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/auth" "github.com/matrix-org/dendrite/roomserver/auth"
"github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/state"
@ -867,7 +868,7 @@ func getAuthChain(
func persistEvents(ctx context.Context, db storage.Database, events []gomatrixserverlib.HeaderedEvent) (types.RoomNID, map[string]types.Event) { func persistEvents(ctx context.Context, db storage.Database, events []gomatrixserverlib.HeaderedEvent) (types.RoomNID, map[string]types.Event) {
var roomNID types.RoomNID var roomNID types.RoomNID
backfilledEventMap := make(map[string]types.Event) backfilledEventMap := make(map[string]types.Event)
for _, ev := range events { for j, ev := range events {
nidMap, err := db.EventNIDs(ctx, ev.AuthEventIDs()) nidMap, err := db.EventNIDs(ctx, ev.AuthEventIDs())
if err != nil { // this shouldn't happen as RequestBackfill already found them if err != nil { // this shouldn't happen as RequestBackfill already found them
logrus.WithError(err).WithField("auth_events", ev.AuthEventIDs()).Error("Failed to find one or more auth events") logrus.WithError(err).WithField("auth_events", ev.AuthEventIDs()).Error("Failed to find one or more auth events")
@ -891,12 +892,14 @@ func persistEvents(ctx context.Context, db storage.Database, events []gomatrixse
// It's also possible for this event to be a redaction which results in another event being // It's also possible for this event to be a redaction which results in another event being
// redacted, which we don't care about since we aren't returning it in this backfill. // redacted, which we don't care about since we aren't returning it in this backfill.
if redactedEventID == ev.EventID() { if redactedEventID == ev.EventID() {
ev = ev.Redact().Headered(ev.RoomVersion) eventToRedact := ev.Unwrap()
err = ev.SetUnsignedField("redacted_because", redactionEvent) redactedEvent, err := eventutil.RedactEvent(redactionEvent, &eventToRedact)
if err != nil { if err != nil {
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to set unsigned field") logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to redact event")
continue continue
} }
ev = redactedEvent.Headered(ev.RoomVersion)
events[j] = ev
} }
backfilledEventMap[ev.EventID()] = types.Event{ backfilledEventMap[ev.EventID()] = types.Event{
EventNID: stateAtEvent.StateEntry.EventNID, EventNID: stateAtEvent.StateEntry.EventNID,

View file

@ -563,6 +563,10 @@ func (d *Database) handleRedactions(
// we've seen this redaction before or there is nothing to redact // we've seen this redaction before or there is nothing to redact
return nil, "", nil return nil, "", nil
} }
if redactedEvent.RoomID() != redactionEvent.RoomID() {
// redactions across rooms aren't allowed
return nil, "", nil
}
// mark the event as redacted // mark the event as redacted
err = redactedEvent.SetUnsignedField("redacted_because", redactionEvent) err = redactedEvent.SetUnsignedField("redacted_because", redactionEvent)

View file

@ -81,11 +81,23 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
switch output.Type { switch output.Type {
case api.OutputTypeNewRoomEvent: case api.OutputTypeNewRoomEvent:
// Ignore redaction events. We will add them to the database when they are
// validated (when we receive OutputTypeRedactedEvent)
event := output.NewRoomEvent.Event
if event.Type() == gomatrixserverlib.MRoomRedaction && event.StateKey() == nil {
// in the special case where the event redacts itself, just pass the message through because
// we will never see the other part of the pair
if event.Redacts() != event.EventID() {
return nil
}
}
return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent) return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent)
case api.OutputTypeNewInviteEvent: case api.OutputTypeNewInviteEvent:
return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent) return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent)
case api.OutputTypeRetireInviteEvent: case api.OutputTypeRetireInviteEvent:
return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent) return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent)
case api.OutputTypeRedactedEvent:
return s.onRedactEvent(context.TODO(), *output.RedactedEvent)
default: default:
log.WithField("type", output.Type).Debug( log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type", "roomserver output log: ignoring unknown output type",
@ -94,11 +106,25 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
} }
} }
func (s *OutputRoomEventConsumer) onRedactEvent(
ctx context.Context, msg api.OutputRedactedEvent,
) error {
err := s.db.RedactEvent(ctx, msg.RedactedEventID, &msg.RedactedBecause)
if err != nil {
log.WithError(err).Error("RedactEvent error'd")
return err
}
// fake a room event so we notify clients about the redaction, as if it were
// a normal event.
return s.onNewRoomEvent(ctx, api.OutputNewRoomEvent{
Event: msg.RedactedBecause,
})
}
func (s *OutputRoomEventConsumer) onNewRoomEvent( func (s *OutputRoomEventConsumer) onNewRoomEvent(
ctx context.Context, msg api.OutputNewRoomEvent, ctx context.Context, msg api.OutputNewRoomEvent,
) error { ) error {
ev := msg.Event ev := msg.Event
addsStateEvents := msg.AddsState() addsStateEvents := msg.AddsState()
ev, err := s.updateStateEvent(ev) ev, err := s.updateStateEvent(ev)
@ -173,12 +199,10 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
} }
func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.HeaderedEvent) (gomatrixserverlib.HeaderedEvent, error) { func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.HeaderedEvent) (gomatrixserverlib.HeaderedEvent, error) {
var stateKey string
if event.StateKey() == nil { if event.StateKey() == nil {
stateKey = "" return event, nil
} else {
stateKey = *event.StateKey()
} }
stateKey := *event.StateKey()
prevEvent, err := s.db.GetStateEvent( prevEvent, err := s.db.GetStateEvent(
context.TODO(), event.RoomID(), event.Type(), stateKey, context.TODO(), event.RoomID(), event.Type(), stateKey,

View file

@ -136,4 +136,6 @@ type Database interface {
// Returns the filterID as a string. Otherwise returns an error if something // Returns the filterID as a string. Otherwise returns an error if something
// goes wrong. // goes wrong.
PutFilter(ctx context.Context, localpart string, filter *gomatrixserverlib.Filter) (string, error) PutFilter(ctx context.Context, localpart string, filter *gomatrixserverlib.Filter) (string, error)
// RedactEvent wipes an event in the database and sets the unsigned.redacted_because key to the redaction event
RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error
} }

View file

@ -99,6 +99,9 @@ const selectEarlyEventsSQL = "" +
const selectMaxEventIDSQL = "" + const selectMaxEventIDSQL = "" +
"SELECT MAX(id) FROM syncapi_output_room_events" "SELECT MAX(id) FROM syncapi_output_room_events"
const updateEventJSONSQL = "" +
"UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2"
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id). // In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
const selectStateInRangeSQL = "" + const selectStateInRangeSQL = "" +
"SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" + "SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
@ -120,6 +123,7 @@ type outputRoomEventsStatements struct {
selectRecentEventsForSyncStmt *sql.Stmt selectRecentEventsForSyncStmt *sql.Stmt
selectEarlyEventsStmt *sql.Stmt selectEarlyEventsStmt *sql.Stmt
selectStateInRangeStmt *sql.Stmt selectStateInRangeStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt
} }
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
@ -149,9 +153,21 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil { if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
return nil, err return nil, err
} }
if s.updateEventJSONStmt, err = db.Prepare(updateEventJSONSQL); err != nil {
return nil, err
}
return s, nil return s, nil
} }
func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error {
headeredJSON, err := json.Marshal(event)
if err != nil {
return err
}
_, err = s.updateEventJSONStmt.ExecContext(ctx, headeredJSON, event.EventID())
return err
}
// selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos. // selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos.
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the // Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
// two positions, only the most recent state is returned. // two positions, only the most recent state is returned.

View file

@ -24,6 +24,7 @@ import (
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/storage/tables"
@ -597,6 +598,26 @@ func (d *Database) IncrementalSync(
return res, nil return res, nil
} }
func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error {
redactedEvents, err := d.Events(ctx, []string{redactedEventID})
if err != nil {
return err
}
if len(redactedEvents) == 0 {
logrus.WithField("event_id", redactedEventID).WithField("redaction_event", redactedBecause.EventID()).Warnf("missing redacted event for redaction")
return nil
}
eventToRedact := redactedEvents[0].Unwrap()
redactionEvent := redactedBecause.Unwrap()
ev, err := eventutil.RedactEvent(&redactionEvent, &eventToRedact)
if err != nil {
return err
}
newEvent := ev.Headered(redactedBecause.RoomVersion)
return d.OutputEvents.UpdateEventJSON(ctx, &newEvent)
}
// getResponseWithPDUsForCompleteSync creates a response and adds all PDUs needed // getResponseWithPDUsForCompleteSync creates a response and adds all PDUs needed
// to it. It returns toPos and joinedRoomIDs for use of adding EDUs. // to it. It returns toPos and joinedRoomIDs for use of adding EDUs.
// nolint:nakedret // nolint:nakedret

View file

@ -76,6 +76,9 @@ const selectEarlyEventsSQL = "" +
const selectMaxEventIDSQL = "" + const selectMaxEventIDSQL = "" +
"SELECT MAX(id) FROM syncapi_output_room_events" "SELECT MAX(id) FROM syncapi_output_room_events"
const updateEventJSONSQL = "" +
"UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2"
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id). // In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
/* /*
$1 = oldPos, $1 = oldPos,
@ -109,6 +112,7 @@ type outputRoomEventsStatements struct {
selectRecentEventsForSyncStmt *sql.Stmt selectRecentEventsForSyncStmt *sql.Stmt
selectEarlyEventsStmt *sql.Stmt selectEarlyEventsStmt *sql.Stmt
selectStateInRangeStmt *sql.Stmt selectStateInRangeStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt
} }
func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Events, error) { func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Events, error) {
@ -140,9 +144,21 @@ func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Even
if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil { if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
return nil, err return nil, err
} }
if s.updateEventJSONStmt, err = db.Prepare(updateEventJSONSQL); err != nil {
return nil, err
}
return s, nil return s, nil
} }
func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error {
headeredJSON, err := json.Marshal(event)
if err != nil {
return err
}
_, err = s.updateEventJSONStmt.ExecContext(ctx, headeredJSON, event.EventID())
return err
}
// selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos. // selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos.
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the // Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
// two positions, only the most recent state is returned. // two positions, only the most recent state is returned.

View file

@ -49,6 +49,7 @@ type Events interface {
// SelectEarlyEvents returns the earliest events in the given room. // SelectEarlyEvents returns the earliest events in the given room.
SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int) ([]types.StreamEvent, error) SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int) ([]types.StreamEvent, error)
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error) SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error)
UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error
} }
// Topology keeps track of the depths and stream positions for all events. // Topology keeps track of the depths and stream positions for all events.

View file

@ -31,6 +31,12 @@ PUT /profile/:user_id/avatar_url sets my avatar
GET /profile/:user_id/avatar_url publicly accessible GET /profile/:user_id/avatar_url publicly accessible
GET /device/{deviceId} gives a 404 for unknown devices GET /device/{deviceId} gives a 404 for unknown devices
PUT /device/{deviceId} gives a 404 for unknown devices PUT /device/{deviceId} gives a 404 for unknown devices
GET /device/{deviceId}
GET /devices
PUT /device/{deviceId} updates device fields
DELETE /device/{deviceId}
DELETE /device/{deviceId} requires UI auth user to match device owner
DELETE /device/{deviceId} with no body gives a 401
POST /createRoom makes a public room POST /createRoom makes a public room
POST /createRoom makes a private room POST /createRoom makes a private room
POST /createRoom makes a private room with invites POST /createRoom makes a private room with invites
@ -160,14 +166,6 @@ User can create and send/receive messages in a room with version 1
POST /createRoom ignores attempts to set the room version via creation_content POST /createRoom ignores attempts to set the room version via creation_content
Inbound federation rejects remote attempts to join local users to rooms Inbound federation rejects remote attempts to join local users to rooms
Inbound federation rejects remote attempts to kick local users to rooms Inbound federation rejects remote attempts to kick local users to rooms
# SyTest currently only implements the v1 endpoints for /send_join and /send_leave,
# whereas Dendrite only supports the v2 endpoints for those, so let's ignore this
# test for now.
#An event which redacts itself should be ignored
# SyTest currently only implements the v1 endpoints for /send_join and /send_leave,
# whereas Dendrite only supports the v2 endpoints for those, so let's ignore this
# test for now.
#A pair of events which redact each other should be ignored
Full state sync includes joined rooms Full state sync includes joined rooms
A message sent after an initial sync appears in the timeline of an incremental sync. A message sent after an initial sync appears in the timeline of an incremental sync.
Can add tag Can add tag
@ -295,6 +293,14 @@ POST /rooms/:room_id/redact/:event_id as random user does not redact message
POST /redact disallows redaction of event in different room POST /redact disallows redaction of event in different room
An event which redacts itself should be ignored An event which redacts itself should be ignored
A pair of events which redact each other should be ignored A pair of events which redact each other should be ignored
Redaction of a redaction redacts the redaction reason
An event which redacts an event in a different room should be ignored
Can receive redactions from regular users over federation in room version 1
Can receive redactions from regular users over federation in room version 2
Can receive redactions from regular users over federation in room version 3
Can receive redactions from regular users over federation in room version 4
Can receive redactions from regular users over federation in room version 5
Can receive redactions from regular users over federation in room version 6
Outbound federation can backfill events Outbound federation can backfill events
Inbound federation can backfill events Inbound federation can backfill events
Backfill checks the events requested belong to the room Backfill checks the events requested belong to the room