diff --git a/are-we-synapse-yet.list b/are-we-synapse-yet.list index 3876de549..239b0ebf4 100644 --- a/are-we-synapse-yet.list +++ b/are-we-synapse-yet.list @@ -91,6 +91,7 @@ snd PUT /rooms/:room_id/send/:event_type/:txn_id deduplicates the same txn id get GET /rooms/:room_id/messages returns a message get GET /rooms/:room_id/messages lazy loads members correctly typ PUT /rooms/:room_id/typing/:user_id sets typing notification +typ Typing notifications don't leak (3 subtests) rst GET /rooms/:room_id/state/m.room.power_levels can fetch levels rst PUT /rooms/:room_id/state/m.room.power_levels can set levels rst PUT power_levels should not explode if the old power levels were empty @@ -857,4 +858,4 @@ jso Invalid JSON special values inv Can invite users to invite-only rooms (2 subtests) plv setting 'm.room.name' respects room powerlevel (2 subtests) psh Messages that notify from another user increment notification_count -psh Messages that org.matrix.msc2625.mark_unread from another user increment org.matrix.msc2625.unread_count \ No newline at end of file +psh Messages that org.matrix.msc2625.mark_unread from another user increment org.matrix.msc2625.unread_count diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index 174eb1bf1..8ea84249a 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -18,9 +18,9 @@ import ( "github.com/Shopify/sarama" "github.com/gorilla/mux" appserviceAPI "github.com/matrix-org/dendrite/appservice/api" - "github.com/matrix-org/dendrite/clientapi/consumers" "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/routing" + currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/config" @@ -30,14 +30,12 @@ import ( "github.com/matrix-org/dendrite/userapi/storage/accounts" "github.com/matrix-org/dendrite/userapi/storage/devices" "github.com/matrix-org/gomatrixserverlib" - "github.com/sirupsen/logrus" ) // AddPublicRoutes sets up and registers HTTP handlers for the ClientAPI component. func AddPublicRoutes( router *mux.Router, cfg *config.Dendrite, - consumer sarama.Consumer, producer sarama.SyncProducer, deviceDB devices.Database, accountsDB accounts.Database, @@ -45,6 +43,7 @@ func AddPublicRoutes( rsAPI roomserverAPI.RoomserverInternalAPI, eduInputAPI eduServerAPI.EDUServerInputAPI, asAPI appserviceAPI.AppServiceQueryAPI, + stateAPI currentstateAPI.CurrentStateInternalAPI, transactionsCache *transactions.Cache, fsAPI federationSenderAPI.FederationSenderInternalAPI, userAPI userapi.UserInternalAPI, @@ -54,16 +53,9 @@ func AddPublicRoutes( Topic: string(cfg.Kafka.Topics.OutputClientData), } - roomEventConsumer := consumers.NewOutputRoomEventConsumer( - cfg, consumer, accountsDB, rsAPI, - ) - if err := roomEventConsumer.Start(); err != nil { - logrus.WithError(err).Panicf("failed to start room server consumer") - } - routing.Setup( router, cfg, eduInputAPI, rsAPI, asAPI, accountsDB, deviceDB, userAPI, federation, - syncProducer, transactionsCache, fsAPI, + syncProducer, transactionsCache, fsAPI, stateAPI, ) } diff --git a/clientapi/consumers/roomserver.go b/clientapi/consumers/roomserver.go deleted file mode 100644 index beeda042b..000000000 --- a/clientapi/consumers/roomserver.go +++ /dev/null @@ -1,92 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package consumers - -import ( - "context" - "encoding/json" - - "github.com/matrix-org/dendrite/internal" - "github.com/matrix-org/dendrite/internal/config" - "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/userapi/storage/accounts" - "github.com/matrix-org/gomatrixserverlib" - - "github.com/Shopify/sarama" - log "github.com/sirupsen/logrus" -) - -// OutputRoomEventConsumer consumes events that originated in the room server. -type OutputRoomEventConsumer struct { - rsAPI api.RoomserverInternalAPI - rsConsumer *internal.ContinualConsumer - db accounts.Database - serverName string -} - -// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. -func NewOutputRoomEventConsumer( - cfg *config.Dendrite, - kafkaConsumer sarama.Consumer, - store accounts.Database, - rsAPI api.RoomserverInternalAPI, -) *OutputRoomEventConsumer { - - consumer := internal.ContinualConsumer{ - Topic: string(cfg.Kafka.Topics.OutputRoomEvent), - Consumer: kafkaConsumer, - PartitionStore: store, - } - s := &OutputRoomEventConsumer{ - rsConsumer: &consumer, - db: store, - rsAPI: rsAPI, - serverName: string(cfg.Matrix.ServerName), - } - consumer.ProcessMessage = s.onMessage - - return s -} - -// Start consuming from room servers -func (s *OutputRoomEventConsumer) Start() error { - return s.rsConsumer.Start() -} - -// onMessage is called when the sync server receives a new event from the room server output log. -// It is not safe for this function to be called from multiple goroutines, or else the -// sync stream position may race and be incorrectly calculated. -func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { - // Parse out the event JSON - var output api.OutputEvent - if err := json.Unmarshal(msg.Value, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("roomserver output log: message parse failure") - return nil - } - - if output.Type != api.OutputTypeNewRoomEvent { - log.WithField("type", output.Type).Debug( - "roomserver output log: ignoring unknown output type", - ) - return nil - } - - return s.db.UpdateMemberships( - context.TODO(), - gomatrixserverlib.UnwrapEventHeaders(output.NewRoomEvent.AddsState()), - output.NewRoomEvent.RemovesStateEventIDs, - ) -} diff --git a/clientapi/routing/memberships.go b/clientapi/routing/memberships.go index 1c9800b66..9c4cf7497 100644 --- a/clientapi/routing/memberships.go +++ b/clientapi/routing/memberships.go @@ -18,9 +18,8 @@ import ( "encoding/json" "net/http" - "github.com/matrix-org/dendrite/userapi/storage/accounts" - "github.com/matrix-org/dendrite/clientapi/jsonerror" + currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/roomserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -95,20 +94,19 @@ func GetMemberships( func GetJoinedRooms( req *http.Request, device *userapi.Device, - accountsDB accounts.Database, + stateAPI currentstateAPI.CurrentStateInternalAPI, ) util.JSONResponse { - localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID) + var res currentstateAPI.QueryRoomsForUserResponse + err := stateAPI.QueryRoomsForUser(req.Context(), ¤tstateAPI.QueryRoomsForUserRequest{ + UserID: device.UserID, + WantMembership: "join", + }, &res) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("gomatrixserverlib.SplitID failed") - return jsonerror.InternalServerError() - } - joinedRooms, err := accountsDB.GetRoomIDsByLocalPart(req.Context(), localpart) - if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("accountsDB.GetRoomIDsByLocalPart failed") + util.GetLogger(req.Context()).WithError(err).Error("QueryRoomsForUser failed") return jsonerror.InternalServerError() } return util.JSONResponse{ Code: http.StatusOK, - JSON: getJoinedRoomsResponse{joinedRooms}, + JSON: getJoinedRoomsResponse{res.RoomIDs}, } } diff --git a/clientapi/routing/profile.go b/clientapi/routing/profile.go index 7c2cd19bc..1df4c9b33 100644 --- a/clientapi/routing/profile.go +++ b/clientapi/routing/profile.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" + currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/roomserver/api" @@ -93,8 +94,8 @@ func GetAvatarURL( // SetAvatarURL implements PUT /profile/{userID}/avatar_url // nolint:gocyclo func SetAvatarURL( - req *http.Request, accountDB accounts.Database, device *userapi.Device, - userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI, + req *http.Request, accountDB accounts.Database, stateAPI currentstateAPI.CurrentStateInternalAPI, + device *userapi.Device, userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI, ) util.JSONResponse { if userID != device.UserID { return util.JSONResponse{ @@ -139,9 +140,13 @@ func SetAvatarURL( return jsonerror.InternalServerError() } - memberships, err := accountDB.GetMembershipsByLocalpart(req.Context(), localpart) + var res currentstateAPI.QueryRoomsForUserResponse + err = stateAPI.QueryRoomsForUser(req.Context(), ¤tstateAPI.QueryRoomsForUserRequest{ + UserID: device.UserID, + WantMembership: "join", + }, &res) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("accountDB.GetMembershipsByLocalpart failed") + util.GetLogger(req.Context()).WithError(err).Error("QueryRoomsForUser failed") return jsonerror.InternalServerError() } @@ -152,7 +157,7 @@ func SetAvatarURL( } events, err := buildMembershipEvents( - req.Context(), memberships, newProfile, userID, cfg, evTime, rsAPI, + req.Context(), res.RoomIDs, newProfile, userID, cfg, evTime, rsAPI, ) switch e := err.(type) { case nil: @@ -207,8 +212,8 @@ func GetDisplayName( // SetDisplayName implements PUT /profile/{userID}/displayname // nolint:gocyclo func SetDisplayName( - req *http.Request, accountDB accounts.Database, device *userapi.Device, - userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI, + req *http.Request, accountDB accounts.Database, stateAPI currentstateAPI.CurrentStateInternalAPI, + device *userapi.Device, userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI, ) util.JSONResponse { if userID != device.UserID { return util.JSONResponse{ @@ -253,9 +258,13 @@ func SetDisplayName( return jsonerror.InternalServerError() } - memberships, err := accountDB.GetMembershipsByLocalpart(req.Context(), localpart) + var res currentstateAPI.QueryRoomsForUserResponse + err = stateAPI.QueryRoomsForUser(req.Context(), ¤tstateAPI.QueryRoomsForUserRequest{ + UserID: device.UserID, + WantMembership: "join", + }, &res) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("accountDB.GetMembershipsByLocalpart failed") + util.GetLogger(req.Context()).WithError(err).Error("QueryRoomsForUser failed") return jsonerror.InternalServerError() } @@ -266,7 +275,7 @@ func SetDisplayName( } events, err := buildMembershipEvents( - req.Context(), memberships, newProfile, userID, cfg, evTime, rsAPI, + req.Context(), res.RoomIDs, newProfile, userID, cfg, evTime, rsAPI, ) switch e := err.(type) { case nil: @@ -335,14 +344,14 @@ func getProfile( func buildMembershipEvents( ctx context.Context, - memberships []authtypes.Membership, + roomIDs []string, newProfile authtypes.Profile, userID string, cfg *config.Dendrite, evTime time.Time, rsAPI api.RoomserverInternalAPI, ) ([]gomatrixserverlib.HeaderedEvent, error) { evs := []gomatrixserverlib.HeaderedEvent{} - for _, membership := range memberships { - verReq := api.QueryRoomVersionForRoomRequest{RoomID: membership.RoomID} + for _, roomID := range roomIDs { + verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} verRes := api.QueryRoomVersionForRoomResponse{} if err := rsAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes); err != nil { return []gomatrixserverlib.HeaderedEvent{}, err @@ -350,7 +359,7 @@ func buildMembershipEvents( builder := gomatrixserverlib.EventBuilder{ Sender: userID, - RoomID: membership.RoomID, + RoomID: roomID, Type: "m.room.member", StateKey: &userID, } diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index 9dfff0f20..deaa7b329 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -23,6 +23,7 @@ import ( appserviceAPI "github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/producers" + currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/config" @@ -58,6 +59,7 @@ func Setup( syncProducer *producers.SyncAPIProducer, transactionsCache *transactions.Cache, federationSender federationSenderAPI.FederationSenderInternalAPI, + stateAPI currentstateAPI.CurrentStateInternalAPI, ) { publicAPIMux.Handle("/client/versions", @@ -98,7 +100,7 @@ func Setup( ).Methods(http.MethodPost, http.MethodOptions) r0mux.Handle("/joined_rooms", httputil.MakeAuthAPI("joined_rooms", userAPI, func(req *http.Request, device *api.Device) util.JSONResponse { - return GetJoinedRooms(req, device, accountDB) + return GetJoinedRooms(req, device, stateAPI) }), ).Methods(http.MethodGet, http.MethodOptions) r0mux.Handle("/rooms/{roomID}/join", @@ -307,7 +309,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduAPI) + return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduAPI, stateAPI) }), ).Methods(http.MethodPut, http.MethodOptions) @@ -404,7 +406,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SetAvatarURL(req, accountDB, device, vars["userID"], cfg, rsAPI) + return SetAvatarURL(req, accountDB, stateAPI, device, vars["userID"], cfg, rsAPI) }), ).Methods(http.MethodPut, http.MethodOptions) // Browsers use the OPTIONS HTTP method to check if the CORS policy allows @@ -426,7 +428,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SetDisplayName(req, accountDB, device, vars["userID"], cfg, rsAPI) + return SetDisplayName(req, accountDB, stateAPI, device, vars["userID"], cfg, rsAPI) }), ).Methods(http.MethodPut, http.MethodOptions) // Browsers use the OPTIONS HTTP method to check if the CORS policy allows diff --git a/clientapi/routing/sendtyping.go b/clientapi/routing/sendtyping.go index 9b6a0b39b..54a822860 100644 --- a/clientapi/routing/sendtyping.go +++ b/clientapi/routing/sendtyping.go @@ -13,15 +13,15 @@ package routing import ( - "database/sql" "net/http" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/userutil" + currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/eduserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/storage/accounts" + "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) @@ -36,6 +36,7 @@ func SendTyping( req *http.Request, device *userapi.Device, roomID string, userID string, accountDB accounts.Database, eduAPI api.EDUServerInputAPI, + stateAPI currentstateAPI.CurrentStateInternalAPI, ) util.JSONResponse { if device.UserID != userID { return util.JSONResponse{ @@ -44,23 +45,38 @@ func SendTyping( } } - localpart, err := userutil.ParseUsernameParam(userID, nil) + // Verify that the user is a member of this room + tuple := gomatrixserverlib.StateKeyTuple{ + EventType: gomatrixserverlib.MRoomMember, + StateKey: userID, + } + var res currentstateAPI.QueryCurrentStateResponse + err := stateAPI.QueryCurrentState(req.Context(), ¤tstateAPI.QueryCurrentStateRequest{ + RoomID: roomID, + StateTuples: []gomatrixserverlib.StateKeyTuple{tuple}, + }, &res) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("userutil.ParseUsernameParam failed") + util.GetLogger(req.Context()).WithError(err).Error("QueryCurrentState failed") return jsonerror.InternalServerError() } - - // Verify that the user is a member of this room - _, err = accountDB.GetMembershipInRoomByLocalpart(req.Context(), localpart, roomID) - if err == sql.ErrNoRows { + ev := res.StateEvents[tuple] + if ev == nil { return util.JSONResponse{ Code: http.StatusForbidden, JSON: jsonerror.Forbidden("User not in this room"), } - } else if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("accountDB.GetMembershipInRoomByLocalPart failed") + } + membership, err := ev.Membership() + if err != nil { + util.GetLogger(req.Context()).WithError(err).Error("Member event isn't valid") return jsonerror.InternalServerError() } + if membership != gomatrixserverlib.Join { + return util.JSONResponse{ + Code: http.StatusForbidden, + JSON: jsonerror.Forbidden("User not in this room"), + } + } // parse the incoming http request var r typingContentJSON diff --git a/cmd/dendrite-client-api-server/main.go b/cmd/dendrite-client-api-server/main.go index fe5f30a0e..f46dae502 100644 --- a/cmd/dendrite-client-api-server/main.go +++ b/cmd/dendrite-client-api-server/main.go @@ -35,10 +35,11 @@ func main() { fsAPI := base.FederationSenderHTTPClient() eduInputAPI := base.EDUServerClient() userAPI := base.UserAPIClient() + stateAPI := base.CurrentStateAPIClient() clientapi.AddPublicRoutes( - base.PublicAPIMux, base.Cfg, base.KafkaConsumer, base.KafkaProducer, deviceDB, accountDB, federation, - rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, + base.PublicAPIMux, base.Cfg, base.KafkaProducer, deviceDB, accountDB, federation, + rsAPI, eduInputAPI, asQuery, stateAPI, transactions.New(), fsAPI, userAPI, ) base.SetupAndServeHTTP(string(base.Cfg.Bind.ClientAPI), string(base.Cfg.Listen.ClientAPI)) diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index 356ab5a7f..b7e86b77c 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -30,6 +30,7 @@ import ( p2pdisc "github.com/libp2p/go-libp2p/p2p/discovery" "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/cmd/dendrite-demo-libp2p/storage" + "github.com/matrix-org/dendrite/currentstateserver" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/internal/config" @@ -166,6 +167,7 @@ func main() { if err != nil { logrus.WithError(err).Panicf("failed to connect to public rooms db") } + stateAPI := currentstateserver.NewInternalAPI(base.Base.Cfg, base.Base.KafkaConsumer) monolith := setup.Monolith{ Config: base.Base.Cfg, @@ -182,6 +184,7 @@ func main() { FederationSenderAPI: fsAPI, RoomserverAPI: rsAPI, ServerKeyAPI: serverKeyAPI, + StateAPI: stateAPI, UserAPI: userAPI, PublicRoomsDB: publicRoomsDB, diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index db05ecb76..5de674021 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -27,6 +27,7 @@ import ( "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/embed" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn" + "github.com/matrix-org/dendrite/currentstateserver" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/federationsender" @@ -115,6 +116,8 @@ func main() { embed.Embed(base.BaseMux, *instancePort, "Yggdrasil Demo") + stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer) + monolith := setup.Monolith{ Config: base.Cfg, AccountDB: accountDB, @@ -130,6 +133,7 @@ func main() { FederationSenderAPI: fsAPI, RoomserverAPI: rsAPI, UserAPI: userAPI, + StateAPI: stateAPI, //ServerKeyAPI: serverKeyAPI, PublicRoomsDB: publicRoomsDB, diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index 339bbe699..905eda2ba 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -20,6 +20,7 @@ import ( "os" "github.com/matrix-org/dendrite/appservice" + "github.com/matrix-org/dendrite/currentstateserver" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/federationsender" @@ -122,6 +123,8 @@ func main() { logrus.WithError(err).Panicf("failed to connect to public rooms db") } + stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer) + monolith := setup.Monolith{ Config: base.Cfg, AccountDB: accountDB, @@ -137,6 +140,7 @@ func main() { FederationSenderAPI: fsAPI, RoomserverAPI: rsAPI, ServerKeyAPI: serverKeyAPI, + StateAPI: stateAPI, UserAPI: userAPI, PublicRoomsDB: publicRoomsDB, diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 883b0fad0..11f339b0f 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -22,6 +22,7 @@ import ( "syscall/js" "github.com/matrix-org/dendrite/appservice" + "github.com/matrix-org/dendrite/currentstateserver" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/federationsender" @@ -218,6 +219,8 @@ func main() { logrus.WithError(err).Panicf("failed to connect to public rooms db") } + stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer) + monolith := setup.Monolith{ Config: base.Cfg, AccountDB: accountDB, @@ -232,6 +235,7 @@ func main() { EDUInternalAPI: eduInputAPI, FederationSenderAPI: fedSenderAPI, RoomserverAPI: rsAPI, + StateAPI: stateAPI, UserAPI: userAPI, //ServerKeyAPI: serverKeyAPI, diff --git a/currentstateserver/api/api.go b/currentstateserver/api/api.go new file mode 100644 index 000000000..b16306ab0 --- /dev/null +++ b/currentstateserver/api/api.go @@ -0,0 +1,78 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "github.com/matrix-org/gomatrixserverlib" +) + +type CurrentStateInternalAPI interface { + // QueryCurrentState retrieves the requested state events. If state events are not found, they will be missing from + // the response. + QueryCurrentState(ctx context.Context, req *QueryCurrentStateRequest, res *QueryCurrentStateResponse) error + // QueryRoomsForUser retrieves a list of room IDs matching the given query. + QueryRoomsForUser(ctx context.Context, req *QueryRoomsForUserRequest, res *QueryRoomsForUserResponse) error +} + +type QueryRoomsForUserRequest struct { + UserID string + // The desired membership of the user. If this is the empty string then no rooms are returned. + WantMembership string +} + +type QueryRoomsForUserResponse struct { + RoomIDs []string +} + +type QueryCurrentStateRequest struct { + RoomID string + StateTuples []gomatrixserverlib.StateKeyTuple +} + +type QueryCurrentStateResponse struct { + StateEvents map[gomatrixserverlib.StateKeyTuple]*gomatrixserverlib.HeaderedEvent +} + +// MarshalJSON stringifies the StateKeyTuple keys so they can be sent over the wire in HTTP API mode. +func (r *QueryCurrentStateResponse) MarshalJSON() ([]byte, error) { + se := make(map[string]*gomatrixserverlib.HeaderedEvent, len(r.StateEvents)) + for k, v := range r.StateEvents { + // use 0x1F (unit separator) as the delimiter between type/state key, + se[fmt.Sprintf("%s\x1F%s", k.EventType, k.StateKey)] = v + } + return json.Marshal(se) +} + +func (r *QueryCurrentStateResponse) UnmarshalJSON(data []byte) error { + res := make(map[string]*gomatrixserverlib.HeaderedEvent) + err := json.Unmarshal(data, &res) + if err != nil { + return err + } + r.StateEvents = make(map[gomatrixserverlib.StateKeyTuple]*gomatrixserverlib.HeaderedEvent, len(res)) + for k, v := range res { + fields := strings.Split(k, "\x1F") + r.StateEvents[gomatrixserverlib.StateKeyTuple{ + EventType: fields[0], + StateKey: fields[1], + }] = v + } + return nil +} diff --git a/currentstateserver/consumers/roomserver.go b/currentstateserver/consumers/roomserver.go new file mode 100644 index 000000000..9e2694b0c --- /dev/null +++ b/currentstateserver/consumers/roomserver.go @@ -0,0 +1,140 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "encoding/json" + + "github.com/Shopify/sarama" + "github.com/matrix-org/dendrite/currentstateserver/storage" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" + log "github.com/sirupsen/logrus" +) + +type OutputRoomEventConsumer struct { + rsConsumer *internal.ContinualConsumer + db storage.Database +} + +func NewOutputRoomEventConsumer(topicName string, kafkaConsumer sarama.Consumer, store storage.Database) *OutputRoomEventConsumer { + consumer := &internal.ContinualConsumer{ + Topic: topicName, + Consumer: kafkaConsumer, + PartitionStore: store, + } + s := &OutputRoomEventConsumer{ + rsConsumer: consumer, + db: store, + } + consumer.ProcessMessage = s.onMessage + + return s +} + +func (c *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { + // Parse out the event JSON + var output api.OutputEvent + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return nil + } + + switch output.Type { + case api.OutputTypeNewRoomEvent: + return c.onNewRoomEvent(context.TODO(), *output.NewRoomEvent) + case api.OutputTypeNewInviteEvent: + case api.OutputTypeRetireInviteEvent: + default: + log.WithField("type", output.Type).Debug( + "roomserver output log: ignoring unknown output type", + ) + } + return nil +} + +func (c *OutputRoomEventConsumer) onNewRoomEvent( + ctx context.Context, msg api.OutputNewRoomEvent, +) error { + ev := msg.Event + + addsStateEvents := msg.AddsState() + + ev, err := c.updateStateEvent(ev) + if err != nil { + return err + } + + for i := range addsStateEvents { + addsStateEvents[i], err = c.updateStateEvent(addsStateEvents[i]) + if err != nil { + return err + } + } + + err = c.db.StoreStateEvents( + ctx, + addsStateEvents, + msg.RemovesStateEventIDs, + ) + if err != nil { + // panic rather than continue with an inconsistent database + log.WithFields(log.Fields{ + "event": string(ev.JSON()), + log.ErrorKey: err, + "add": msg.AddsStateEventIDs, + "del": msg.RemovesStateEventIDs, + }).Panicf("roomserver output log: write event failure") + } + return nil +} + +// Start consuming from room servers +func (c *OutputRoomEventConsumer) Start() error { + return c.rsConsumer.Start() +} + +func (c *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.HeaderedEvent) (gomatrixserverlib.HeaderedEvent, error) { + var stateKey string + if event.StateKey() == nil { + stateKey = "" + } else { + stateKey = *event.StateKey() + } + + prevEvent, err := c.db.GetStateEvent( + context.TODO(), event.RoomID(), event.Type(), stateKey, + ) + if err != nil { + return event, err + } + + if prevEvent == nil { + return event, nil + } + + prev := types.PrevEventRef{ + PrevContent: prevEvent.Content(), + ReplacesState: prevEvent.EventID(), + PrevSender: prevEvent.Sender(), + } + + event.Event, err = event.SetUnsigned(prev) + return event, err +} diff --git a/currentstateserver/currentstateserver.go b/currentstateserver/currentstateserver.go new file mode 100644 index 000000000..07d5e54ad --- /dev/null +++ b/currentstateserver/currentstateserver.go @@ -0,0 +1,51 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package currentstateserver + +import ( + "github.com/Shopify/sarama" + "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/currentstateserver/api" + "github.com/matrix-org/dendrite/currentstateserver/consumers" + "github.com/matrix-org/dendrite/currentstateserver/internal" + "github.com/matrix-org/dendrite/currentstateserver/inthttp" + "github.com/matrix-org/dendrite/currentstateserver/storage" + "github.com/matrix-org/dendrite/internal/config" + "github.com/sirupsen/logrus" +) + +// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions +// on the given input API. +func AddInternalRoutes(router *mux.Router, intAPI api.CurrentStateInternalAPI) { + inthttp.AddRoutes(router, intAPI) +} + +// NewInternalAPI returns a concrete implementation of the internal API. Callers +// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. +func NewInternalAPI(cfg *config.Dendrite, consumer sarama.Consumer) api.CurrentStateInternalAPI { + csDB, err := storage.NewDatabase(string(cfg.Database.CurrentState), cfg.DbProperties()) + if err != nil { + logrus.WithError(err).Panicf("failed to open database") + } + roomConsumer := consumers.NewOutputRoomEventConsumer( + string(cfg.Kafka.Topics.OutputRoomEvent), consumer, csDB, + ) + if err = roomConsumer.Start(); err != nil { + logrus.WithError(err).Panicf("failed to start room server consumer") + } + return &internal.CurrentStateInternalAPI{ + DB: csDB, + } +} diff --git a/currentstateserver/currentstateserver_test.go b/currentstateserver/currentstateserver_test.go new file mode 100644 index 000000000..a0627fea7 --- /dev/null +++ b/currentstateserver/currentstateserver_test.go @@ -0,0 +1,180 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package currentstateserver + +import ( + "context" + "encoding/json" + "net/http" + "reflect" + "testing" + "time" + + "github.com/Shopify/sarama" + "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/currentstateserver/api" + "github.com/matrix-org/dendrite/currentstateserver/inthttp" + "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/internal/httputil" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/internal/test" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/naffka" +) + +var ( + testRoomVersion = gomatrixserverlib.RoomVersionV1 + testData = []json.RawMessage{ + []byte(`{"auth_events":[],"content":{"creator":"@userid:kaer.morhen"},"depth":0,"event_id":"$0ok8ynDp7kjc95e3:kaer.morhen","hashes":{"sha256":"17kPoH+h0Dk4Omn7Sus0qMb6+oGcf+CZFEgDhv7UKWs"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"jP4a04f5/F10Pw95FPpdCyKAO44JOwUQ/MZOOeA/RTU1Dn+AHPMzGSaZnuGjRr/xQuADt+I3ctb5ZQfLKNzHDw"}},"state_key":"","type":"m.room.create"}`), + []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}]],"content":{"membership":"join"},"depth":1,"event_id":"$LEwEu0kxrtu5fOiS:kaer.morhen","hashes":{"sha256":"B7M88PhXf3vd1LaFtjQutFu4x/w7fHD28XKZ4sAsJTo"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"p2vqmuJn7ZBRImctSaKbXCAxCcBlIjPH9JHte1ouIUGy84gpu4eLipOvSBCLL26hXfC0Zrm4WUto6Hr+ohdrCg"}},"state_key":"@userid:kaer.morhen","type":"m.room.member"}`), + []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"join_rule":"public"},"depth":2,"event_id":"$SMHlqUrNhhBBRLeN:kaer.morhen","hashes":{"sha256":"vIuJQvmMjrGxshAkj1SXe0C4RqvMbv4ZADDw9pFCWqQ"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"hBMsb3Qppo3RaqqAl4JyTgaiWEbW5hlckATky6PrHun+F3YM203TzG7w9clwuQU5F5pZoB1a6nw+to0hN90FAw"}},"state_key":"","type":"m.room.join_rules"}`), + []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"history_visibility":"shared"},"depth":3,"event_id":"$6F1yGIbO0J7TM93h:kaer.morhen","hashes":{"sha256":"Mr23GKSlZW7UCCYLgOWawI2Sg6KIoMjUWO2TDenuOgw"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$SMHlqUrNhhBBRLeN:kaer.morhen",{"sha256":"SylzE8U02I+6eyEHgL+FlU0L5YdqrVp8OOlxKS9VQW0"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"sHLKrFI3hKGrEJfpMVZSDS3LvLasQsy50CTsOwru9XTVxgRsPo6wozNtRVjxo1J3Rk18RC9JppovmQ5VR5EcDw"}},"state_key":"","type":"m.room.history_visibility"}`), + []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"ban":50,"events":null,"events_default":0,"invite":0,"kick":50,"redact":50,"state_default":50,"users":null,"users_default":0},"depth":4,"event_id":"$UKNe10XzYzG0TeA9:kaer.morhen","hashes":{"sha256":"ngbP3yja9U5dlckKerUs/fSOhtKxZMCVvsfhPURSS28"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$6F1yGIbO0J7TM93h:kaer.morhen",{"sha256":"A4CucrKSoWX4IaJXhq02mBg1sxIyZEftbC+5p3fZAvk"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"zOmwlP01QL3yFchzuR9WHvogOoBZA3oVtNIF3lM0ZfDnqlSYZB9sns27G/4HVq0k7alaK7ZE3oGoCrVnMkPNCw"}},"state_key":"","type":"m.room.power_levels"}`), + // messages + []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"body":"Test Message"},"depth":5,"event_id":"$gl2T9l3qm0kUbiIJ:kaer.morhen","hashes":{"sha256":"Qx3nRMHLDPSL5hBAzuX84FiSSP0K0Kju2iFoBWH4Za8"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$UKNe10XzYzG0TeA9:kaer.morhen",{"sha256":"KtSRyMjt0ZSjsv2koixTRCxIRCGoOp6QrKscsW97XRo"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"sqDgv3EG7ml5VREzmT9aZeBpS4gAPNIaIeJOwqjDhY0GPU/BcpX5wY4R7hYLrNe5cChgV+eFy/GWm1Zfg5FfDg"}},"type":"m.room.message"}`), + []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"body":"Test Message"},"depth":6,"event_id":"$MYSbs8m4rEbsCWXD:kaer.morhen","hashes":{"sha256":"kgbYM7v4Ud2YaBsjBTolM4ySg6rHcJNYI6nWhMSdFUA"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$gl2T9l3qm0kUbiIJ:kaer.morhen",{"sha256":"C/rD04h9wGxRdN2G/IBfrgoE1UovzLZ+uskwaKZ37/Q"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"x0UoKh968jj/F5l1/R7Ew0T6CTKuew3PLNHASNxqck/bkNe8yYQiDHXRr+kZxObeqPZZTpaF1+EI+bLU9W8GDQ"}},"type":"m.room.message"}`), + []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"body":"Test Message"},"depth":7,"event_id":"$N5x9WJkl9ClPrAEg:kaer.morhen","hashes":{"sha256":"FWM8oz4yquTunRZ67qlW2gzPDzdWfBP6RPHXhK1I/x8"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$MYSbs8m4rEbsCWXD:kaer.morhen",{"sha256":"fatqgW+SE8mb2wFn3UN+drmluoD4UJ/EcSrL6Ur9q1M"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"Y+LX/xcyufoXMOIoqQBNOzy6lZfUGB1ffgXIrSugk6obMiyAsiRejHQN/pciZXsHKxMJLYRFAz4zSJoS/LGPAA"}},"type":"m.room.message"}`), + } + testEvents = []gomatrixserverlib.HeaderedEvent{} + testStateEvents = make(map[gomatrixserverlib.StateKeyTuple]gomatrixserverlib.HeaderedEvent) + + kafkaTopic = "room_events" +) + +func init() { + for _, j := range testData { + e, err := gomatrixserverlib.NewEventFromTrustedJSON(j, false, testRoomVersion) + if err != nil { + panic("cannot load test data: " + err.Error()) + } + h := e.Headered(testRoomVersion) + testEvents = append(testEvents, h) + if e.StateKey() != nil { + testStateEvents[gomatrixserverlib.StateKeyTuple{ + EventType: e.Type(), + StateKey: *e.StateKey(), + }] = h + } + } +} + +func MustWriteOutputEvent(t *testing.T, producer sarama.SyncProducer, out *roomserverAPI.OutputNewRoomEvent) error { + value, err := json.Marshal(roomserverAPI.OutputEvent{ + Type: roomserverAPI.OutputTypeNewRoomEvent, + NewRoomEvent: out, + }) + if err != nil { + t.Fatalf("failed to marshal output event: %s", err) + } + _, _, err = producer.SendMessage(&sarama.ProducerMessage{ + Topic: kafkaTopic, + Key: sarama.StringEncoder(out.Event.RoomID()), + Value: sarama.ByteEncoder(value), + }) + if err != nil { + t.Fatalf("failed to send message: %s", err) + } + return nil +} + +func MustMakeInternalAPI(t *testing.T) (api.CurrentStateInternalAPI, sarama.SyncProducer) { + cfg := &config.Dendrite{} + cfg.Kafka.Topics.OutputRoomEvent = config.Topic(kafkaTopic) + cfg.Database.CurrentState = config.DataSource("file::memory:") + db, err := sqlutil.Open(sqlutil.SQLiteDriverName(), "file::memory:", nil) + if err != nil { + t.Fatalf("Failed to open naffka database: %s", err) + } + naffkaDB, err := naffka.NewSqliteDatabase(db) + if err != nil { + t.Fatalf("Failed to setup naffka database: %s", err) + } + naff, err := naffka.New(naffkaDB) + if err != nil { + t.Fatalf("Failed to create naffka consumer: %s", err) + } + return NewInternalAPI(cfg, naff), naff +} + +func TestQueryCurrentState(t *testing.T) { + currStateAPI, producer := MustMakeInternalAPI(t) + plTuple := gomatrixserverlib.StateKeyTuple{ + EventType: "m.room.power_levels", + StateKey: "", + } + plEvent := testEvents[4] + MustWriteOutputEvent(t, producer, &roomserverAPI.OutputNewRoomEvent{ + Event: plEvent, + AddsStateEventIDs: []string{plEvent.EventID()}, + }) + // we have no good way to know /when/ the server has consumed the event + time.Sleep(100 * time.Millisecond) + + testCases := []struct { + req api.QueryCurrentStateRequest + wantRes api.QueryCurrentStateResponse + wantErr error + }{ + { + req: api.QueryCurrentStateRequest{ + RoomID: plEvent.RoomID(), + StateTuples: []gomatrixserverlib.StateKeyTuple{ + plTuple, + }, + }, + wantRes: api.QueryCurrentStateResponse{ + StateEvents: map[gomatrixserverlib.StateKeyTuple]*gomatrixserverlib.HeaderedEvent{ + plTuple: &plEvent, + }, + }, + }, + } + + runCases := func(testAPI api.CurrentStateInternalAPI) { + for _, tc := range testCases { + var gotRes api.QueryCurrentStateResponse + gotErr := testAPI.QueryCurrentState(context.TODO(), &tc.req, &gotRes) + if tc.wantErr == nil && gotErr != nil || tc.wantErr != nil && gotErr == nil { + t.Errorf("QueryCurrentState error, got %s want %s", gotErr, tc.wantErr) + continue + } + for tuple, wantEvent := range tc.wantRes.StateEvents { + gotEvent, ok := gotRes.StateEvents[tuple] + if !ok { + t.Errorf("QueryCurrentState want tuple %+v but it is missing from the response", tuple) + continue + } + if !reflect.DeepEqual(gotEvent.JSON(), wantEvent.JSON()) { + t.Errorf("QueryCurrentState tuple %+v got event JSON %s want %s", tuple, string(gotEvent.JSON()), string(wantEvent.JSON())) + } + } + } + } + t.Run("HTTP API", func(t *testing.T) { + router := mux.NewRouter().PathPrefix(httputil.InternalPathPrefix).Subrouter() + AddInternalRoutes(router, currStateAPI) + apiURL, cancel := test.ListenAndServe(t, router, false) + defer cancel() + httpAPI, err := inthttp.NewCurrentStateAPIClient(apiURL, &http.Client{}) + if err != nil { + t.Fatalf("failed to create HTTP client") + } + runCases(httpAPI) + }) + t.Run("Monolith", func(t *testing.T) { + runCases(currStateAPI) + }) +} diff --git a/currentstateserver/internal/api.go b/currentstateserver/internal/api.go new file mode 100644 index 000000000..85fbf51ef --- /dev/null +++ b/currentstateserver/internal/api.go @@ -0,0 +1,50 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "context" + + "github.com/matrix-org/dendrite/currentstateserver/api" + "github.com/matrix-org/dendrite/currentstateserver/storage" + "github.com/matrix-org/gomatrixserverlib" +) + +type CurrentStateInternalAPI struct { + DB storage.Database +} + +func (a *CurrentStateInternalAPI) QueryCurrentState(ctx context.Context, req *api.QueryCurrentStateRequest, res *api.QueryCurrentStateResponse) error { + res.StateEvents = make(map[gomatrixserverlib.StateKeyTuple]*gomatrixserverlib.HeaderedEvent) + for _, tuple := range req.StateTuples { + ev, err := a.DB.GetStateEvent(ctx, req.RoomID, tuple.EventType, tuple.StateKey) + if err != nil { + return err + } + if ev != nil { + res.StateEvents[tuple] = ev + } + } + return nil +} + +func (a *CurrentStateInternalAPI) QueryRoomsForUser(ctx context.Context, req *api.QueryRoomsForUserRequest, res *api.QueryRoomsForUserResponse) error { + roomIDs, err := a.DB.GetRoomsByMembership(ctx, req.UserID, req.WantMembership) + if err != nil { + return err + } + res.RoomIDs = roomIDs + return nil +} diff --git a/currentstateserver/inthttp/client.go b/currentstateserver/inthttp/client.go new file mode 100644 index 000000000..6fd9907bd --- /dev/null +++ b/currentstateserver/inthttp/client.go @@ -0,0 +1,75 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package inthttp + +import ( + "context" + "errors" + "net/http" + + "github.com/matrix-org/dendrite/currentstateserver/api" + "github.com/matrix-org/dendrite/internal/httputil" + "github.com/opentracing/opentracing-go" +) + +// HTTP paths for the internal HTTP APIs +const ( + QueryCurrentStatePath = "/currentstateserver/queryCurrentState" + QueryRoomsForUserPath = "/currentstateserver/queryRoomsForUser" +) + +// NewCurrentStateAPIClient creates a CurrentStateInternalAPI implemented by talking to a HTTP POST API. +// If httpClient is nil an error is returned +func NewCurrentStateAPIClient( + apiURL string, + httpClient *http.Client, +) (api.CurrentStateInternalAPI, error) { + if httpClient == nil { + return nil, errors.New("NewCurrentStateAPIClient: httpClient is ") + } + return &httpCurrentStateInternalAPI{ + apiURL: apiURL, + httpClient: httpClient, + }, nil +} + +type httpCurrentStateInternalAPI struct { + apiURL string + httpClient *http.Client +} + +func (h *httpCurrentStateInternalAPI) QueryCurrentState( + ctx context.Context, + request *api.QueryCurrentStateRequest, + response *api.QueryCurrentStateResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "QueryCurrentState") + defer span.Finish() + + apiURL := h.apiURL + QueryCurrentStatePath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} + +func (h *httpCurrentStateInternalAPI) QueryRoomsForUser( + ctx context.Context, + request *api.QueryRoomsForUserRequest, + response *api.QueryRoomsForUserResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "QueryRoomsForUser") + defer span.Finish() + + apiURL := h.apiURL + QueryRoomsForUserPath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} diff --git a/currentstateserver/inthttp/server.go b/currentstateserver/inthttp/server.go new file mode 100644 index 000000000..fa7ecb22e --- /dev/null +++ b/currentstateserver/inthttp/server.go @@ -0,0 +1,54 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package inthttp + +import ( + "encoding/json" + "net/http" + + "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/currentstateserver/api" + "github.com/matrix-org/dendrite/internal/httputil" + "github.com/matrix-org/util" +) + +func AddRoutes(internalAPIMux *mux.Router, intAPI api.CurrentStateInternalAPI) { + internalAPIMux.Handle(QueryCurrentStatePath, + httputil.MakeInternalAPI("queryCurrentState", func(req *http.Request) util.JSONResponse { + request := api.QueryCurrentStateRequest{} + response := api.QueryCurrentStateResponse{} + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := intAPI.QueryCurrentState(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) + internalAPIMux.Handle(QueryRoomsForUserPath, + httputil.MakeInternalAPI("queryRoomsForUser", func(req *http.Request) util.JSONResponse { + request := api.QueryRoomsForUserRequest{} + response := api.QueryRoomsForUserResponse{} + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := intAPI.QueryRoomsForUser(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) +} diff --git a/currentstateserver/storage/interface.go b/currentstateserver/storage/interface.go new file mode 100644 index 000000000..dbf223f33 --- /dev/null +++ b/currentstateserver/storage/interface.go @@ -0,0 +1,34 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/gomatrixserverlib" +) + +type Database interface { + internal.PartitionStorer + // StoreStateEvents updates the database with new events from the roomserver. + StoreStateEvents(ctx context.Context, addStateEvents []gomatrixserverlib.HeaderedEvent, removeStateEventIDs []string) error + // GetStateEvent returns the state event of a given type for a given room with a given state key + // If no event could be found, returns nil + // If there was an issue during the retrieval, returns an error + GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error) + // GetRoomsByMembership returns a list of room IDs matching the provided membership and user ID (as state_key). + GetRoomsByMembership(ctx context.Context, userID, membership string) ([]string, error) +} diff --git a/currentstateserver/storage/postgres/current_room_state_table.go b/currentstateserver/storage/postgres/current_room_state_table.go new file mode 100644 index 000000000..95621913b --- /dev/null +++ b/currentstateserver/storage/postgres/current_room_state_table.go @@ -0,0 +1,208 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + "encoding/json" + "strconv" + + "github.com/lib/pq" + "github.com/matrix-org/dendrite/currentstateserver/storage/tables" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/gomatrixserverlib" +) + +var leaveEnum = strconv.Itoa(tables.MembershipToEnum["leave"]) + +var currentRoomStateSchema = ` +-- Stores the current room state for every room. +CREATE TABLE IF NOT EXISTS currentstate_current_room_state ( + -- The 'room_id' key for the state event. + room_id TEXT NOT NULL, + -- The state event ID + event_id TEXT NOT NULL, + -- The state event type e.g 'm.room.member' + type TEXT NOT NULL, + -- The 'sender' property of the event. + sender TEXT NOT NULL, + -- The state_key value for this state event e.g '' + state_key TEXT NOT NULL, + -- The JSON for the event. Stored as TEXT because this should be valid UTF-8. + headered_event_json TEXT NOT NULL, + -- The 'content.membership' enum value if this event is an m.room.member event. + membership SMALLINT NOT NULL DEFAULT 0, + -- Clobber based on 3-uple of room_id, type and state_key + CONSTRAINT currentstate_current_room_state_unique UNIQUE (room_id, type, state_key) +); +-- for event deletion +CREATE UNIQUE INDEX IF NOT EXISTS currentstate_event_id_idx ON currentstate_current_room_state(event_id, room_id, type, sender); +-- for querying membership states of users +CREATE INDEX IF NOT EXISTS currentstate_membership_idx ON currentstate_current_room_state(type, state_key, membership) +WHERE membership IS NOT NULL AND membership != ` + leaveEnum + `; +` + +const upsertRoomStateSQL = "" + + "INSERT INTO currentstate_current_room_state (room_id, event_id, type, sender, state_key, headered_event_json, membership)" + + " VALUES ($1, $2, $3, $4, $5, $6, $7)" + + " ON CONFLICT ON CONSTRAINT currentstate_current_room_state_unique" + + " DO UPDATE SET event_id = $2, sender=$4, headered_event_json = $6, membership = $7" + +const deleteRoomStateByEventIDSQL = "" + + "DELETE FROM currentstate_current_room_state WHERE event_id = $1" + +const selectRoomIDsWithMembershipSQL = "" + + "SELECT room_id FROM currentstate_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2" + +const selectStateEventSQL = "" + + "SELECT headered_event_json FROM currentstate_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3" + +const selectEventsWithEventIDsSQL = "" + + "SELECT headered_event_json FROM currentstate_current_room_state WHERE event_id = ANY($1)" + +type currentRoomStateStatements struct { + upsertRoomStateStmt *sql.Stmt + deleteRoomStateByEventIDStmt *sql.Stmt + selectRoomIDsWithMembershipStmt *sql.Stmt + selectEventsWithEventIDsStmt *sql.Stmt + selectStateEventStmt *sql.Stmt +} + +func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) { + s := ¤tRoomStateStatements{} + _, err := db.Exec(currentRoomStateSchema) + if err != nil { + return nil, err + } + if s.upsertRoomStateStmt, err = db.Prepare(upsertRoomStateSQL); err != nil { + return nil, err + } + if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil { + return nil, err + } + if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil { + return nil, err + } + if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil { + return nil, err + } + if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil { + return nil, err + } + return s, nil +} + +// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state. +func (s *currentRoomStateStatements) SelectRoomIDsWithMembership( + ctx context.Context, + txn *sql.Tx, + userID string, + membershipEnum int, +) ([]string, error) { + stmt := sqlutil.TxStmt(txn, s.selectRoomIDsWithMembershipStmt) + rows, err := stmt.QueryContext(ctx, userID, membershipEnum) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "selectRoomIDsWithMembership: rows.close() failed") + + var result []string + for rows.Next() { + var roomID string + if err := rows.Scan(&roomID); err != nil { + return nil, err + } + result = append(result, roomID) + } + return result, rows.Err() +} + +func (s *currentRoomStateStatements) DeleteRoomStateByEventID( + ctx context.Context, txn *sql.Tx, eventID string, +) error { + stmt := sqlutil.TxStmt(txn, s.deleteRoomStateByEventIDStmt) + _, err := stmt.ExecContext(ctx, eventID) + return err +} + +func (s *currentRoomStateStatements) UpsertRoomState( + ctx context.Context, txn *sql.Tx, + event gomatrixserverlib.HeaderedEvent, membershipEnum int, +) error { + headeredJSON, err := json.Marshal(event) + if err != nil { + return err + } + + // upsert state event + stmt := sqlutil.TxStmt(txn, s.upsertRoomStateStmt) + _, err = stmt.ExecContext( + ctx, + event.RoomID(), + event.EventID(), + event.Type(), + event.Sender(), + *event.StateKey(), + headeredJSON, + membershipEnum, + ) + return err +} + +func (s *currentRoomStateStatements) SelectEventsWithEventIDs( + ctx context.Context, txn *sql.Tx, eventIDs []string, +) ([]gomatrixserverlib.HeaderedEvent, error) { + stmt := sqlutil.TxStmt(txn, s.selectEventsWithEventIDsStmt) + rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs)) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "selectEventsWithEventIDs: rows.close() failed") + result := []gomatrixserverlib.HeaderedEvent{} + for rows.Next() { + var eventBytes []byte + if err := rows.Scan(&eventBytes); err != nil { + return nil, err + } + // TODO: Handle redacted events + var ev gomatrixserverlib.HeaderedEvent + if err := json.Unmarshal(eventBytes, &ev); err != nil { + return nil, err + } + result = append(result, ev) + } + return result, rows.Err() +} + +func (s *currentRoomStateStatements) SelectStateEvent( + ctx context.Context, roomID, evType, stateKey string, +) (*gomatrixserverlib.HeaderedEvent, error) { + stmt := s.selectStateEventStmt + var res []byte + err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, err + } + var ev gomatrixserverlib.HeaderedEvent + if err = json.Unmarshal(res, &ev); err != nil { + return nil, err + } + return &ev, err +} diff --git a/currentstateserver/storage/postgres/storage.go b/currentstateserver/storage/postgres/storage.go new file mode 100644 index 000000000..f8edb94e6 --- /dev/null +++ b/currentstateserver/storage/postgres/storage.go @@ -0,0 +1,35 @@ +package postgres + +import ( + "database/sql" + + "github.com/matrix-org/dendrite/currentstateserver/storage/shared" + "github.com/matrix-org/dendrite/internal/sqlutil" +) + +type Database struct { + shared.Database + db *sql.DB + sqlutil.PartitionOffsetStatements +} + +// NewDatabase creates a new sync server database +func NewDatabase(dbDataSourceName string, dbProperties sqlutil.DbProperties) (*Database, error) { + var d Database + var err error + if d.db, err = sqlutil.Open("postgres", dbDataSourceName, dbProperties); err != nil { + return nil, err + } + if err = d.PartitionOffsetStatements.Prepare(d.db, "currentstate"); err != nil { + return nil, err + } + currRoomState, err := NewPostgresCurrentRoomStateTable(d.db) + if err != nil { + return nil, err + } + d.Database = shared.Database{ + DB: d.db, + CurrentRoomState: currRoomState, + } + return &d, nil +} diff --git a/currentstateserver/storage/shared/storage.go b/currentstateserver/storage/shared/storage.go new file mode 100644 index 000000000..d78b3e0ed --- /dev/null +++ b/currentstateserver/storage/shared/storage.go @@ -0,0 +1,78 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package shared + +import ( + "context" + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/currentstateserver/storage/tables" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/gomatrixserverlib" +) + +type Database struct { + DB *sql.DB + CurrentRoomState tables.CurrentRoomState +} + +func (d *Database) GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error) { + return d.CurrentRoomState.SelectStateEvent(ctx, roomID, evType, stateKey) +} + +func (d *Database) StoreStateEvents(ctx context.Context, addStateEvents []gomatrixserverlib.HeaderedEvent, + removeStateEventIDs []string) error { + return sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error { + // remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add. + for _, eventID := range removeStateEventIDs { + if err := d.CurrentRoomState.DeleteRoomStateByEventID(ctx, txn, eventID); err != nil { + return err + } + } + + for _, event := range addStateEvents { + if event.StateKey() == nil { + // ignore non state events + continue + } + var membershipEnum int + if event.Type() == "m.room.member" { + membership, err := event.Membership() + if err != nil { + return err + } + enum, ok := tables.MembershipToEnum[membership] + if !ok { + return fmt.Errorf("unknown membership: %s", membership) + } + membershipEnum = enum + } + + if err := d.CurrentRoomState.UpsertRoomState(ctx, txn, event, membershipEnum); err != nil { + return err + } + } + return nil + }) +} + +func (d *Database) GetRoomsByMembership(ctx context.Context, userID, membership string) ([]string, error) { + enum, ok := tables.MembershipToEnum[membership] + if !ok { + return nil, fmt.Errorf("unknown membership: %s", membership) + } + return d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, nil, userID, enum) +} diff --git a/currentstateserver/storage/sqlite3/current_room_state_table.go b/currentstateserver/storage/sqlite3/current_room_state_table.go new file mode 100644 index 000000000..2e2b0e423 --- /dev/null +++ b/currentstateserver/storage/sqlite3/current_room_state_table.go @@ -0,0 +1,201 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + "encoding/json" + "strings" + + "github.com/matrix-org/dendrite/currentstateserver/storage/tables" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/gomatrixserverlib" +) + +const currentRoomStateSchema = ` +-- Stores the current room state for every room. +CREATE TABLE IF NOT EXISTS currentstate_current_room_state ( + room_id TEXT NOT NULL, + event_id TEXT NOT NULL, + type TEXT NOT NULL, + sender TEXT NOT NULL, + state_key TEXT NOT NULL, + headered_event_json TEXT NOT NULL, + membership INTEGER NOT NULL DEFAULT 0, + UNIQUE (room_id, type, state_key) +); +-- for event deletion +CREATE UNIQUE INDEX IF NOT EXISTS currentstate_event_id_idx ON currentstate_current_room_state(event_id, room_id, type, sender); +-- for querying membership states of users +-- CREATE INDEX IF NOT EXISTS currentstate_membership_idx ON currentstate_current_room_state(type, state_key, membership) WHERE membership IS NOT NULL AND membership != 'leave'; +` + +const upsertRoomStateSQL = "" + + "INSERT INTO currentstate_current_room_state (room_id, event_id, type, sender, state_key, headered_event_json, membership)" + + " VALUES ($1, $2, $3, $4, $5, $6, $7)" + + " ON CONFLICT (event_id, room_id, type, sender)" + + " DO UPDATE SET event_id = $2, sender=$4, headered_event_json = $6, membership = $7" + +const deleteRoomStateByEventIDSQL = "" + + "DELETE FROM currentstate_current_room_state WHERE event_id = $1" + +const selectRoomIDsWithMembershipSQL = "" + + "SELECT room_id FROM currentstate_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2" + +const selectStateEventSQL = "" + + "SELECT headered_event_json FROM currentstate_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3" + +const selectEventsWithEventIDsSQL = "" + + // TODO: The session_id and transaction_id blanks are here because otherwise + // the rowsToStreamEvents expects there to be exactly five columns. We need to + // figure out if these really need to be in the DB, and if so, we need a + // better permanent fix for this. - neilalexander, 2 Jan 2020 + "SELECT added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" + + " FROM currentstate_current_room_state WHERE event_id IN ($1)" + +type currentRoomStateStatements struct { + upsertRoomStateStmt *sql.Stmt + deleteRoomStateByEventIDStmt *sql.Stmt + selectRoomIDsWithMembershipStmt *sql.Stmt + selectStateEventStmt *sql.Stmt +} + +func NewSqliteCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) { + s := ¤tRoomStateStatements{} + _, err := db.Exec(currentRoomStateSchema) + if err != nil { + return nil, err + } + if s.upsertRoomStateStmt, err = db.Prepare(upsertRoomStateSQL); err != nil { + return nil, err + } + if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil { + return nil, err + } + if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil { + return nil, err + } + if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil { + return nil, err + } + return s, nil +} + +// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state. +func (s *currentRoomStateStatements) SelectRoomIDsWithMembership( + ctx context.Context, + txn *sql.Tx, + userID string, + membershipEnum int, +) ([]string, error) { + stmt := sqlutil.TxStmt(txn, s.selectRoomIDsWithMembershipStmt) + rows, err := stmt.QueryContext(ctx, userID, membershipEnum) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "selectRoomIDsWithMembership: rows.close() failed") + + var result []string + for rows.Next() { + var roomID string + if err := rows.Scan(&roomID); err != nil { + return nil, err + } + result = append(result, roomID) + } + return result, nil +} + +func (s *currentRoomStateStatements) DeleteRoomStateByEventID( + ctx context.Context, txn *sql.Tx, eventID string, +) error { + stmt := sqlutil.TxStmt(txn, s.deleteRoomStateByEventIDStmt) + _, err := stmt.ExecContext(ctx, eventID) + return err +} + +func (s *currentRoomStateStatements) UpsertRoomState( + ctx context.Context, txn *sql.Tx, + event gomatrixserverlib.HeaderedEvent, membershipEnum int, +) error { + headeredJSON, err := json.Marshal(event) + if err != nil { + return err + } + + // upsert state event + stmt := sqlutil.TxStmt(txn, s.upsertRoomStateStmt) + _, err = stmt.ExecContext( + ctx, + event.RoomID(), + event.EventID(), + event.Type(), + event.Sender(), + *event.StateKey(), + headeredJSON, + membershipEnum, + ) + return err +} + +func (s *currentRoomStateStatements) SelectEventsWithEventIDs( + ctx context.Context, txn *sql.Tx, eventIDs []string, +) ([]gomatrixserverlib.HeaderedEvent, error) { + iEventIDs := make([]interface{}, len(eventIDs)) + for k, v := range eventIDs { + iEventIDs[k] = v + } + query := strings.Replace(selectEventsWithEventIDsSQL, "($1)", sqlutil.QueryVariadic(len(iEventIDs)), 1) + rows, err := txn.QueryContext(ctx, query, iEventIDs...) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "selectEventsWithEventIDs: rows.close() failed") + result := []gomatrixserverlib.HeaderedEvent{} + for rows.Next() { + var eventBytes []byte + if err := rows.Scan(&eventBytes); err != nil { + return nil, err + } + // TODO: Handle redacted events + var ev gomatrixserverlib.HeaderedEvent + if err := json.Unmarshal(eventBytes, &ev); err != nil { + return nil, err + } + result = append(result, ev) + } + return result, nil +} + +func (s *currentRoomStateStatements) SelectStateEvent( + ctx context.Context, roomID, evType, stateKey string, +) (*gomatrixserverlib.HeaderedEvent, error) { + stmt := s.selectStateEventStmt + var res []byte + err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, err + } + var ev gomatrixserverlib.HeaderedEvent + if err = json.Unmarshal(res, &ev); err != nil { + return nil, err + } + return &ev, err +} diff --git a/currentstateserver/storage/sqlite3/storage.go b/currentstateserver/storage/sqlite3/storage.go new file mode 100644 index 000000000..6975e40ba --- /dev/null +++ b/currentstateserver/storage/sqlite3/storage.go @@ -0,0 +1,39 @@ +package sqlite3 + +import ( + "database/sql" + + "github.com/matrix-org/dendrite/currentstateserver/storage/shared" + "github.com/matrix-org/dendrite/internal/sqlutil" +) + +type Database struct { + shared.Database + db *sql.DB + sqlutil.PartitionOffsetStatements +} + +// NewDatabase creates a new sync server database +// nolint: gocyclo +func NewDatabase(dataSourceName string) (*Database, error) { + var d Database + cs, err := sqlutil.ParseFileURI(dataSourceName) + if err != nil { + return nil, err + } + if d.db, err = sqlutil.Open(sqlutil.SQLiteDriverName(), cs, nil); err != nil { + return nil, err + } + if err = d.PartitionOffsetStatements.Prepare(d.db, "currentstate"); err != nil { + return nil, err + } + currRoomState, err := NewSqliteCurrentRoomStateTable(d.db) + if err != nil { + return nil, err + } + d.Database = shared.Database{ + DB: d.db, + CurrentRoomState: currRoomState, + } + return &d, nil +} diff --git a/currentstateserver/storage/storage.go b/currentstateserver/storage/storage.go new file mode 100644 index 000000000..ad04cf414 --- /dev/null +++ b/currentstateserver/storage/storage.go @@ -0,0 +1,41 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !wasm + +package storage + +import ( + "net/url" + + "github.com/matrix-org/dendrite/currentstateserver/storage/postgres" + "github.com/matrix-org/dendrite/currentstateserver/storage/sqlite3" + "github.com/matrix-org/dendrite/internal/sqlutil" +) + +// NewDatabase opens a database connection. +func NewDatabase(dataSourceName string, dbProperties sqlutil.DbProperties) (Database, error) { + uri, err := url.Parse(dataSourceName) + if err != nil { + return postgres.NewDatabase(dataSourceName, dbProperties) + } + switch uri.Scheme { + case "postgres": + return postgres.NewDatabase(dataSourceName, dbProperties) + case "file": + return sqlite3.NewDatabase(dataSourceName) + default: + return postgres.NewDatabase(dataSourceName, dbProperties) + } +} diff --git a/currentstateserver/storage/storage_wasm.go b/currentstateserver/storage/storage_wasm.go new file mode 100644 index 000000000..aa46c44df --- /dev/null +++ b/currentstateserver/storage/storage_wasm.go @@ -0,0 +1,42 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "fmt" + "net/url" + + "github.com/matrix-org/dendrite/currentstateserver/storage/sqlite3" + "github.com/matrix-org/dendrite/internal/sqlutil" +) + +// NewDatabase opens a database connection. +func NewDatabase( + dataSourceName string, + dbProperties sqlutil.DbProperties, // nolint:unparam +) (Database, error) { + uri, err := url.Parse(dataSourceName) + if err != nil { + return nil, fmt.Errorf("Cannot use postgres implementation") + } + switch uri.Scheme { + case "postgres": + return nil, fmt.Errorf("Cannot use postgres implementation") + case "file": + return sqlite3.NewDatabase(dataSourceName) + default: + return nil, fmt.Errorf("Cannot use postgres implementation") + } +} diff --git a/currentstateserver/storage/tables/interface.go b/currentstateserver/storage/tables/interface.go new file mode 100644 index 000000000..f2c8b14ed --- /dev/null +++ b/currentstateserver/storage/tables/interface.go @@ -0,0 +1,44 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tables + +import ( + "context" + "database/sql" + + "github.com/matrix-org/gomatrixserverlib" +) + +var MembershipToEnum = map[string]int{ + gomatrixserverlib.Invite: 1, + gomatrixserverlib.Join: 2, + gomatrixserverlib.Leave: 3, + gomatrixserverlib.Ban: 4, +} +var EnumToMembership = map[int]string{ + 1: gomatrixserverlib.Invite, + 2: gomatrixserverlib.Join, + 3: gomatrixserverlib.Leave, + 4: gomatrixserverlib.Ban, +} + +type CurrentRoomState interface { + SelectStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error) + SelectEventsWithEventIDs(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error) + UpsertRoomState(ctx context.Context, txn *sql.Tx, event gomatrixserverlib.HeaderedEvent, membershipEnum int) error + DeleteRoomStateByEventID(ctx context.Context, txn *sql.Tx, eventID string) error + // SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state. + SelectRoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membershipEnum int) ([]string, error) +} diff --git a/dendrite-config.yaml b/dendrite-config.yaml index 73bfec247..70c8f7958 100644 --- a/dendrite-config.yaml +++ b/dendrite-config.yaml @@ -121,6 +121,7 @@ database: federation_sender: "postgres://dendrite:itsasecret@localhost/dendrite_federationsender?sslmode=disable" appservice: "postgres://dendrite:itsasecret@localhost/dendrite_appservice?sslmode=disable" public_rooms_api: "postgres://dendrite:itsasecret@localhost/dendrite_publicroomsapi?sslmode=disable" + current_state: "postgres://dendrite:itsasecret@localhost/dendrite_currentstate?sslmode=disable" max_open_conns: 100 max_idle_conns: 2 conn_max_lifetime: -1 @@ -143,6 +144,7 @@ listen: key_server: "localhost:7779" server_key_api: "localhost:7780" user_api: "localhost:7781" + current_state_server: "localhost:7782" # The configuration for tracing the dendrite components. tracing: diff --git a/internal/config/config.go b/internal/config/config.go index baa82be23..8275fc478 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -160,10 +160,13 @@ type Dendrite struct { // Postgres Config Database struct { // The Account database stores the login details and account information - // for local users. It is accessed by the ClientAPI. + // for local users. It is accessed by the UserAPI. Account DataSource `yaml:"account"` + // The CurrentState database stores the current state of all rooms. + // It is accessed by the CurrentStateServer. + CurrentState DataSource `yaml:"current_state"` // The Device database stores session information for the devices of logged - // in local users. It is accessed by the ClientAPI, the MediaAPI and the SyncAPI. + // in local users. It is accessed by the UserAPI. Device DataSource `yaml:"device"` // The MediaAPI database stores information about files uploaded and downloaded // by local users. It is only accessed by the MediaAPI. @@ -222,6 +225,7 @@ type Dendrite struct { Bind struct { MediaAPI Address `yaml:"media_api"` ClientAPI Address `yaml:"client_api"` + CurrentState Address `yaml:"current_state_server"` FederationAPI Address `yaml:"federation_api"` ServerKeyAPI Address `yaml:"server_key_api"` AppServiceAPI Address `yaml:"appservice_api"` @@ -238,6 +242,7 @@ type Dendrite struct { Listen struct { MediaAPI Address `yaml:"media_api"` ClientAPI Address `yaml:"client_api"` + CurrentState Address `yaml:"current_state_server"` FederationAPI Address `yaml:"federation_api"` ServerKeyAPI Address `yaml:"server_key_api"` AppServiceAPI Address `yaml:"appservice_api"` @@ -601,6 +606,7 @@ func (config *Dendrite) checkDatabase(configErrs *configErrors) { checkNotEmpty(configErrs, "database.media_api", string(config.Database.MediaAPI)) checkNotEmpty(configErrs, "database.sync_api", string(config.Database.SyncAPI)) checkNotEmpty(configErrs, "database.room_server", string(config.Database.RoomServer)) + checkNotEmpty(configErrs, "database.current_state", string(config.Database.CurrentState)) } // checkListen verifies the parameters listen.* are valid. @@ -613,6 +619,7 @@ func (config *Dendrite) checkListen(configErrs *configErrors) { checkNotEmpty(configErrs, "listen.edu_server", string(config.Listen.EDUServer)) checkNotEmpty(configErrs, "listen.server_key_api", string(config.Listen.EDUServer)) checkNotEmpty(configErrs, "listen.user_api", string(config.Listen.UserAPI)) + checkNotEmpty(configErrs, "listen.current_state_server", string(config.Listen.CurrentState)) } // checkLogging verifies the parameters logging.* are valid. @@ -735,6 +742,15 @@ func (config *Dendrite) UserAPIURL() string { return "http://" + string(config.Listen.UserAPI) } +// CurrentStateAPIURL returns an HTTP URL for where the currentstateserver is listening. +func (config *Dendrite) CurrentStateAPIURL() string { + // Hard code the currentstateserver to talk HTTP for now. + // If we support HTTPS we need to think of a practical way to do certificate validation. + // People setting up servers shouldn't need to get a certificate valid for the public + // internet for an internal API. + return "http://" + string(config.Listen.CurrentState) +} + // EDUServerURL returns an HTTP URL for where the EDU server is listening. func (config *Dendrite) EDUServerURL() string { // Hard code the EDU server to talk HTTP for now. @@ -753,7 +769,7 @@ func (config *Dendrite) FederationSenderURL() string { return "http://" + string(config.Listen.FederationSender) } -// FederationSenderURL returns an HTTP URL for where the federation sender is listening. +// ServerKeyAPIURL returns an HTTP URL for where the federation sender is listening. func (config *Dendrite) ServerKeyAPIURL() string { // Hard code the server key API server to talk HTTP for now. // If we support HTTPS we need to think of a practical way to do certificate validation. diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 9a543e763..9b776a50f 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -55,6 +55,7 @@ database: sync_api: "postgresql:///syn_api" room_server: "postgresql:///room_server" appservice: "postgresql:///appservice" + current_state: "postgresql:///current_state" listen: room_server: "localhost:7770" client_api: "localhost:7771" @@ -64,6 +65,7 @@ listen: appservice_api: "localhost:7777" edu_server: "localhost:7778" user_api: "localhost:7779" + current_state_server: "localhost:7775" logging: - type: "file" level: "info" diff --git a/internal/setup/base.go b/internal/setup/base.go index 66424a609..ddf8e0fad 100644 --- a/internal/setup/base.go +++ b/internal/setup/base.go @@ -22,6 +22,7 @@ import ( "net/url" "time" + currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/sqlutil" @@ -37,6 +38,7 @@ import ( appserviceAPI "github.com/matrix-org/dendrite/appservice/api" asinthttp "github.com/matrix-org/dendrite/appservice/inthttp" + currentstateinthttp "github.com/matrix-org/dendrite/currentstateserver/inthttp" eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" eduinthttp "github.com/matrix-org/dendrite/eduserver/inthttp" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" @@ -171,6 +173,15 @@ func (b *BaseDendrite) UserAPIClient() userapi.UserInternalAPI { return userAPI } +// CurrentStateAPIClient returns CurrentStateInternalAPI for hitting the currentstateserver over HTTP. +func (b *BaseDendrite) CurrentStateAPIClient() currentstateAPI.CurrentStateInternalAPI { + stateAPI, err := currentstateinthttp.NewCurrentStateAPIClient(b.Cfg.CurrentStateAPIURL(), b.httpClient) + if err != nil { + logrus.WithError(err).Panic("UserAPIClient failed", b.httpClient) + } + return stateAPI +} + // EDUServerClient returns EDUServerInputAPI for hitting the EDU server over HTTP func (b *BaseDendrite) EDUServerClient() eduServerAPI.EDUServerInputAPI { e, err := eduinthttp.NewEDUServerClient(b.Cfg.EDUServerURL(), b.httpClient) diff --git a/internal/setup/monolith.go b/internal/setup/monolith.go index 24bee9502..86275e28d 100644 --- a/internal/setup/monolith.go +++ b/internal/setup/monolith.go @@ -19,6 +19,7 @@ import ( "github.com/gorilla/mux" appserviceAPI "github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/clientapi" + currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/federationapi" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" @@ -56,6 +57,7 @@ type Monolith struct { RoomserverAPI roomserverAPI.RoomserverInternalAPI ServerKeyAPI serverKeyAPI.ServerKeyInternalAPI UserAPI userapi.UserInternalAPI + StateAPI currentstateAPI.CurrentStateInternalAPI // TODO: can we remove this? It's weird that we are required the database // yet every other component can do that on its own. libp2p-demo uses a custom @@ -69,9 +71,9 @@ type Monolith struct { // AddAllPublicRoutes attaches all public paths to the given router func (m *Monolith) AddAllPublicRoutes(publicMux *mux.Router) { clientapi.AddPublicRoutes( - publicMux, m.Config, m.KafkaConsumer, m.KafkaProducer, m.DeviceDB, m.AccountDB, + publicMux, m.Config, m.KafkaProducer, m.DeviceDB, m.AccountDB, m.FedClient, m.RoomserverAPI, - m.EDUInternalAPI, m.AppserviceAPI, transactions.New(), + m.EDUInternalAPI, m.AppserviceAPI, m.StateAPI, transactions.New(), m.FederationSenderAPI, m.UserAPI, ) diff --git a/userapi/storage/accounts/interface.go b/userapi/storage/accounts/interface.go index 9ed33e1b9..6f6caf111 100644 --- a/userapi/storage/accounts/interface.go +++ b/userapi/storage/accounts/interface.go @@ -22,7 +22,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/userapi/api" - "github.com/matrix-org/gomatrixserverlib" ) type Database interface { @@ -36,10 +35,6 @@ type Database interface { // account already exists, it will return nil, ErrUserExists. CreateAccount(ctx context.Context, localpart, plaintextPassword, appserviceID string) (*api.Account, error) CreateGuestAccount(ctx context.Context) (*api.Account, error) - UpdateMemberships(ctx context.Context, eventsToAdd []gomatrixserverlib.Event, idsToRemove []string) error - GetMembershipInRoomByLocalpart(ctx context.Context, localpart, roomID string) (authtypes.Membership, error) - GetRoomIDsByLocalPart(ctx context.Context, localpart string) ([]string, error) - GetMembershipsByLocalpart(ctx context.Context, localpart string) (memberships []authtypes.Membership, err error) SaveAccountData(ctx context.Context, localpart, roomID, dataType string, content json.RawMessage) error GetAccountData(ctx context.Context, localpart string) (global map[string]json.RawMessage, rooms map[string]map[string]json.RawMessage, err error) // GetAccountDataByType returns account data matching a given diff --git a/userapi/storage/accounts/postgres/membership_table.go b/userapi/storage/accounts/postgres/membership_table.go deleted file mode 100644 index 623530acc..000000000 --- a/userapi/storage/accounts/postgres/membership_table.go +++ /dev/null @@ -1,159 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package postgres - -import ( - "context" - "database/sql" - - "github.com/lib/pq" - "github.com/matrix-org/dendrite/clientapi/auth/authtypes" - "github.com/matrix-org/dendrite/internal" -) - -const membershipSchema = ` --- Stores data about users memberships to rooms. -CREATE TABLE IF NOT EXISTS account_memberships ( - -- The Matrix user ID localpart for the member - localpart TEXT NOT NULL, - -- The room this user is a member of - room_id TEXT NOT NULL, - -- The ID of the join membership event - event_id TEXT NOT NULL, - - -- A user can only be member of a room once - PRIMARY KEY (localpart, room_id) -); - --- Use index to process deletion by ID more efficiently -CREATE UNIQUE INDEX IF NOT EXISTS account_membership_event_id ON account_memberships(event_id); -` - -const insertMembershipSQL = ` - INSERT INTO account_memberships(localpart, room_id, event_id) VALUES ($1, $2, $3) - ON CONFLICT (localpart, room_id) DO UPDATE SET event_id = EXCLUDED.event_id -` - -const selectMembershipsByLocalpartSQL = "" + - "SELECT room_id, event_id FROM account_memberships WHERE localpart = $1" - -const selectMembershipInRoomByLocalpartSQL = "" + - "SELECT event_id FROM account_memberships WHERE localpart = $1 AND room_id = $2" - -const selectRoomIDsByLocalPartSQL = "" + - "SELECT room_id FROM account_memberships WHERE localpart = $1" - -const deleteMembershipsByEventIDsSQL = "" + - "DELETE FROM account_memberships WHERE event_id = ANY($1)" - -type membershipStatements struct { - deleteMembershipsByEventIDsStmt *sql.Stmt - insertMembershipStmt *sql.Stmt - selectMembershipInRoomByLocalpartStmt *sql.Stmt - selectMembershipsByLocalpartStmt *sql.Stmt - selectRoomIDsByLocalPartStmt *sql.Stmt -} - -func (s *membershipStatements) prepare(db *sql.DB) (err error) { - _, err = db.Exec(membershipSchema) - if err != nil { - return - } - if s.deleteMembershipsByEventIDsStmt, err = db.Prepare(deleteMembershipsByEventIDsSQL); err != nil { - return - } - if s.insertMembershipStmt, err = db.Prepare(insertMembershipSQL); err != nil { - return - } - if s.selectMembershipInRoomByLocalpartStmt, err = db.Prepare(selectMembershipInRoomByLocalpartSQL); err != nil { - return - } - if s.selectMembershipsByLocalpartStmt, err = db.Prepare(selectMembershipsByLocalpartSQL); err != nil { - return - } - if s.selectRoomIDsByLocalPartStmt, err = db.Prepare(selectRoomIDsByLocalPartSQL); err != nil { - return - } - return -} - -func (s *membershipStatements) insertMembership( - ctx context.Context, txn *sql.Tx, localpart, roomID, eventID string, -) (err error) { - stmt := txn.Stmt(s.insertMembershipStmt) - _, err = stmt.ExecContext(ctx, localpart, roomID, eventID) - return -} - -func (s *membershipStatements) deleteMembershipsByEventIDs( - ctx context.Context, txn *sql.Tx, eventIDs []string, -) (err error) { - stmt := txn.Stmt(s.deleteMembershipsByEventIDsStmt) - _, err = stmt.ExecContext(ctx, pq.StringArray(eventIDs)) - return -} - -func (s *membershipStatements) selectMembershipInRoomByLocalpart( - ctx context.Context, localpart, roomID string, -) (authtypes.Membership, error) { - membership := authtypes.Membership{Localpart: localpart, RoomID: roomID} - stmt := s.selectMembershipInRoomByLocalpartStmt - err := stmt.QueryRowContext(ctx, localpart, roomID).Scan(&membership.EventID) - - return membership, err -} - -func (s *membershipStatements) selectMembershipsByLocalpart( - ctx context.Context, localpart string, -) (memberships []authtypes.Membership, err error) { - stmt := s.selectMembershipsByLocalpartStmt - rows, err := stmt.QueryContext(ctx, localpart) - if err != nil { - return - } - - memberships = []authtypes.Membership{} - - defer internal.CloseAndLogIfError(ctx, rows, "selectMembershipsByLocalpart: rows.close() failed") - for rows.Next() { - var m authtypes.Membership - m.Localpart = localpart - if err = rows.Scan(&m.RoomID, &m.EventID); err != nil { - return - } - memberships = append(memberships, m) - } - return memberships, rows.Err() -} - -func (s *membershipStatements) selectRoomIDsByLocalPart( - ctx context.Context, localPart string, -) ([]string, error) { - stmt := s.selectRoomIDsByLocalPartStmt - rows, err := stmt.QueryContext(ctx, localPart) - if err != nil { - return nil, err - } - roomIDs := []string{} - defer rows.Close() // nolint: errcheck - for rows.Next() { - var roomID string - if err = rows.Scan(&roomID); err != nil { - return nil, err - } - roomIDs = append(roomIDs, roomID) - } - return roomIDs, rows.Err() -} diff --git a/userapi/storage/accounts/postgres/storage.go b/userapi/storage/accounts/postgres/storage.go index f0b11bfdb..c76b92f10 100644 --- a/userapi/storage/accounts/postgres/storage.go +++ b/userapi/storage/accounts/postgres/storage.go @@ -37,7 +37,6 @@ type Database struct { sqlutil.PartitionOffsetStatements accounts accountsStatements profiles profilesStatements - memberships membershipStatements accountDatas accountDataStatements threepids threepidStatements serverName gomatrixserverlib.ServerName @@ -62,10 +61,6 @@ func NewDatabase(dataSourceName string, dbProperties sqlutil.DbProperties, serve if err = p.prepare(db); err != nil { return nil, err } - m := membershipStatements{} - if err = m.prepare(db); err != nil { - return nil, err - } ac := accountDataStatements{} if err = ac.prepare(db); err != nil { return nil, err @@ -74,7 +69,7 @@ func NewDatabase(dataSourceName string, dbProperties sqlutil.DbProperties, serve if err = t.prepare(db); err != nil { return nil, err } - return &Database{db, partitions, a, p, m, ac, t, serverName}, nil + return &Database{db, partitions, a, p, ac, t, serverName}, nil } // GetAccountByPassword returns the account associated with the given localpart and password. @@ -179,112 +174,6 @@ func (d *Database) createAccount( return d.accounts.insertAccount(ctx, txn, localpart, hash, appserviceID) } -// SaveMembership saves the user matching a given localpart as a member of a given -// room. It also stores the ID of the membership event. -// If a membership already exists between the user and the room, or if the -// insert fails, returns the SQL error -func (d *Database) saveMembership( - ctx context.Context, txn *sql.Tx, localpart, roomID, eventID string, -) error { - return d.memberships.insertMembership(ctx, txn, localpart, roomID, eventID) -} - -// removeMembershipsByEventIDs removes the memberships corresponding to the -// `join` membership events IDs in the eventIDs slice. -// If the removal fails, or if there is no membership to remove, returns an error -func (d *Database) removeMembershipsByEventIDs( - ctx context.Context, txn *sql.Tx, eventIDs []string, -) error { - return d.memberships.deleteMembershipsByEventIDs(ctx, txn, eventIDs) -} - -// UpdateMemberships adds the "join" membership events included in a given state -// events array, and removes those which ID is included in a given array of events -// IDs. All of the process is run in a transaction, which commits only once/if every -// insertion and deletion has been successfully processed. -// Returns a SQL error if there was an issue with any part of the process -func (d *Database) UpdateMemberships( - ctx context.Context, eventsToAdd []gomatrixserverlib.Event, idsToRemove []string, -) error { - return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error { - if err := d.removeMembershipsByEventIDs(ctx, txn, idsToRemove); err != nil { - return err - } - - for _, event := range eventsToAdd { - if err := d.newMembership(ctx, txn, event); err != nil { - return err - } - } - - return nil - }) -} - -// GetMembershipInRoomByLocalpart returns the membership for an user -// matching the given localpart if he is a member of the room matching roomID, -// if not sql.ErrNoRows is returned. -// If there was an issue during the retrieval, returns the SQL error -func (d *Database) GetMembershipInRoomByLocalpart( - ctx context.Context, localpart, roomID string, -) (authtypes.Membership, error) { - return d.memberships.selectMembershipInRoomByLocalpart(ctx, localpart, roomID) -} - -// GetRoomIDsByLocalPart returns an array containing the room ids of all -// the rooms a user matching a given localpart is a member of -// If no membership match the given localpart, returns an empty array -// If there was an issue during the retrieval, returns the SQL error -func (d *Database) GetRoomIDsByLocalPart( - ctx context.Context, localpart string, -) ([]string, error) { - return d.memberships.selectRoomIDsByLocalPart(ctx, localpart) -} - -// GetMembershipsByLocalpart returns an array containing the memberships for all -// the rooms a user matching a given localpart is a member of -// If no membership match the given localpart, returns an empty array -// If there was an issue during the retrieval, returns the SQL error -func (d *Database) GetMembershipsByLocalpart( - ctx context.Context, localpart string, -) (memberships []authtypes.Membership, err error) { - return d.memberships.selectMembershipsByLocalpart(ctx, localpart) -} - -// newMembership saves a new membership in the database. -// If the event isn't a valid m.room.member event with type `join`, does nothing. -// If an error occurred, returns the SQL error -func (d *Database) newMembership( - ctx context.Context, txn *sql.Tx, ev gomatrixserverlib.Event, -) error { - if ev.Type() == "m.room.member" && ev.StateKey() != nil { - localpart, serverName, err := gomatrixserverlib.SplitID('@', *ev.StateKey()) - if err != nil { - return err - } - - // We only want state events from local users - if string(serverName) != string(d.serverName) { - return nil - } - - eventID := ev.EventID() - roomID := ev.RoomID() - membership, err := ev.Membership() - if err != nil { - return err - } - - // Only "join" membership events can be considered as new memberships - if membership == gomatrixserverlib.Join { - if err := d.saveMembership(ctx, txn, localpart, roomID, eventID); err != nil { - return err - } - } - } - return nil -} - // SaveAccountData saves new account data for a given user and a given room. // If the account data is not specific to a room, the room ID should be an empty string // If an account data already exists for a given set (user, room, data type), it will diff --git a/userapi/storage/accounts/sqlite3/membership_table.go b/userapi/storage/accounts/sqlite3/membership_table.go deleted file mode 100644 index 67958f27d..000000000 --- a/userapi/storage/accounts/sqlite3/membership_table.go +++ /dev/null @@ -1,159 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sqlite3 - -import ( - "context" - "database/sql" - "strings" - - "github.com/matrix-org/dendrite/clientapi/auth/authtypes" - "github.com/matrix-org/dendrite/internal" - "github.com/matrix-org/dendrite/internal/sqlutil" -) - -const membershipSchema = ` --- Stores data about users memberships to rooms. -CREATE TABLE IF NOT EXISTS account_memberships ( - -- The Matrix user ID localpart for the member - localpart TEXT NOT NULL, - -- The room this user is a member of - room_id TEXT NOT NULL, - -- The ID of the join membership event - event_id TEXT NOT NULL, - - -- A user can only be member of a room once - PRIMARY KEY (localpart, room_id), - - UNIQUE (event_id) -); -` - -const insertMembershipSQL = ` - INSERT INTO account_memberships(localpart, room_id, event_id) VALUES ($1, $2, $3) - ON CONFLICT (localpart, room_id) DO UPDATE SET event_id = EXCLUDED.event_id -` - -const selectMembershipsByLocalpartSQL = "" + - "SELECT room_id, event_id FROM account_memberships WHERE localpart = $1" - -const selectMembershipInRoomByLocalpartSQL = "" + - "SELECT event_id FROM account_memberships WHERE localpart = $1 AND room_id = $2" - -const selectRoomIDsByLocalPartSQL = "" + - "SELECT room_id FROM account_memberships WHERE localpart = $1" - -const deleteMembershipsByEventIDsSQL = "" + - "DELETE FROM account_memberships WHERE event_id IN ($1)" - -type membershipStatements struct { - insertMembershipStmt *sql.Stmt - selectMembershipInRoomByLocalpartStmt *sql.Stmt - selectMembershipsByLocalpartStmt *sql.Stmt - selectRoomIDsByLocalPartStmt *sql.Stmt -} - -func (s *membershipStatements) prepare(db *sql.DB) (err error) { - _, err = db.Exec(membershipSchema) - if err != nil { - return - } - if s.insertMembershipStmt, err = db.Prepare(insertMembershipSQL); err != nil { - return - } - if s.selectMembershipInRoomByLocalpartStmt, err = db.Prepare(selectMembershipInRoomByLocalpartSQL); err != nil { - return - } - if s.selectMembershipsByLocalpartStmt, err = db.Prepare(selectMembershipsByLocalpartSQL); err != nil { - return - } - if s.selectRoomIDsByLocalPartStmt, err = db.Prepare(selectRoomIDsByLocalPartSQL); err != nil { - return - } - return -} - -func (s *membershipStatements) insertMembership( - ctx context.Context, txn *sql.Tx, localpart, roomID, eventID string, -) (err error) { - stmt := txn.Stmt(s.insertMembershipStmt) - _, err = stmt.ExecContext(ctx, localpart, roomID, eventID) - return -} - -func (s *membershipStatements) deleteMembershipsByEventIDs( - ctx context.Context, txn *sql.Tx, eventIDs []string, -) (err error) { - sqlStr := strings.Replace(deleteMembershipsByEventIDsSQL, "($1)", sqlutil.QueryVariadic(len(eventIDs)), 1) - iEventIDs := make([]interface{}, len(eventIDs)) - for i, e := range eventIDs { - iEventIDs[i] = e - } - _, err = txn.ExecContext(ctx, sqlStr, iEventIDs...) - return -} - -func (s *membershipStatements) selectMembershipInRoomByLocalpart( - ctx context.Context, localpart, roomID string, -) (authtypes.Membership, error) { - membership := authtypes.Membership{Localpart: localpart, RoomID: roomID} - stmt := s.selectMembershipInRoomByLocalpartStmt - err := stmt.QueryRowContext(ctx, localpart, roomID).Scan(&membership.EventID) - - return membership, err -} - -func (s *membershipStatements) selectMembershipsByLocalpart( - ctx context.Context, localpart string, -) (memberships []authtypes.Membership, err error) { - stmt := s.selectMembershipsByLocalpartStmt - rows, err := stmt.QueryContext(ctx, localpart) - if err != nil { - return - } - - memberships = []authtypes.Membership{} - - defer internal.CloseAndLogIfError(ctx, rows, "selectMembershipsByLocalpart: rows.close() failed") - for rows.Next() { - var m authtypes.Membership - m.Localpart = localpart - if err := rows.Scan(&m.RoomID, &m.EventID); err != nil { - return nil, err - } - memberships = append(memberships, m) - } - - return -} -func (s *membershipStatements) selectRoomIDsByLocalPart( - ctx context.Context, localPart string, -) ([]string, error) { - stmt := s.selectRoomIDsByLocalPartStmt - rows, err := stmt.QueryContext(ctx, localPart) - if err != nil { - return nil, err - } - roomIDs := []string{} - defer rows.Close() // nolint: errcheck - for rows.Next() { - var roomID string - if err = rows.Scan(&roomID); err != nil { - return nil, err - } - roomIDs = append(roomIDs, roomID) - } - return roomIDs, rows.Err() -} diff --git a/userapi/storage/accounts/sqlite3/storage.go b/userapi/storage/accounts/sqlite3/storage.go index e965df4f9..72b27c8bf 100644 --- a/userapi/storage/accounts/sqlite3/storage.go +++ b/userapi/storage/accounts/sqlite3/storage.go @@ -36,7 +36,6 @@ type Database struct { sqlutil.PartitionOffsetStatements accounts accountsStatements profiles profilesStatements - memberships membershipStatements accountDatas accountDataStatements threepids threepidStatements serverName gomatrixserverlib.ServerName @@ -67,10 +66,6 @@ func NewDatabase(dataSourceName string, serverName gomatrixserverlib.ServerName) if err = p.prepare(db); err != nil { return nil, err } - m := membershipStatements{} - if err = m.prepare(db); err != nil { - return nil, err - } ac := accountDataStatements{} if err = ac.prepare(db); err != nil { return nil, err @@ -79,7 +74,7 @@ func NewDatabase(dataSourceName string, serverName gomatrixserverlib.ServerName) if err = t.prepare(db); err != nil { return nil, err } - return &Database{db, partitions, a, p, m, ac, t, serverName, sync.Mutex{}}, nil + return &Database{db, partitions, a, p, ac, t, serverName, sync.Mutex{}}, nil } // GetAccountByPassword returns the account associated with the given localpart and password. @@ -193,112 +188,6 @@ func (d *Database) createAccount( return d.accounts.insertAccount(ctx, txn, localpart, hash, appserviceID) } -// SaveMembership saves the user matching a given localpart as a member of a given -// room. It also stores the ID of the membership event. -// If a membership already exists between the user and the room, or if the -// insert fails, returns the SQL error -func (d *Database) saveMembership( - ctx context.Context, txn *sql.Tx, localpart, roomID, eventID string, -) error { - return d.memberships.insertMembership(ctx, txn, localpart, roomID, eventID) -} - -// removeMembershipsByEventIDs removes the memberships corresponding to the -// `join` membership events IDs in the eventIDs slice. -// If the removal fails, or if there is no membership to remove, returns an error -func (d *Database) removeMembershipsByEventIDs( - ctx context.Context, txn *sql.Tx, eventIDs []string, -) error { - return d.memberships.deleteMembershipsByEventIDs(ctx, txn, eventIDs) -} - -// UpdateMemberships adds the "join" membership events included in a given state -// events array, and removes those which ID is included in a given array of events -// IDs. All of the process is run in a transaction, which commits only once/if every -// insertion and deletion has been successfully processed. -// Returns a SQL error if there was an issue with any part of the process -func (d *Database) UpdateMemberships( - ctx context.Context, eventsToAdd []gomatrixserverlib.Event, idsToRemove []string, -) error { - return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error { - if err := d.removeMembershipsByEventIDs(ctx, txn, idsToRemove); err != nil { - return err - } - - for _, event := range eventsToAdd { - if err := d.newMembership(ctx, txn, event); err != nil { - return err - } - } - - return nil - }) -} - -// GetMembershipInRoomByLocalpart returns the membership for an user -// matching the given localpart if he is a member of the room matching roomID, -// if not sql.ErrNoRows is returned. -// If there was an issue during the retrieval, returns the SQL error -func (d *Database) GetMembershipInRoomByLocalpart( - ctx context.Context, localpart, roomID string, -) (authtypes.Membership, error) { - return d.memberships.selectMembershipInRoomByLocalpart(ctx, localpart, roomID) -} - -// GetMembershipsByLocalpart returns an array containing the memberships for all -// the rooms a user matching a given localpart is a member of -// If no membership match the given localpart, returns an empty array -// If there was an issue during the retrieval, returns the SQL error -func (d *Database) GetMembershipsByLocalpart( - ctx context.Context, localpart string, -) (memberships []authtypes.Membership, err error) { - return d.memberships.selectMembershipsByLocalpart(ctx, localpart) -} - -// GetRoomIDsByLocalPart returns an array containing the room ids of all -// the rooms a user matching a given localpart is a member of -// If no membership match the given localpart, returns an empty array -// If there was an issue during the retrieval, returns the SQL error -func (d *Database) GetRoomIDsByLocalPart( - ctx context.Context, localpart string, -) ([]string, error) { - return d.memberships.selectRoomIDsByLocalPart(ctx, localpart) -} - -// newMembership saves a new membership in the database. -// If the event isn't a valid m.room.member event with type `join`, does nothing. -// If an error occurred, returns the SQL error -func (d *Database) newMembership( - ctx context.Context, txn *sql.Tx, ev gomatrixserverlib.Event, -) error { - if ev.Type() == "m.room.member" && ev.StateKey() != nil { - localpart, serverName, err := gomatrixserverlib.SplitID('@', *ev.StateKey()) - if err != nil { - return err - } - - // We only want state events from local users - if string(serverName) != string(d.serverName) { - return nil - } - - eventID := ev.EventID() - roomID := ev.RoomID() - membership, err := ev.Membership() - if err != nil { - return err - } - - // Only "join" membership events can be considered as new memberships - if membership == gomatrixserverlib.Join { - if err := d.saveMembership(ctx, txn, localpart, roomID, eventID); err != nil { - return err - } - } - } - return nil -} - // SaveAccountData saves new account data for a given user and a given room. // If the account data is not specific to a room, the room ID should be an empty string // If an account data already exists for a given set (user, room, data type), it will