mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-20 21:33:19 -06:00
Merge branch 'master' into neilalexander/persistentqueues
This commit is contained in:
commit
693928396d
|
|
@ -91,6 +91,7 @@ snd PUT /rooms/:room_id/send/:event_type/:txn_id deduplicates the same txn id
|
|||
get GET /rooms/:room_id/messages returns a message
|
||||
get GET /rooms/:room_id/messages lazy loads members correctly
|
||||
typ PUT /rooms/:room_id/typing/:user_id sets typing notification
|
||||
typ Typing notifications don't leak (3 subtests)
|
||||
rst GET /rooms/:room_id/state/m.room.power_levels can fetch levels
|
||||
rst PUT /rooms/:room_id/state/m.room.power_levels can set levels
|
||||
rst PUT power_levels should not explode if the old power levels were empty
|
||||
|
|
|
|||
|
|
@ -18,9 +18,9 @@ import (
|
|||
"github.com/Shopify/sarama"
|
||||
"github.com/gorilla/mux"
|
||||
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
||||
"github.com/matrix-org/dendrite/clientapi/consumers"
|
||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||
"github.com/matrix-org/dendrite/clientapi/routing"
|
||||
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
|
|
@ -30,14 +30,12 @@ import (
|
|||
"github.com/matrix-org/dendrite/userapi/storage/accounts"
|
||||
"github.com/matrix-org/dendrite/userapi/storage/devices"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// AddPublicRoutes sets up and registers HTTP handlers for the ClientAPI component.
|
||||
func AddPublicRoutes(
|
||||
router *mux.Router,
|
||||
cfg *config.Dendrite,
|
||||
consumer sarama.Consumer,
|
||||
producer sarama.SyncProducer,
|
||||
deviceDB devices.Database,
|
||||
accountsDB accounts.Database,
|
||||
|
|
@ -45,6 +43,7 @@ func AddPublicRoutes(
|
|||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||
eduInputAPI eduServerAPI.EDUServerInputAPI,
|
||||
asAPI appserviceAPI.AppServiceQueryAPI,
|
||||
stateAPI currentstateAPI.CurrentStateInternalAPI,
|
||||
transactionsCache *transactions.Cache,
|
||||
fsAPI federationSenderAPI.FederationSenderInternalAPI,
|
||||
userAPI userapi.UserInternalAPI,
|
||||
|
|
@ -54,16 +53,9 @@ func AddPublicRoutes(
|
|||
Topic: string(cfg.Kafka.Topics.OutputClientData),
|
||||
}
|
||||
|
||||
roomEventConsumer := consumers.NewOutputRoomEventConsumer(
|
||||
cfg, consumer, accountsDB, rsAPI,
|
||||
)
|
||||
if err := roomEventConsumer.Start(); err != nil {
|
||||
logrus.WithError(err).Panicf("failed to start room server consumer")
|
||||
}
|
||||
|
||||
routing.Setup(
|
||||
router, cfg, eduInputAPI, rsAPI, asAPI,
|
||||
accountsDB, deviceDB, userAPI, federation,
|
||||
syncProducer, transactionsCache, fsAPI,
|
||||
syncProducer, transactionsCache, fsAPI, stateAPI,
|
||||
)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,92 +0,0 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package consumers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/userapi/storage/accounts"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// OutputRoomEventConsumer consumes events that originated in the room server.
|
||||
type OutputRoomEventConsumer struct {
|
||||
rsAPI api.RoomserverInternalAPI
|
||||
rsConsumer *internal.ContinualConsumer
|
||||
db accounts.Database
|
||||
serverName string
|
||||
}
|
||||
|
||||
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
|
||||
func NewOutputRoomEventConsumer(
|
||||
cfg *config.Dendrite,
|
||||
kafkaConsumer sarama.Consumer,
|
||||
store accounts.Database,
|
||||
rsAPI api.RoomserverInternalAPI,
|
||||
) *OutputRoomEventConsumer {
|
||||
|
||||
consumer := internal.ContinualConsumer{
|
||||
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
|
||||
Consumer: kafkaConsumer,
|
||||
PartitionStore: store,
|
||||
}
|
||||
s := &OutputRoomEventConsumer{
|
||||
rsConsumer: &consumer,
|
||||
db: store,
|
||||
rsAPI: rsAPI,
|
||||
serverName: string(cfg.Matrix.ServerName),
|
||||
}
|
||||
consumer.ProcessMessage = s.onMessage
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
// Start consuming from room servers
|
||||
func (s *OutputRoomEventConsumer) Start() error {
|
||||
return s.rsConsumer.Start()
|
||||
}
|
||||
|
||||
// onMessage is called when the sync server receives a new event from the room server output log.
|
||||
// It is not safe for this function to be called from multiple goroutines, or else the
|
||||
// sync stream position may race and be incorrectly calculated.
|
||||
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||
// Parse out the event JSON
|
||||
var output api.OutputEvent
|
||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||
// If the message was invalid, log it and move on to the next message in the stream
|
||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
||||
return nil
|
||||
}
|
||||
|
||||
if output.Type != api.OutputTypeNewRoomEvent {
|
||||
log.WithField("type", output.Type).Debug(
|
||||
"roomserver output log: ignoring unknown output type",
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
return s.db.UpdateMemberships(
|
||||
context.TODO(),
|
||||
gomatrixserverlib.UnwrapEventHeaders(output.NewRoomEvent.AddsState()),
|
||||
output.NewRoomEvent.RemovesStateEventIDs,
|
||||
)
|
||||
}
|
||||
|
|
@ -18,9 +18,8 @@ import (
|
|||
"encoding/json"
|
||||
"net/http"
|
||||
|
||||
"github.com/matrix-org/dendrite/userapi/storage/accounts"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
|
|
@ -95,20 +94,19 @@ func GetMemberships(
|
|||
func GetJoinedRooms(
|
||||
req *http.Request,
|
||||
device *userapi.Device,
|
||||
accountsDB accounts.Database,
|
||||
stateAPI currentstateAPI.CurrentStateInternalAPI,
|
||||
) util.JSONResponse {
|
||||
localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID)
|
||||
var res currentstateAPI.QueryRoomsForUserResponse
|
||||
err := stateAPI.QueryRoomsForUser(req.Context(), ¤tstateAPI.QueryRoomsForUserRequest{
|
||||
UserID: device.UserID,
|
||||
WantMembership: "join",
|
||||
}, &res)
|
||||
if err != nil {
|
||||
util.GetLogger(req.Context()).WithError(err).Error("gomatrixserverlib.SplitID failed")
|
||||
return jsonerror.InternalServerError()
|
||||
}
|
||||
joinedRooms, err := accountsDB.GetRoomIDsByLocalPart(req.Context(), localpart)
|
||||
if err != nil {
|
||||
util.GetLogger(req.Context()).WithError(err).Error("accountsDB.GetRoomIDsByLocalPart failed")
|
||||
util.GetLogger(req.Context()).WithError(err).Error("QueryRoomsForUser failed")
|
||||
return jsonerror.InternalServerError()
|
||||
}
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusOK,
|
||||
JSON: getJoinedRoomsResponse{joinedRooms},
|
||||
JSON: getJoinedRoomsResponse{res.RoomIDs},
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
|
|
@ -93,8 +94,8 @@ func GetAvatarURL(
|
|||
// SetAvatarURL implements PUT /profile/{userID}/avatar_url
|
||||
// nolint:gocyclo
|
||||
func SetAvatarURL(
|
||||
req *http.Request, accountDB accounts.Database, device *userapi.Device,
|
||||
userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI,
|
||||
req *http.Request, accountDB accounts.Database, stateAPI currentstateAPI.CurrentStateInternalAPI,
|
||||
device *userapi.Device, userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI,
|
||||
) util.JSONResponse {
|
||||
if userID != device.UserID {
|
||||
return util.JSONResponse{
|
||||
|
|
@ -139,9 +140,13 @@ func SetAvatarURL(
|
|||
return jsonerror.InternalServerError()
|
||||
}
|
||||
|
||||
memberships, err := accountDB.GetMembershipsByLocalpart(req.Context(), localpart)
|
||||
var res currentstateAPI.QueryRoomsForUserResponse
|
||||
err = stateAPI.QueryRoomsForUser(req.Context(), ¤tstateAPI.QueryRoomsForUserRequest{
|
||||
UserID: device.UserID,
|
||||
WantMembership: "join",
|
||||
}, &res)
|
||||
if err != nil {
|
||||
util.GetLogger(req.Context()).WithError(err).Error("accountDB.GetMembershipsByLocalpart failed")
|
||||
util.GetLogger(req.Context()).WithError(err).Error("QueryRoomsForUser failed")
|
||||
return jsonerror.InternalServerError()
|
||||
}
|
||||
|
||||
|
|
@ -152,7 +157,7 @@ func SetAvatarURL(
|
|||
}
|
||||
|
||||
events, err := buildMembershipEvents(
|
||||
req.Context(), memberships, newProfile, userID, cfg, evTime, rsAPI,
|
||||
req.Context(), res.RoomIDs, newProfile, userID, cfg, evTime, rsAPI,
|
||||
)
|
||||
switch e := err.(type) {
|
||||
case nil:
|
||||
|
|
@ -207,8 +212,8 @@ func GetDisplayName(
|
|||
// SetDisplayName implements PUT /profile/{userID}/displayname
|
||||
// nolint:gocyclo
|
||||
func SetDisplayName(
|
||||
req *http.Request, accountDB accounts.Database, device *userapi.Device,
|
||||
userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI,
|
||||
req *http.Request, accountDB accounts.Database, stateAPI currentstateAPI.CurrentStateInternalAPI,
|
||||
device *userapi.Device, userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI,
|
||||
) util.JSONResponse {
|
||||
if userID != device.UserID {
|
||||
return util.JSONResponse{
|
||||
|
|
@ -253,9 +258,13 @@ func SetDisplayName(
|
|||
return jsonerror.InternalServerError()
|
||||
}
|
||||
|
||||
memberships, err := accountDB.GetMembershipsByLocalpart(req.Context(), localpart)
|
||||
var res currentstateAPI.QueryRoomsForUserResponse
|
||||
err = stateAPI.QueryRoomsForUser(req.Context(), ¤tstateAPI.QueryRoomsForUserRequest{
|
||||
UserID: device.UserID,
|
||||
WantMembership: "join",
|
||||
}, &res)
|
||||
if err != nil {
|
||||
util.GetLogger(req.Context()).WithError(err).Error("accountDB.GetMembershipsByLocalpart failed")
|
||||
util.GetLogger(req.Context()).WithError(err).Error("QueryRoomsForUser failed")
|
||||
return jsonerror.InternalServerError()
|
||||
}
|
||||
|
||||
|
|
@ -266,7 +275,7 @@ func SetDisplayName(
|
|||
}
|
||||
|
||||
events, err := buildMembershipEvents(
|
||||
req.Context(), memberships, newProfile, userID, cfg, evTime, rsAPI,
|
||||
req.Context(), res.RoomIDs, newProfile, userID, cfg, evTime, rsAPI,
|
||||
)
|
||||
switch e := err.(type) {
|
||||
case nil:
|
||||
|
|
@ -335,14 +344,14 @@ func getProfile(
|
|||
|
||||
func buildMembershipEvents(
|
||||
ctx context.Context,
|
||||
memberships []authtypes.Membership,
|
||||
roomIDs []string,
|
||||
newProfile authtypes.Profile, userID string, cfg *config.Dendrite,
|
||||
evTime time.Time, rsAPI api.RoomserverInternalAPI,
|
||||
) ([]gomatrixserverlib.HeaderedEvent, error) {
|
||||
evs := []gomatrixserverlib.HeaderedEvent{}
|
||||
|
||||
for _, membership := range memberships {
|
||||
verReq := api.QueryRoomVersionForRoomRequest{RoomID: membership.RoomID}
|
||||
for _, roomID := range roomIDs {
|
||||
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
|
||||
verRes := api.QueryRoomVersionForRoomResponse{}
|
||||
if err := rsAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes); err != nil {
|
||||
return []gomatrixserverlib.HeaderedEvent{}, err
|
||||
|
|
@ -350,7 +359,7 @@ func buildMembershipEvents(
|
|||
|
||||
builder := gomatrixserverlib.EventBuilder{
|
||||
Sender: userID,
|
||||
RoomID: membership.RoomID,
|
||||
RoomID: roomID,
|
||||
Type: "m.room.member",
|
||||
StateKey: &userID,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import (
|
|||
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
|
|
@ -58,6 +59,7 @@ func Setup(
|
|||
syncProducer *producers.SyncAPIProducer,
|
||||
transactionsCache *transactions.Cache,
|
||||
federationSender federationSenderAPI.FederationSenderInternalAPI,
|
||||
stateAPI currentstateAPI.CurrentStateInternalAPI,
|
||||
) {
|
||||
|
||||
publicAPIMux.Handle("/client/versions",
|
||||
|
|
@ -98,7 +100,7 @@ func Setup(
|
|||
).Methods(http.MethodPost, http.MethodOptions)
|
||||
r0mux.Handle("/joined_rooms",
|
||||
httputil.MakeAuthAPI("joined_rooms", userAPI, func(req *http.Request, device *api.Device) util.JSONResponse {
|
||||
return GetJoinedRooms(req, device, accountDB)
|
||||
return GetJoinedRooms(req, device, stateAPI)
|
||||
}),
|
||||
).Methods(http.MethodGet, http.MethodOptions)
|
||||
r0mux.Handle("/rooms/{roomID}/join",
|
||||
|
|
@ -307,7 +309,7 @@ func Setup(
|
|||
if err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduAPI)
|
||||
return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduAPI, stateAPI)
|
||||
}),
|
||||
).Methods(http.MethodPut, http.MethodOptions)
|
||||
|
||||
|
|
@ -404,7 +406,7 @@ func Setup(
|
|||
if err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return SetAvatarURL(req, accountDB, device, vars["userID"], cfg, rsAPI)
|
||||
return SetAvatarURL(req, accountDB, stateAPI, device, vars["userID"], cfg, rsAPI)
|
||||
}),
|
||||
).Methods(http.MethodPut, http.MethodOptions)
|
||||
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
|
||||
|
|
@ -426,7 +428,7 @@ func Setup(
|
|||
if err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return SetDisplayName(req, accountDB, device, vars["userID"], cfg, rsAPI)
|
||||
return SetDisplayName(req, accountDB, stateAPI, device, vars["userID"], cfg, rsAPI)
|
||||
}),
|
||||
).Methods(http.MethodPut, http.MethodOptions)
|
||||
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
|
||||
|
|
|
|||
|
|
@ -13,15 +13,15 @@
|
|||
package routing
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"net/http"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
"github.com/matrix-org/dendrite/clientapi/userutil"
|
||||
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
"github.com/matrix-org/dendrite/eduserver/api"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/dendrite/userapi/storage/accounts"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
)
|
||||
|
||||
|
|
@ -36,6 +36,7 @@ func SendTyping(
|
|||
req *http.Request, device *userapi.Device, roomID string,
|
||||
userID string, accountDB accounts.Database,
|
||||
eduAPI api.EDUServerInputAPI,
|
||||
stateAPI currentstateAPI.CurrentStateInternalAPI,
|
||||
) util.JSONResponse {
|
||||
if device.UserID != userID {
|
||||
return util.JSONResponse{
|
||||
|
|
@ -44,23 +45,38 @@ func SendTyping(
|
|||
}
|
||||
}
|
||||
|
||||
localpart, err := userutil.ParseUsernameParam(userID, nil)
|
||||
// Verify that the user is a member of this room
|
||||
tuple := gomatrixserverlib.StateKeyTuple{
|
||||
EventType: gomatrixserverlib.MRoomMember,
|
||||
StateKey: userID,
|
||||
}
|
||||
var res currentstateAPI.QueryCurrentStateResponse
|
||||
err := stateAPI.QueryCurrentState(req.Context(), ¤tstateAPI.QueryCurrentStateRequest{
|
||||
RoomID: roomID,
|
||||
StateTuples: []gomatrixserverlib.StateKeyTuple{tuple},
|
||||
}, &res)
|
||||
if err != nil {
|
||||
util.GetLogger(req.Context()).WithError(err).Error("userutil.ParseUsernameParam failed")
|
||||
util.GetLogger(req.Context()).WithError(err).Error("QueryCurrentState failed")
|
||||
return jsonerror.InternalServerError()
|
||||
}
|
||||
|
||||
// Verify that the user is a member of this room
|
||||
_, err = accountDB.GetMembershipInRoomByLocalpart(req.Context(), localpart, roomID)
|
||||
if err == sql.ErrNoRows {
|
||||
ev := res.StateEvents[tuple]
|
||||
if ev == nil {
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusForbidden,
|
||||
JSON: jsonerror.Forbidden("User not in this room"),
|
||||
}
|
||||
} else if err != nil {
|
||||
util.GetLogger(req.Context()).WithError(err).Error("accountDB.GetMembershipInRoomByLocalPart failed")
|
||||
}
|
||||
membership, err := ev.Membership()
|
||||
if err != nil {
|
||||
util.GetLogger(req.Context()).WithError(err).Error("Member event isn't valid")
|
||||
return jsonerror.InternalServerError()
|
||||
}
|
||||
if membership != gomatrixserverlib.Join {
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusForbidden,
|
||||
JSON: jsonerror.Forbidden("User not in this room"),
|
||||
}
|
||||
}
|
||||
|
||||
// parse the incoming http request
|
||||
var r typingContentJSON
|
||||
|
|
|
|||
|
|
@ -35,10 +35,11 @@ func main() {
|
|||
fsAPI := base.FederationSenderHTTPClient()
|
||||
eduInputAPI := base.EDUServerClient()
|
||||
userAPI := base.UserAPIClient()
|
||||
stateAPI := base.CurrentStateAPIClient()
|
||||
|
||||
clientapi.AddPublicRoutes(
|
||||
base.PublicAPIMux, base.Cfg, base.KafkaConsumer, base.KafkaProducer, deviceDB, accountDB, federation,
|
||||
rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI,
|
||||
base.PublicAPIMux, base.Cfg, base.KafkaProducer, deviceDB, accountDB, federation,
|
||||
rsAPI, eduInputAPI, asQuery, stateAPI, transactions.New(), fsAPI, userAPI,
|
||||
)
|
||||
|
||||
base.SetupAndServeHTTP(string(base.Cfg.Bind.ClientAPI), string(base.Cfg.Listen.ClientAPI))
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ import (
|
|||
p2pdisc "github.com/libp2p/go-libp2p/p2p/discovery"
|
||||
"github.com/matrix-org/dendrite/appservice"
|
||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-libp2p/storage"
|
||||
"github.com/matrix-org/dendrite/currentstateserver"
|
||||
"github.com/matrix-org/dendrite/eduserver"
|
||||
"github.com/matrix-org/dendrite/federationsender"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
|
|
@ -166,6 +167,7 @@ func main() {
|
|||
if err != nil {
|
||||
logrus.WithError(err).Panicf("failed to connect to public rooms db")
|
||||
}
|
||||
stateAPI := currentstateserver.NewInternalAPI(base.Base.Cfg, base.Base.KafkaConsumer)
|
||||
|
||||
monolith := setup.Monolith{
|
||||
Config: base.Base.Cfg,
|
||||
|
|
@ -182,6 +184,7 @@ func main() {
|
|||
FederationSenderAPI: fsAPI,
|
||||
RoomserverAPI: rsAPI,
|
||||
ServerKeyAPI: serverKeyAPI,
|
||||
StateAPI: stateAPI,
|
||||
UserAPI: userAPI,
|
||||
|
||||
PublicRoomsDB: publicRoomsDB,
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/embed"
|
||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
|
||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn"
|
||||
"github.com/matrix-org/dendrite/currentstateserver"
|
||||
"github.com/matrix-org/dendrite/eduserver"
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
"github.com/matrix-org/dendrite/federationsender"
|
||||
|
|
@ -115,6 +116,8 @@ func main() {
|
|||
|
||||
embed.Embed(base.BaseMux, *instancePort, "Yggdrasil Demo")
|
||||
|
||||
stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer)
|
||||
|
||||
monolith := setup.Monolith{
|
||||
Config: base.Cfg,
|
||||
AccountDB: accountDB,
|
||||
|
|
@ -130,6 +133,7 @@ func main() {
|
|||
FederationSenderAPI: fsAPI,
|
||||
RoomserverAPI: rsAPI,
|
||||
UserAPI: userAPI,
|
||||
StateAPI: stateAPI,
|
||||
//ServerKeyAPI: serverKeyAPI,
|
||||
|
||||
PublicRoomsDB: publicRoomsDB,
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ import (
|
|||
"os"
|
||||
|
||||
"github.com/matrix-org/dendrite/appservice"
|
||||
"github.com/matrix-org/dendrite/currentstateserver"
|
||||
"github.com/matrix-org/dendrite/eduserver"
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
"github.com/matrix-org/dendrite/federationsender"
|
||||
|
|
@ -122,6 +123,8 @@ func main() {
|
|||
logrus.WithError(err).Panicf("failed to connect to public rooms db")
|
||||
}
|
||||
|
||||
stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer)
|
||||
|
||||
monolith := setup.Monolith{
|
||||
Config: base.Cfg,
|
||||
AccountDB: accountDB,
|
||||
|
|
@ -137,6 +140,7 @@ func main() {
|
|||
FederationSenderAPI: fsAPI,
|
||||
RoomserverAPI: rsAPI,
|
||||
ServerKeyAPI: serverKeyAPI,
|
||||
StateAPI: stateAPI,
|
||||
UserAPI: userAPI,
|
||||
|
||||
PublicRoomsDB: publicRoomsDB,
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import (
|
|||
"syscall/js"
|
||||
|
||||
"github.com/matrix-org/dendrite/appservice"
|
||||
"github.com/matrix-org/dendrite/currentstateserver"
|
||||
"github.com/matrix-org/dendrite/eduserver"
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
"github.com/matrix-org/dendrite/federationsender"
|
||||
|
|
@ -218,6 +219,8 @@ func main() {
|
|||
logrus.WithError(err).Panicf("failed to connect to public rooms db")
|
||||
}
|
||||
|
||||
stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer)
|
||||
|
||||
monolith := setup.Monolith{
|
||||
Config: base.Cfg,
|
||||
AccountDB: accountDB,
|
||||
|
|
@ -232,6 +235,7 @@ func main() {
|
|||
EDUInternalAPI: eduInputAPI,
|
||||
FederationSenderAPI: fedSenderAPI,
|
||||
RoomserverAPI: rsAPI,
|
||||
StateAPI: stateAPI,
|
||||
UserAPI: userAPI,
|
||||
//ServerKeyAPI: serverKeyAPI,
|
||||
|
||||
|
|
|
|||
78
currentstateserver/api/api.go
Normal file
78
currentstateserver/api/api.go
Normal file
|
|
@ -0,0 +1,78 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
type CurrentStateInternalAPI interface {
|
||||
// QueryCurrentState retrieves the requested state events. If state events are not found, they will be missing from
|
||||
// the response.
|
||||
QueryCurrentState(ctx context.Context, req *QueryCurrentStateRequest, res *QueryCurrentStateResponse) error
|
||||
// QueryRoomsForUser retrieves a list of room IDs matching the given query.
|
||||
QueryRoomsForUser(ctx context.Context, req *QueryRoomsForUserRequest, res *QueryRoomsForUserResponse) error
|
||||
}
|
||||
|
||||
type QueryRoomsForUserRequest struct {
|
||||
UserID string
|
||||
// The desired membership of the user. If this is the empty string then no rooms are returned.
|
||||
WantMembership string
|
||||
}
|
||||
|
||||
type QueryRoomsForUserResponse struct {
|
||||
RoomIDs []string
|
||||
}
|
||||
|
||||
type QueryCurrentStateRequest struct {
|
||||
RoomID string
|
||||
StateTuples []gomatrixserverlib.StateKeyTuple
|
||||
}
|
||||
|
||||
type QueryCurrentStateResponse struct {
|
||||
StateEvents map[gomatrixserverlib.StateKeyTuple]*gomatrixserverlib.HeaderedEvent
|
||||
}
|
||||
|
||||
// MarshalJSON stringifies the StateKeyTuple keys so they can be sent over the wire in HTTP API mode.
|
||||
func (r *QueryCurrentStateResponse) MarshalJSON() ([]byte, error) {
|
||||
se := make(map[string]*gomatrixserverlib.HeaderedEvent, len(r.StateEvents))
|
||||
for k, v := range r.StateEvents {
|
||||
// use 0x1F (unit separator) as the delimiter between type/state key,
|
||||
se[fmt.Sprintf("%s\x1F%s", k.EventType, k.StateKey)] = v
|
||||
}
|
||||
return json.Marshal(se)
|
||||
}
|
||||
|
||||
func (r *QueryCurrentStateResponse) UnmarshalJSON(data []byte) error {
|
||||
res := make(map[string]*gomatrixserverlib.HeaderedEvent)
|
||||
err := json.Unmarshal(data, &res)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.StateEvents = make(map[gomatrixserverlib.StateKeyTuple]*gomatrixserverlib.HeaderedEvent, len(res))
|
||||
for k, v := range res {
|
||||
fields := strings.Split(k, "\x1F")
|
||||
r.StateEvents[gomatrixserverlib.StateKeyTuple{
|
||||
EventType: fields[0],
|
||||
StateKey: fields[1],
|
||||
}] = v
|
||||
}
|
||||
return nil
|
||||
}
|
||||
140
currentstateserver/consumers/roomserver.go
Normal file
140
currentstateserver/consumers/roomserver.go
Normal file
|
|
@ -0,0 +1,140 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package consumers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/matrix-org/dendrite/currentstateserver/storage"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type OutputRoomEventConsumer struct {
|
||||
rsConsumer *internal.ContinualConsumer
|
||||
db storage.Database
|
||||
}
|
||||
|
||||
func NewOutputRoomEventConsumer(topicName string, kafkaConsumer sarama.Consumer, store storage.Database) *OutputRoomEventConsumer {
|
||||
consumer := &internal.ContinualConsumer{
|
||||
Topic: topicName,
|
||||
Consumer: kafkaConsumer,
|
||||
PartitionStore: store,
|
||||
}
|
||||
s := &OutputRoomEventConsumer{
|
||||
rsConsumer: consumer,
|
||||
db: store,
|
||||
}
|
||||
consumer.ProcessMessage = s.onMessage
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func (c *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||
// Parse out the event JSON
|
||||
var output api.OutputEvent
|
||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||
// If the message was invalid, log it and move on to the next message in the stream
|
||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
||||
return nil
|
||||
}
|
||||
|
||||
switch output.Type {
|
||||
case api.OutputTypeNewRoomEvent:
|
||||
return c.onNewRoomEvent(context.TODO(), *output.NewRoomEvent)
|
||||
case api.OutputTypeNewInviteEvent:
|
||||
case api.OutputTypeRetireInviteEvent:
|
||||
default:
|
||||
log.WithField("type", output.Type).Debug(
|
||||
"roomserver output log: ignoring unknown output type",
|
||||
)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *OutputRoomEventConsumer) onNewRoomEvent(
|
||||
ctx context.Context, msg api.OutputNewRoomEvent,
|
||||
) error {
|
||||
ev := msg.Event
|
||||
|
||||
addsStateEvents := msg.AddsState()
|
||||
|
||||
ev, err := c.updateStateEvent(ev)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for i := range addsStateEvents {
|
||||
addsStateEvents[i], err = c.updateStateEvent(addsStateEvents[i])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
err = c.db.StoreStateEvents(
|
||||
ctx,
|
||||
addsStateEvents,
|
||||
msg.RemovesStateEventIDs,
|
||||
)
|
||||
if err != nil {
|
||||
// panic rather than continue with an inconsistent database
|
||||
log.WithFields(log.Fields{
|
||||
"event": string(ev.JSON()),
|
||||
log.ErrorKey: err,
|
||||
"add": msg.AddsStateEventIDs,
|
||||
"del": msg.RemovesStateEventIDs,
|
||||
}).Panicf("roomserver output log: write event failure")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Start consuming from room servers
|
||||
func (c *OutputRoomEventConsumer) Start() error {
|
||||
return c.rsConsumer.Start()
|
||||
}
|
||||
|
||||
func (c *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.HeaderedEvent) (gomatrixserverlib.HeaderedEvent, error) {
|
||||
var stateKey string
|
||||
if event.StateKey() == nil {
|
||||
stateKey = ""
|
||||
} else {
|
||||
stateKey = *event.StateKey()
|
||||
}
|
||||
|
||||
prevEvent, err := c.db.GetStateEvent(
|
||||
context.TODO(), event.RoomID(), event.Type(), stateKey,
|
||||
)
|
||||
if err != nil {
|
||||
return event, err
|
||||
}
|
||||
|
||||
if prevEvent == nil {
|
||||
return event, nil
|
||||
}
|
||||
|
||||
prev := types.PrevEventRef{
|
||||
PrevContent: prevEvent.Content(),
|
||||
ReplacesState: prevEvent.EventID(),
|
||||
PrevSender: prevEvent.Sender(),
|
||||
}
|
||||
|
||||
event.Event, err = event.SetUnsigned(prev)
|
||||
return event, err
|
||||
}
|
||||
51
currentstateserver/currentstateserver.go
Normal file
51
currentstateserver/currentstateserver.go
Normal file
|
|
@ -0,0 +1,51 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package currentstateserver
|
||||
|
||||
import (
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
"github.com/matrix-org/dendrite/currentstateserver/consumers"
|
||||
"github.com/matrix-org/dendrite/currentstateserver/internal"
|
||||
"github.com/matrix-org/dendrite/currentstateserver/inthttp"
|
||||
"github.com/matrix-org/dendrite/currentstateserver/storage"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions
|
||||
// on the given input API.
|
||||
func AddInternalRoutes(router *mux.Router, intAPI api.CurrentStateInternalAPI) {
|
||||
inthttp.AddRoutes(router, intAPI)
|
||||
}
|
||||
|
||||
// NewInternalAPI returns a concrete implementation of the internal API. Callers
|
||||
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
|
||||
func NewInternalAPI(cfg *config.Dendrite, consumer sarama.Consumer) api.CurrentStateInternalAPI {
|
||||
csDB, err := storage.NewDatabase(string(cfg.Database.CurrentState), cfg.DbProperties())
|
||||
if err != nil {
|
||||
logrus.WithError(err).Panicf("failed to open database")
|
||||
}
|
||||
roomConsumer := consumers.NewOutputRoomEventConsumer(
|
||||
string(cfg.Kafka.Topics.OutputRoomEvent), consumer, csDB,
|
||||
)
|
||||
if err = roomConsumer.Start(); err != nil {
|
||||
logrus.WithError(err).Panicf("failed to start room server consumer")
|
||||
}
|
||||
return &internal.CurrentStateInternalAPI{
|
||||
DB: csDB,
|
||||
}
|
||||
}
|
||||
180
currentstateserver/currentstateserver_test.go
Normal file
180
currentstateserver/currentstateserver_test.go
Normal file
|
|
@ -0,0 +1,180 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package currentstateserver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
"github.com/matrix-org/dendrite/currentstateserver/inthttp"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
"github.com/matrix-org/dendrite/internal/httputil"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/internal/test"
|
||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/naffka"
|
||||
)
|
||||
|
||||
var (
|
||||
testRoomVersion = gomatrixserverlib.RoomVersionV1
|
||||
testData = []json.RawMessage{
|
||||
[]byte(`{"auth_events":[],"content":{"creator":"@userid:kaer.morhen"},"depth":0,"event_id":"$0ok8ynDp7kjc95e3:kaer.morhen","hashes":{"sha256":"17kPoH+h0Dk4Omn7Sus0qMb6+oGcf+CZFEgDhv7UKWs"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"jP4a04f5/F10Pw95FPpdCyKAO44JOwUQ/MZOOeA/RTU1Dn+AHPMzGSaZnuGjRr/xQuADt+I3ctb5ZQfLKNzHDw"}},"state_key":"","type":"m.room.create"}`),
|
||||
[]byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}]],"content":{"membership":"join"},"depth":1,"event_id":"$LEwEu0kxrtu5fOiS:kaer.morhen","hashes":{"sha256":"B7M88PhXf3vd1LaFtjQutFu4x/w7fHD28XKZ4sAsJTo"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"p2vqmuJn7ZBRImctSaKbXCAxCcBlIjPH9JHte1ouIUGy84gpu4eLipOvSBCLL26hXfC0Zrm4WUto6Hr+ohdrCg"}},"state_key":"@userid:kaer.morhen","type":"m.room.member"}`),
|
||||
[]byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"join_rule":"public"},"depth":2,"event_id":"$SMHlqUrNhhBBRLeN:kaer.morhen","hashes":{"sha256":"vIuJQvmMjrGxshAkj1SXe0C4RqvMbv4ZADDw9pFCWqQ"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"hBMsb3Qppo3RaqqAl4JyTgaiWEbW5hlckATky6PrHun+F3YM203TzG7w9clwuQU5F5pZoB1a6nw+to0hN90FAw"}},"state_key":"","type":"m.room.join_rules"}`),
|
||||
[]byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"history_visibility":"shared"},"depth":3,"event_id":"$6F1yGIbO0J7TM93h:kaer.morhen","hashes":{"sha256":"Mr23GKSlZW7UCCYLgOWawI2Sg6KIoMjUWO2TDenuOgw"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$SMHlqUrNhhBBRLeN:kaer.morhen",{"sha256":"SylzE8U02I+6eyEHgL+FlU0L5YdqrVp8OOlxKS9VQW0"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"sHLKrFI3hKGrEJfpMVZSDS3LvLasQsy50CTsOwru9XTVxgRsPo6wozNtRVjxo1J3Rk18RC9JppovmQ5VR5EcDw"}},"state_key":"","type":"m.room.history_visibility"}`),
|
||||
[]byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"ban":50,"events":null,"events_default":0,"invite":0,"kick":50,"redact":50,"state_default":50,"users":null,"users_default":0},"depth":4,"event_id":"$UKNe10XzYzG0TeA9:kaer.morhen","hashes":{"sha256":"ngbP3yja9U5dlckKerUs/fSOhtKxZMCVvsfhPURSS28"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$6F1yGIbO0J7TM93h:kaer.morhen",{"sha256":"A4CucrKSoWX4IaJXhq02mBg1sxIyZEftbC+5p3fZAvk"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"zOmwlP01QL3yFchzuR9WHvogOoBZA3oVtNIF3lM0ZfDnqlSYZB9sns27G/4HVq0k7alaK7ZE3oGoCrVnMkPNCw"}},"state_key":"","type":"m.room.power_levels"}`),
|
||||
// messages
|
||||
[]byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"body":"Test Message"},"depth":5,"event_id":"$gl2T9l3qm0kUbiIJ:kaer.morhen","hashes":{"sha256":"Qx3nRMHLDPSL5hBAzuX84FiSSP0K0Kju2iFoBWH4Za8"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$UKNe10XzYzG0TeA9:kaer.morhen",{"sha256":"KtSRyMjt0ZSjsv2koixTRCxIRCGoOp6QrKscsW97XRo"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"sqDgv3EG7ml5VREzmT9aZeBpS4gAPNIaIeJOwqjDhY0GPU/BcpX5wY4R7hYLrNe5cChgV+eFy/GWm1Zfg5FfDg"}},"type":"m.room.message"}`),
|
||||
[]byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"body":"Test Message"},"depth":6,"event_id":"$MYSbs8m4rEbsCWXD:kaer.morhen","hashes":{"sha256":"kgbYM7v4Ud2YaBsjBTolM4ySg6rHcJNYI6nWhMSdFUA"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$gl2T9l3qm0kUbiIJ:kaer.morhen",{"sha256":"C/rD04h9wGxRdN2G/IBfrgoE1UovzLZ+uskwaKZ37/Q"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"x0UoKh968jj/F5l1/R7Ew0T6CTKuew3PLNHASNxqck/bkNe8yYQiDHXRr+kZxObeqPZZTpaF1+EI+bLU9W8GDQ"}},"type":"m.room.message"}`),
|
||||
[]byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"body":"Test Message"},"depth":7,"event_id":"$N5x9WJkl9ClPrAEg:kaer.morhen","hashes":{"sha256":"FWM8oz4yquTunRZ67qlW2gzPDzdWfBP6RPHXhK1I/x8"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$MYSbs8m4rEbsCWXD:kaer.morhen",{"sha256":"fatqgW+SE8mb2wFn3UN+drmluoD4UJ/EcSrL6Ur9q1M"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"Y+LX/xcyufoXMOIoqQBNOzy6lZfUGB1ffgXIrSugk6obMiyAsiRejHQN/pciZXsHKxMJLYRFAz4zSJoS/LGPAA"}},"type":"m.room.message"}`),
|
||||
}
|
||||
testEvents = []gomatrixserverlib.HeaderedEvent{}
|
||||
testStateEvents = make(map[gomatrixserverlib.StateKeyTuple]gomatrixserverlib.HeaderedEvent)
|
||||
|
||||
kafkaTopic = "room_events"
|
||||
)
|
||||
|
||||
func init() {
|
||||
for _, j := range testData {
|
||||
e, err := gomatrixserverlib.NewEventFromTrustedJSON(j, false, testRoomVersion)
|
||||
if err != nil {
|
||||
panic("cannot load test data: " + err.Error())
|
||||
}
|
||||
h := e.Headered(testRoomVersion)
|
||||
testEvents = append(testEvents, h)
|
||||
if e.StateKey() != nil {
|
||||
testStateEvents[gomatrixserverlib.StateKeyTuple{
|
||||
EventType: e.Type(),
|
||||
StateKey: *e.StateKey(),
|
||||
}] = h
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func MustWriteOutputEvent(t *testing.T, producer sarama.SyncProducer, out *roomserverAPI.OutputNewRoomEvent) error {
|
||||
value, err := json.Marshal(roomserverAPI.OutputEvent{
|
||||
Type: roomserverAPI.OutputTypeNewRoomEvent,
|
||||
NewRoomEvent: out,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to marshal output event: %s", err)
|
||||
}
|
||||
_, _, err = producer.SendMessage(&sarama.ProducerMessage{
|
||||
Topic: kafkaTopic,
|
||||
Key: sarama.StringEncoder(out.Event.RoomID()),
|
||||
Value: sarama.ByteEncoder(value),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to send message: %s", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func MustMakeInternalAPI(t *testing.T) (api.CurrentStateInternalAPI, sarama.SyncProducer) {
|
||||
cfg := &config.Dendrite{}
|
||||
cfg.Kafka.Topics.OutputRoomEvent = config.Topic(kafkaTopic)
|
||||
cfg.Database.CurrentState = config.DataSource("file::memory:")
|
||||
db, err := sqlutil.Open(sqlutil.SQLiteDriverName(), "file::memory:", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to open naffka database: %s", err)
|
||||
}
|
||||
naffkaDB, err := naffka.NewSqliteDatabase(db)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to setup naffka database: %s", err)
|
||||
}
|
||||
naff, err := naffka.New(naffkaDB)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create naffka consumer: %s", err)
|
||||
}
|
||||
return NewInternalAPI(cfg, naff), naff
|
||||
}
|
||||
|
||||
func TestQueryCurrentState(t *testing.T) {
|
||||
currStateAPI, producer := MustMakeInternalAPI(t)
|
||||
plTuple := gomatrixserverlib.StateKeyTuple{
|
||||
EventType: "m.room.power_levels",
|
||||
StateKey: "",
|
||||
}
|
||||
plEvent := testEvents[4]
|
||||
MustWriteOutputEvent(t, producer, &roomserverAPI.OutputNewRoomEvent{
|
||||
Event: plEvent,
|
||||
AddsStateEventIDs: []string{plEvent.EventID()},
|
||||
})
|
||||
// we have no good way to know /when/ the server has consumed the event
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
testCases := []struct {
|
||||
req api.QueryCurrentStateRequest
|
||||
wantRes api.QueryCurrentStateResponse
|
||||
wantErr error
|
||||
}{
|
||||
{
|
||||
req: api.QueryCurrentStateRequest{
|
||||
RoomID: plEvent.RoomID(),
|
||||
StateTuples: []gomatrixserverlib.StateKeyTuple{
|
||||
plTuple,
|
||||
},
|
||||
},
|
||||
wantRes: api.QueryCurrentStateResponse{
|
||||
StateEvents: map[gomatrixserverlib.StateKeyTuple]*gomatrixserverlib.HeaderedEvent{
|
||||
plTuple: &plEvent,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
runCases := func(testAPI api.CurrentStateInternalAPI) {
|
||||
for _, tc := range testCases {
|
||||
var gotRes api.QueryCurrentStateResponse
|
||||
gotErr := testAPI.QueryCurrentState(context.TODO(), &tc.req, &gotRes)
|
||||
if tc.wantErr == nil && gotErr != nil || tc.wantErr != nil && gotErr == nil {
|
||||
t.Errorf("QueryCurrentState error, got %s want %s", gotErr, tc.wantErr)
|
||||
continue
|
||||
}
|
||||
for tuple, wantEvent := range tc.wantRes.StateEvents {
|
||||
gotEvent, ok := gotRes.StateEvents[tuple]
|
||||
if !ok {
|
||||
t.Errorf("QueryCurrentState want tuple %+v but it is missing from the response", tuple)
|
||||
continue
|
||||
}
|
||||
if !reflect.DeepEqual(gotEvent.JSON(), wantEvent.JSON()) {
|
||||
t.Errorf("QueryCurrentState tuple %+v got event JSON %s want %s", tuple, string(gotEvent.JSON()), string(wantEvent.JSON()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
t.Run("HTTP API", func(t *testing.T) {
|
||||
router := mux.NewRouter().PathPrefix(httputil.InternalPathPrefix).Subrouter()
|
||||
AddInternalRoutes(router, currStateAPI)
|
||||
apiURL, cancel := test.ListenAndServe(t, router, false)
|
||||
defer cancel()
|
||||
httpAPI, err := inthttp.NewCurrentStateAPIClient(apiURL, &http.Client{})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create HTTP client")
|
||||
}
|
||||
runCases(httpAPI)
|
||||
})
|
||||
t.Run("Monolith", func(t *testing.T) {
|
||||
runCases(currStateAPI)
|
||||
})
|
||||
}
|
||||
50
currentstateserver/internal/api.go
Normal file
50
currentstateserver/internal/api.go
Normal file
|
|
@ -0,0 +1,50 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package internal
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
"github.com/matrix-org/dendrite/currentstateserver/storage"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
type CurrentStateInternalAPI struct {
|
||||
DB storage.Database
|
||||
}
|
||||
|
||||
func (a *CurrentStateInternalAPI) QueryCurrentState(ctx context.Context, req *api.QueryCurrentStateRequest, res *api.QueryCurrentStateResponse) error {
|
||||
res.StateEvents = make(map[gomatrixserverlib.StateKeyTuple]*gomatrixserverlib.HeaderedEvent)
|
||||
for _, tuple := range req.StateTuples {
|
||||
ev, err := a.DB.GetStateEvent(ctx, req.RoomID, tuple.EventType, tuple.StateKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if ev != nil {
|
||||
res.StateEvents[tuple] = ev
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *CurrentStateInternalAPI) QueryRoomsForUser(ctx context.Context, req *api.QueryRoomsForUserRequest, res *api.QueryRoomsForUserResponse) error {
|
||||
roomIDs, err := a.DB.GetRoomsByMembership(ctx, req.UserID, req.WantMembership)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
res.RoomIDs = roomIDs
|
||||
return nil
|
||||
}
|
||||
75
currentstateserver/inthttp/client.go
Normal file
75
currentstateserver/inthttp/client.go
Normal file
|
|
@ -0,0 +1,75 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package inthttp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
|
||||
"github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
"github.com/matrix-org/dendrite/internal/httputil"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
)
|
||||
|
||||
// HTTP paths for the internal HTTP APIs
|
||||
const (
|
||||
QueryCurrentStatePath = "/currentstateserver/queryCurrentState"
|
||||
QueryRoomsForUserPath = "/currentstateserver/queryRoomsForUser"
|
||||
)
|
||||
|
||||
// NewCurrentStateAPIClient creates a CurrentStateInternalAPI implemented by talking to a HTTP POST API.
|
||||
// If httpClient is nil an error is returned
|
||||
func NewCurrentStateAPIClient(
|
||||
apiURL string,
|
||||
httpClient *http.Client,
|
||||
) (api.CurrentStateInternalAPI, error) {
|
||||
if httpClient == nil {
|
||||
return nil, errors.New("NewCurrentStateAPIClient: httpClient is <nil>")
|
||||
}
|
||||
return &httpCurrentStateInternalAPI{
|
||||
apiURL: apiURL,
|
||||
httpClient: httpClient,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type httpCurrentStateInternalAPI struct {
|
||||
apiURL string
|
||||
httpClient *http.Client
|
||||
}
|
||||
|
||||
func (h *httpCurrentStateInternalAPI) QueryCurrentState(
|
||||
ctx context.Context,
|
||||
request *api.QueryCurrentStateRequest,
|
||||
response *api.QueryCurrentStateResponse,
|
||||
) error {
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryCurrentState")
|
||||
defer span.Finish()
|
||||
|
||||
apiURL := h.apiURL + QueryCurrentStatePath
|
||||
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||
}
|
||||
|
||||
func (h *httpCurrentStateInternalAPI) QueryRoomsForUser(
|
||||
ctx context.Context,
|
||||
request *api.QueryRoomsForUserRequest,
|
||||
response *api.QueryRoomsForUserResponse,
|
||||
) error {
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryRoomsForUser")
|
||||
defer span.Finish()
|
||||
|
||||
apiURL := h.apiURL + QueryRoomsForUserPath
|
||||
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||
}
|
||||
54
currentstateserver/inthttp/server.go
Normal file
54
currentstateserver/inthttp/server.go
Normal file
|
|
@ -0,0 +1,54 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package inthttp
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
"github.com/matrix-org/dendrite/internal/httputil"
|
||||
"github.com/matrix-org/util"
|
||||
)
|
||||
|
||||
func AddRoutes(internalAPIMux *mux.Router, intAPI api.CurrentStateInternalAPI) {
|
||||
internalAPIMux.Handle(QueryCurrentStatePath,
|
||||
httputil.MakeInternalAPI("queryCurrentState", func(req *http.Request) util.JSONResponse {
|
||||
request := api.QueryCurrentStateRequest{}
|
||||
response := api.QueryCurrentStateResponse{}
|
||||
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
||||
}
|
||||
if err := intAPI.QueryCurrentState(req.Context(), &request, &response); err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||
}),
|
||||
)
|
||||
internalAPIMux.Handle(QueryRoomsForUserPath,
|
||||
httputil.MakeInternalAPI("queryRoomsForUser", func(req *http.Request) util.JSONResponse {
|
||||
request := api.QueryRoomsForUserRequest{}
|
||||
response := api.QueryRoomsForUserResponse{}
|
||||
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
||||
}
|
||||
if err := intAPI.QueryRoomsForUser(req.Context(), &request, &response); err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||
}),
|
||||
)
|
||||
}
|
||||
34
currentstateserver/storage/interface.go
Normal file
34
currentstateserver/storage/interface.go
Normal file
|
|
@ -0,0 +1,34 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
type Database interface {
|
||||
internal.PartitionStorer
|
||||
// StoreStateEvents updates the database with new events from the roomserver.
|
||||
StoreStateEvents(ctx context.Context, addStateEvents []gomatrixserverlib.HeaderedEvent, removeStateEventIDs []string) error
|
||||
// GetStateEvent returns the state event of a given type for a given room with a given state key
|
||||
// If no event could be found, returns nil
|
||||
// If there was an issue during the retrieval, returns an error
|
||||
GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error)
|
||||
// GetRoomsByMembership returns a list of room IDs matching the provided membership and user ID (as state_key).
|
||||
GetRoomsByMembership(ctx context.Context, userID, membership string) ([]string, error)
|
||||
}
|
||||
208
currentstateserver/storage/postgres/current_room_state_table.go
Normal file
208
currentstateserver/storage/postgres/current_room_state_table.go
Normal file
|
|
@ -0,0 +1,208 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"strconv"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/currentstateserver/storage/tables"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
var leaveEnum = strconv.Itoa(tables.MembershipToEnum["leave"])
|
||||
|
||||
var currentRoomStateSchema = `
|
||||
-- Stores the current room state for every room.
|
||||
CREATE TABLE IF NOT EXISTS currentstate_current_room_state (
|
||||
-- The 'room_id' key for the state event.
|
||||
room_id TEXT NOT NULL,
|
||||
-- The state event ID
|
||||
event_id TEXT NOT NULL,
|
||||
-- The state event type e.g 'm.room.member'
|
||||
type TEXT NOT NULL,
|
||||
-- The 'sender' property of the event.
|
||||
sender TEXT NOT NULL,
|
||||
-- The state_key value for this state event e.g ''
|
||||
state_key TEXT NOT NULL,
|
||||
-- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
|
||||
headered_event_json TEXT NOT NULL,
|
||||
-- The 'content.membership' enum value if this event is an m.room.member event.
|
||||
membership SMALLINT NOT NULL DEFAULT 0,
|
||||
-- Clobber based on 3-uple of room_id, type and state_key
|
||||
CONSTRAINT currentstate_current_room_state_unique UNIQUE (room_id, type, state_key)
|
||||
);
|
||||
-- for event deletion
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS currentstate_event_id_idx ON currentstate_current_room_state(event_id, room_id, type, sender);
|
||||
-- for querying membership states of users
|
||||
CREATE INDEX IF NOT EXISTS currentstate_membership_idx ON currentstate_current_room_state(type, state_key, membership)
|
||||
WHERE membership IS NOT NULL AND membership != ` + leaveEnum + `;
|
||||
`
|
||||
|
||||
const upsertRoomStateSQL = "" +
|
||||
"INSERT INTO currentstate_current_room_state (room_id, event_id, type, sender, state_key, headered_event_json, membership)" +
|
||||
" VALUES ($1, $2, $3, $4, $5, $6, $7)" +
|
||||
" ON CONFLICT ON CONSTRAINT currentstate_current_room_state_unique" +
|
||||
" DO UPDATE SET event_id = $2, sender=$4, headered_event_json = $6, membership = $7"
|
||||
|
||||
const deleteRoomStateByEventIDSQL = "" +
|
||||
"DELETE FROM currentstate_current_room_state WHERE event_id = $1"
|
||||
|
||||
const selectRoomIDsWithMembershipSQL = "" +
|
||||
"SELECT room_id FROM currentstate_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2"
|
||||
|
||||
const selectStateEventSQL = "" +
|
||||
"SELECT headered_event_json FROM currentstate_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
|
||||
|
||||
const selectEventsWithEventIDsSQL = "" +
|
||||
"SELECT headered_event_json FROM currentstate_current_room_state WHERE event_id = ANY($1)"
|
||||
|
||||
type currentRoomStateStatements struct {
|
||||
upsertRoomStateStmt *sql.Stmt
|
||||
deleteRoomStateByEventIDStmt *sql.Stmt
|
||||
selectRoomIDsWithMembershipStmt *sql.Stmt
|
||||
selectEventsWithEventIDsStmt *sql.Stmt
|
||||
selectStateEventStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) {
|
||||
s := ¤tRoomStateStatements{}
|
||||
_, err := db.Exec(currentRoomStateSchema)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.upsertRoomStateStmt, err = db.Prepare(upsertRoomStateSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
|
||||
func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(
|
||||
ctx context.Context,
|
||||
txn *sql.Tx,
|
||||
userID string,
|
||||
membershipEnum int,
|
||||
) ([]string, error) {
|
||||
stmt := sqlutil.TxStmt(txn, s.selectRoomIDsWithMembershipStmt)
|
||||
rows, err := stmt.QueryContext(ctx, userID, membershipEnum)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "selectRoomIDsWithMembership: rows.close() failed")
|
||||
|
||||
var result []string
|
||||
for rows.Next() {
|
||||
var roomID string
|
||||
if err := rows.Scan(&roomID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, roomID)
|
||||
}
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) DeleteRoomStateByEventID(
|
||||
ctx context.Context, txn *sql.Tx, eventID string,
|
||||
) error {
|
||||
stmt := sqlutil.TxStmt(txn, s.deleteRoomStateByEventIDStmt)
|
||||
_, err := stmt.ExecContext(ctx, eventID)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) UpsertRoomState(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
event gomatrixserverlib.HeaderedEvent, membershipEnum int,
|
||||
) error {
|
||||
headeredJSON, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// upsert state event
|
||||
stmt := sqlutil.TxStmt(txn, s.upsertRoomStateStmt)
|
||||
_, err = stmt.ExecContext(
|
||||
ctx,
|
||||
event.RoomID(),
|
||||
event.EventID(),
|
||||
event.Type(),
|
||||
event.Sender(),
|
||||
*event.StateKey(),
|
||||
headeredJSON,
|
||||
membershipEnum,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) SelectEventsWithEventIDs(
|
||||
ctx context.Context, txn *sql.Tx, eventIDs []string,
|
||||
) ([]gomatrixserverlib.HeaderedEvent, error) {
|
||||
stmt := sqlutil.TxStmt(txn, s.selectEventsWithEventIDsStmt)
|
||||
rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "selectEventsWithEventIDs: rows.close() failed")
|
||||
result := []gomatrixserverlib.HeaderedEvent{}
|
||||
for rows.Next() {
|
||||
var eventBytes []byte
|
||||
if err := rows.Scan(&eventBytes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// TODO: Handle redacted events
|
||||
var ev gomatrixserverlib.HeaderedEvent
|
||||
if err := json.Unmarshal(eventBytes, &ev); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, ev)
|
||||
}
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) SelectStateEvent(
|
||||
ctx context.Context, roomID, evType, stateKey string,
|
||||
) (*gomatrixserverlib.HeaderedEvent, error) {
|
||||
stmt := s.selectStateEventStmt
|
||||
var res []byte
|
||||
err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res)
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var ev gomatrixserverlib.HeaderedEvent
|
||||
if err = json.Unmarshal(res, &ev); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ev, err
|
||||
}
|
||||
35
currentstateserver/storage/postgres/storage.go
Normal file
35
currentstateserver/storage/postgres/storage.go
Normal file
|
|
@ -0,0 +1,35 @@
|
|||
package postgres
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/dendrite/currentstateserver/storage/shared"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
)
|
||||
|
||||
type Database struct {
|
||||
shared.Database
|
||||
db *sql.DB
|
||||
sqlutil.PartitionOffsetStatements
|
||||
}
|
||||
|
||||
// NewDatabase creates a new sync server database
|
||||
func NewDatabase(dbDataSourceName string, dbProperties sqlutil.DbProperties) (*Database, error) {
|
||||
var d Database
|
||||
var err error
|
||||
if d.db, err = sqlutil.Open("postgres", dbDataSourceName, dbProperties); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = d.PartitionOffsetStatements.Prepare(d.db, "currentstate"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
currRoomState, err := NewPostgresCurrentRoomStateTable(d.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
d.Database = shared.Database{
|
||||
DB: d.db,
|
||||
CurrentRoomState: currRoomState,
|
||||
}
|
||||
return &d, nil
|
||||
}
|
||||
78
currentstateserver/storage/shared/storage.go
Normal file
78
currentstateserver/storage/shared/storage.go
Normal file
|
|
@ -0,0 +1,78 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package shared
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/currentstateserver/storage/tables"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
type Database struct {
|
||||
DB *sql.DB
|
||||
CurrentRoomState tables.CurrentRoomState
|
||||
}
|
||||
|
||||
func (d *Database) GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error) {
|
||||
return d.CurrentRoomState.SelectStateEvent(ctx, roomID, evType, stateKey)
|
||||
}
|
||||
|
||||
func (d *Database) StoreStateEvents(ctx context.Context, addStateEvents []gomatrixserverlib.HeaderedEvent,
|
||||
removeStateEventIDs []string) error {
|
||||
return sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error {
|
||||
// remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
|
||||
for _, eventID := range removeStateEventIDs {
|
||||
if err := d.CurrentRoomState.DeleteRoomStateByEventID(ctx, txn, eventID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, event := range addStateEvents {
|
||||
if event.StateKey() == nil {
|
||||
// ignore non state events
|
||||
continue
|
||||
}
|
||||
var membershipEnum int
|
||||
if event.Type() == "m.room.member" {
|
||||
membership, err := event.Membership()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
enum, ok := tables.MembershipToEnum[membership]
|
||||
if !ok {
|
||||
return fmt.Errorf("unknown membership: %s", membership)
|
||||
}
|
||||
membershipEnum = enum
|
||||
}
|
||||
|
||||
if err := d.CurrentRoomState.UpsertRoomState(ctx, txn, event, membershipEnum); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (d *Database) GetRoomsByMembership(ctx context.Context, userID, membership string) ([]string, error) {
|
||||
enum, ok := tables.MembershipToEnum[membership]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unknown membership: %s", membership)
|
||||
}
|
||||
return d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, nil, userID, enum)
|
||||
}
|
||||
201
currentstateserver/storage/sqlite3/current_room_state_table.go
Normal file
201
currentstateserver/storage/sqlite3/current_room_state_table.go
Normal file
|
|
@ -0,0 +1,201 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package sqlite3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"strings"
|
||||
|
||||
"github.com/matrix-org/dendrite/currentstateserver/storage/tables"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
const currentRoomStateSchema = `
|
||||
-- Stores the current room state for every room.
|
||||
CREATE TABLE IF NOT EXISTS currentstate_current_room_state (
|
||||
room_id TEXT NOT NULL,
|
||||
event_id TEXT NOT NULL,
|
||||
type TEXT NOT NULL,
|
||||
sender TEXT NOT NULL,
|
||||
state_key TEXT NOT NULL,
|
||||
headered_event_json TEXT NOT NULL,
|
||||
membership INTEGER NOT NULL DEFAULT 0,
|
||||
UNIQUE (room_id, type, state_key)
|
||||
);
|
||||
-- for event deletion
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS currentstate_event_id_idx ON currentstate_current_room_state(event_id, room_id, type, sender);
|
||||
-- for querying membership states of users
|
||||
-- CREATE INDEX IF NOT EXISTS currentstate_membership_idx ON currentstate_current_room_state(type, state_key, membership) WHERE membership IS NOT NULL AND membership != 'leave';
|
||||
`
|
||||
|
||||
const upsertRoomStateSQL = "" +
|
||||
"INSERT INTO currentstate_current_room_state (room_id, event_id, type, sender, state_key, headered_event_json, membership)" +
|
||||
" VALUES ($1, $2, $3, $4, $5, $6, $7)" +
|
||||
" ON CONFLICT (event_id, room_id, type, sender)" +
|
||||
" DO UPDATE SET event_id = $2, sender=$4, headered_event_json = $6, membership = $7"
|
||||
|
||||
const deleteRoomStateByEventIDSQL = "" +
|
||||
"DELETE FROM currentstate_current_room_state WHERE event_id = $1"
|
||||
|
||||
const selectRoomIDsWithMembershipSQL = "" +
|
||||
"SELECT room_id FROM currentstate_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2"
|
||||
|
||||
const selectStateEventSQL = "" +
|
||||
"SELECT headered_event_json FROM currentstate_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
|
||||
|
||||
const selectEventsWithEventIDsSQL = "" +
|
||||
// TODO: The session_id and transaction_id blanks are here because otherwise
|
||||
// the rowsToStreamEvents expects there to be exactly five columns. We need to
|
||||
// figure out if these really need to be in the DB, and if so, we need a
|
||||
// better permanent fix for this. - neilalexander, 2 Jan 2020
|
||||
"SELECT added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" +
|
||||
" FROM currentstate_current_room_state WHERE event_id IN ($1)"
|
||||
|
||||
type currentRoomStateStatements struct {
|
||||
upsertRoomStateStmt *sql.Stmt
|
||||
deleteRoomStateByEventIDStmt *sql.Stmt
|
||||
selectRoomIDsWithMembershipStmt *sql.Stmt
|
||||
selectStateEventStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func NewSqliteCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) {
|
||||
s := ¤tRoomStateStatements{}
|
||||
_, err := db.Exec(currentRoomStateSchema)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.upsertRoomStateStmt, err = db.Prepare(upsertRoomStateSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
|
||||
func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(
|
||||
ctx context.Context,
|
||||
txn *sql.Tx,
|
||||
userID string,
|
||||
membershipEnum int,
|
||||
) ([]string, error) {
|
||||
stmt := sqlutil.TxStmt(txn, s.selectRoomIDsWithMembershipStmt)
|
||||
rows, err := stmt.QueryContext(ctx, userID, membershipEnum)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "selectRoomIDsWithMembership: rows.close() failed")
|
||||
|
||||
var result []string
|
||||
for rows.Next() {
|
||||
var roomID string
|
||||
if err := rows.Scan(&roomID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, roomID)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) DeleteRoomStateByEventID(
|
||||
ctx context.Context, txn *sql.Tx, eventID string,
|
||||
) error {
|
||||
stmt := sqlutil.TxStmt(txn, s.deleteRoomStateByEventIDStmt)
|
||||
_, err := stmt.ExecContext(ctx, eventID)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) UpsertRoomState(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
event gomatrixserverlib.HeaderedEvent, membershipEnum int,
|
||||
) error {
|
||||
headeredJSON, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// upsert state event
|
||||
stmt := sqlutil.TxStmt(txn, s.upsertRoomStateStmt)
|
||||
_, err = stmt.ExecContext(
|
||||
ctx,
|
||||
event.RoomID(),
|
||||
event.EventID(),
|
||||
event.Type(),
|
||||
event.Sender(),
|
||||
*event.StateKey(),
|
||||
headeredJSON,
|
||||
membershipEnum,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) SelectEventsWithEventIDs(
|
||||
ctx context.Context, txn *sql.Tx, eventIDs []string,
|
||||
) ([]gomatrixserverlib.HeaderedEvent, error) {
|
||||
iEventIDs := make([]interface{}, len(eventIDs))
|
||||
for k, v := range eventIDs {
|
||||
iEventIDs[k] = v
|
||||
}
|
||||
query := strings.Replace(selectEventsWithEventIDsSQL, "($1)", sqlutil.QueryVariadic(len(iEventIDs)), 1)
|
||||
rows, err := txn.QueryContext(ctx, query, iEventIDs...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "selectEventsWithEventIDs: rows.close() failed")
|
||||
result := []gomatrixserverlib.HeaderedEvent{}
|
||||
for rows.Next() {
|
||||
var eventBytes []byte
|
||||
if err := rows.Scan(&eventBytes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// TODO: Handle redacted events
|
||||
var ev gomatrixserverlib.HeaderedEvent
|
||||
if err := json.Unmarshal(eventBytes, &ev); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, ev)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) SelectStateEvent(
|
||||
ctx context.Context, roomID, evType, stateKey string,
|
||||
) (*gomatrixserverlib.HeaderedEvent, error) {
|
||||
stmt := s.selectStateEventStmt
|
||||
var res []byte
|
||||
err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res)
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var ev gomatrixserverlib.HeaderedEvent
|
||||
if err = json.Unmarshal(res, &ev); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ev, err
|
||||
}
|
||||
39
currentstateserver/storage/sqlite3/storage.go
Normal file
39
currentstateserver/storage/sqlite3/storage.go
Normal file
|
|
@ -0,0 +1,39 @@
|
|||
package sqlite3
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/dendrite/currentstateserver/storage/shared"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
)
|
||||
|
||||
type Database struct {
|
||||
shared.Database
|
||||
db *sql.DB
|
||||
sqlutil.PartitionOffsetStatements
|
||||
}
|
||||
|
||||
// NewDatabase creates a new sync server database
|
||||
// nolint: gocyclo
|
||||
func NewDatabase(dataSourceName string) (*Database, error) {
|
||||
var d Database
|
||||
cs, err := sqlutil.ParseFileURI(dataSourceName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if d.db, err = sqlutil.Open(sqlutil.SQLiteDriverName(), cs, nil); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = d.PartitionOffsetStatements.Prepare(d.db, "currentstate"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
currRoomState, err := NewSqliteCurrentRoomStateTable(d.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
d.Database = shared.Database{
|
||||
DB: d.db,
|
||||
CurrentRoomState: currRoomState,
|
||||
}
|
||||
return &d, nil
|
||||
}
|
||||
41
currentstateserver/storage/storage.go
Normal file
41
currentstateserver/storage/storage.go
Normal file
|
|
@ -0,0 +1,41 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// +build !wasm
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
|
||||
"github.com/matrix-org/dendrite/currentstateserver/storage/postgres"
|
||||
"github.com/matrix-org/dendrite/currentstateserver/storage/sqlite3"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
)
|
||||
|
||||
// NewDatabase opens a database connection.
|
||||
func NewDatabase(dataSourceName string, dbProperties sqlutil.DbProperties) (Database, error) {
|
||||
uri, err := url.Parse(dataSourceName)
|
||||
if err != nil {
|
||||
return postgres.NewDatabase(dataSourceName, dbProperties)
|
||||
}
|
||||
switch uri.Scheme {
|
||||
case "postgres":
|
||||
return postgres.NewDatabase(dataSourceName, dbProperties)
|
||||
case "file":
|
||||
return sqlite3.NewDatabase(dataSourceName)
|
||||
default:
|
||||
return postgres.NewDatabase(dataSourceName, dbProperties)
|
||||
}
|
||||
}
|
||||
42
currentstateserver/storage/storage_wasm.go
Normal file
42
currentstateserver/storage/storage_wasm.go
Normal file
|
|
@ -0,0 +1,42 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
|
||||
"github.com/matrix-org/dendrite/currentstateserver/storage/sqlite3"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
)
|
||||
|
||||
// NewDatabase opens a database connection.
|
||||
func NewDatabase(
|
||||
dataSourceName string,
|
||||
dbProperties sqlutil.DbProperties, // nolint:unparam
|
||||
) (Database, error) {
|
||||
uri, err := url.Parse(dataSourceName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Cannot use postgres implementation")
|
||||
}
|
||||
switch uri.Scheme {
|
||||
case "postgres":
|
||||
return nil, fmt.Errorf("Cannot use postgres implementation")
|
||||
case "file":
|
||||
return sqlite3.NewDatabase(dataSourceName)
|
||||
default:
|
||||
return nil, fmt.Errorf("Cannot use postgres implementation")
|
||||
}
|
||||
}
|
||||
44
currentstateserver/storage/tables/interface.go
Normal file
44
currentstateserver/storage/tables/interface.go
Normal file
|
|
@ -0,0 +1,44 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tables
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
var MembershipToEnum = map[string]int{
|
||||
gomatrixserverlib.Invite: 1,
|
||||
gomatrixserverlib.Join: 2,
|
||||
gomatrixserverlib.Leave: 3,
|
||||
gomatrixserverlib.Ban: 4,
|
||||
}
|
||||
var EnumToMembership = map[int]string{
|
||||
1: gomatrixserverlib.Invite,
|
||||
2: gomatrixserverlib.Join,
|
||||
3: gomatrixserverlib.Leave,
|
||||
4: gomatrixserverlib.Ban,
|
||||
}
|
||||
|
||||
type CurrentRoomState interface {
|
||||
SelectStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error)
|
||||
SelectEventsWithEventIDs(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error)
|
||||
UpsertRoomState(ctx context.Context, txn *sql.Tx, event gomatrixserverlib.HeaderedEvent, membershipEnum int) error
|
||||
DeleteRoomStateByEventID(ctx context.Context, txn *sql.Tx, eventID string) error
|
||||
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
|
||||
SelectRoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membershipEnum int) ([]string, error)
|
||||
}
|
||||
|
|
@ -121,6 +121,7 @@ database:
|
|||
federation_sender: "postgres://dendrite:itsasecret@localhost/dendrite_federationsender?sslmode=disable"
|
||||
appservice: "postgres://dendrite:itsasecret@localhost/dendrite_appservice?sslmode=disable"
|
||||
public_rooms_api: "postgres://dendrite:itsasecret@localhost/dendrite_publicroomsapi?sslmode=disable"
|
||||
current_state: "postgres://dendrite:itsasecret@localhost/dendrite_currentstate?sslmode=disable"
|
||||
max_open_conns: 100
|
||||
max_idle_conns: 2
|
||||
conn_max_lifetime: -1
|
||||
|
|
@ -143,6 +144,7 @@ listen:
|
|||
key_server: "localhost:7779"
|
||||
server_key_api: "localhost:7780"
|
||||
user_api: "localhost:7781"
|
||||
current_state_server: "localhost:7782"
|
||||
|
||||
# The configuration for tracing the dendrite components.
|
||||
tracing:
|
||||
|
|
|
|||
|
|
@ -160,10 +160,13 @@ type Dendrite struct {
|
|||
// Postgres Config
|
||||
Database struct {
|
||||
// The Account database stores the login details and account information
|
||||
// for local users. It is accessed by the ClientAPI.
|
||||
// for local users. It is accessed by the UserAPI.
|
||||
Account DataSource `yaml:"account"`
|
||||
// The CurrentState database stores the current state of all rooms.
|
||||
// It is accessed by the CurrentStateServer.
|
||||
CurrentState DataSource `yaml:"current_state"`
|
||||
// The Device database stores session information for the devices of logged
|
||||
// in local users. It is accessed by the ClientAPI, the MediaAPI and the SyncAPI.
|
||||
// in local users. It is accessed by the UserAPI.
|
||||
Device DataSource `yaml:"device"`
|
||||
// The MediaAPI database stores information about files uploaded and downloaded
|
||||
// by local users. It is only accessed by the MediaAPI.
|
||||
|
|
@ -222,6 +225,7 @@ type Dendrite struct {
|
|||
Bind struct {
|
||||
MediaAPI Address `yaml:"media_api"`
|
||||
ClientAPI Address `yaml:"client_api"`
|
||||
CurrentState Address `yaml:"current_state_server"`
|
||||
FederationAPI Address `yaml:"federation_api"`
|
||||
ServerKeyAPI Address `yaml:"server_key_api"`
|
||||
AppServiceAPI Address `yaml:"appservice_api"`
|
||||
|
|
@ -238,6 +242,7 @@ type Dendrite struct {
|
|||
Listen struct {
|
||||
MediaAPI Address `yaml:"media_api"`
|
||||
ClientAPI Address `yaml:"client_api"`
|
||||
CurrentState Address `yaml:"current_state_server"`
|
||||
FederationAPI Address `yaml:"federation_api"`
|
||||
ServerKeyAPI Address `yaml:"server_key_api"`
|
||||
AppServiceAPI Address `yaml:"appservice_api"`
|
||||
|
|
@ -601,6 +606,7 @@ func (config *Dendrite) checkDatabase(configErrs *configErrors) {
|
|||
checkNotEmpty(configErrs, "database.media_api", string(config.Database.MediaAPI))
|
||||
checkNotEmpty(configErrs, "database.sync_api", string(config.Database.SyncAPI))
|
||||
checkNotEmpty(configErrs, "database.room_server", string(config.Database.RoomServer))
|
||||
checkNotEmpty(configErrs, "database.current_state", string(config.Database.CurrentState))
|
||||
}
|
||||
|
||||
// checkListen verifies the parameters listen.* are valid.
|
||||
|
|
@ -613,6 +619,7 @@ func (config *Dendrite) checkListen(configErrs *configErrors) {
|
|||
checkNotEmpty(configErrs, "listen.edu_server", string(config.Listen.EDUServer))
|
||||
checkNotEmpty(configErrs, "listen.server_key_api", string(config.Listen.EDUServer))
|
||||
checkNotEmpty(configErrs, "listen.user_api", string(config.Listen.UserAPI))
|
||||
checkNotEmpty(configErrs, "listen.current_state_server", string(config.Listen.CurrentState))
|
||||
}
|
||||
|
||||
// checkLogging verifies the parameters logging.* are valid.
|
||||
|
|
@ -735,6 +742,15 @@ func (config *Dendrite) UserAPIURL() string {
|
|||
return "http://" + string(config.Listen.UserAPI)
|
||||
}
|
||||
|
||||
// CurrentStateAPIURL returns an HTTP URL for where the currentstateserver is listening.
|
||||
func (config *Dendrite) CurrentStateAPIURL() string {
|
||||
// Hard code the currentstateserver to talk HTTP for now.
|
||||
// If we support HTTPS we need to think of a practical way to do certificate validation.
|
||||
// People setting up servers shouldn't need to get a certificate valid for the public
|
||||
// internet for an internal API.
|
||||
return "http://" + string(config.Listen.CurrentState)
|
||||
}
|
||||
|
||||
// EDUServerURL returns an HTTP URL for where the EDU server is listening.
|
||||
func (config *Dendrite) EDUServerURL() string {
|
||||
// Hard code the EDU server to talk HTTP for now.
|
||||
|
|
@ -753,7 +769,7 @@ func (config *Dendrite) FederationSenderURL() string {
|
|||
return "http://" + string(config.Listen.FederationSender)
|
||||
}
|
||||
|
||||
// FederationSenderURL returns an HTTP URL for where the federation sender is listening.
|
||||
// ServerKeyAPIURL returns an HTTP URL for where the federation sender is listening.
|
||||
func (config *Dendrite) ServerKeyAPIURL() string {
|
||||
// Hard code the server key API server to talk HTTP for now.
|
||||
// If we support HTTPS we need to think of a practical way to do certificate validation.
|
||||
|
|
|
|||
|
|
@ -55,6 +55,7 @@ database:
|
|||
sync_api: "postgresql:///syn_api"
|
||||
room_server: "postgresql:///room_server"
|
||||
appservice: "postgresql:///appservice"
|
||||
current_state: "postgresql:///current_state"
|
||||
listen:
|
||||
room_server: "localhost:7770"
|
||||
client_api: "localhost:7771"
|
||||
|
|
@ -64,6 +65,7 @@ listen:
|
|||
appservice_api: "localhost:7777"
|
||||
edu_server: "localhost:7778"
|
||||
user_api: "localhost:7779"
|
||||
current_state_server: "localhost:7775"
|
||||
logging:
|
||||
- type: "file"
|
||||
level: "info"
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import (
|
|||
"net/url"
|
||||
"time"
|
||||
|
||||
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
"github.com/matrix-org/dendrite/internal/caching"
|
||||
"github.com/matrix-org/dendrite/internal/httputil"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
|
|
@ -37,6 +38,7 @@ import (
|
|||
|
||||
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
||||
asinthttp "github.com/matrix-org/dendrite/appservice/inthttp"
|
||||
currentstateinthttp "github.com/matrix-org/dendrite/currentstateserver/inthttp"
|
||||
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||
eduinthttp "github.com/matrix-org/dendrite/eduserver/inthttp"
|
||||
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
|
||||
|
|
@ -171,6 +173,15 @@ func (b *BaseDendrite) UserAPIClient() userapi.UserInternalAPI {
|
|||
return userAPI
|
||||
}
|
||||
|
||||
// CurrentStateAPIClient returns CurrentStateInternalAPI for hitting the currentstateserver over HTTP.
|
||||
func (b *BaseDendrite) CurrentStateAPIClient() currentstateAPI.CurrentStateInternalAPI {
|
||||
stateAPI, err := currentstateinthttp.NewCurrentStateAPIClient(b.Cfg.CurrentStateAPIURL(), b.httpClient)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Panic("UserAPIClient failed", b.httpClient)
|
||||
}
|
||||
return stateAPI
|
||||
}
|
||||
|
||||
// EDUServerClient returns EDUServerInputAPI for hitting the EDU server over HTTP
|
||||
func (b *BaseDendrite) EDUServerClient() eduServerAPI.EDUServerInputAPI {
|
||||
e, err := eduinthttp.NewEDUServerClient(b.Cfg.EDUServerURL(), b.httpClient)
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import (
|
|||
"github.com/gorilla/mux"
|
||||
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
||||
"github.com/matrix-org/dendrite/clientapi"
|
||||
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||
"github.com/matrix-org/dendrite/federationapi"
|
||||
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
|
||||
|
|
@ -56,6 +57,7 @@ type Monolith struct {
|
|||
RoomserverAPI roomserverAPI.RoomserverInternalAPI
|
||||
ServerKeyAPI serverKeyAPI.ServerKeyInternalAPI
|
||||
UserAPI userapi.UserInternalAPI
|
||||
StateAPI currentstateAPI.CurrentStateInternalAPI
|
||||
|
||||
// TODO: can we remove this? It's weird that we are required the database
|
||||
// yet every other component can do that on its own. libp2p-demo uses a custom
|
||||
|
|
@ -69,9 +71,9 @@ type Monolith struct {
|
|||
// AddAllPublicRoutes attaches all public paths to the given router
|
||||
func (m *Monolith) AddAllPublicRoutes(publicMux *mux.Router) {
|
||||
clientapi.AddPublicRoutes(
|
||||
publicMux, m.Config, m.KafkaConsumer, m.KafkaProducer, m.DeviceDB, m.AccountDB,
|
||||
publicMux, m.Config, m.KafkaProducer, m.DeviceDB, m.AccountDB,
|
||||
m.FedClient, m.RoomserverAPI,
|
||||
m.EDUInternalAPI, m.AppserviceAPI, transactions.New(),
|
||||
m.EDUInternalAPI, m.AppserviceAPI, m.StateAPI, transactions.New(),
|
||||
m.FederationSenderAPI, m.UserAPI,
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@ import (
|
|||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
type Database interface {
|
||||
|
|
@ -36,10 +35,6 @@ type Database interface {
|
|||
// account already exists, it will return nil, ErrUserExists.
|
||||
CreateAccount(ctx context.Context, localpart, plaintextPassword, appserviceID string) (*api.Account, error)
|
||||
CreateGuestAccount(ctx context.Context) (*api.Account, error)
|
||||
UpdateMemberships(ctx context.Context, eventsToAdd []gomatrixserverlib.Event, idsToRemove []string) error
|
||||
GetMembershipInRoomByLocalpart(ctx context.Context, localpart, roomID string) (authtypes.Membership, error)
|
||||
GetRoomIDsByLocalPart(ctx context.Context, localpart string) ([]string, error)
|
||||
GetMembershipsByLocalpart(ctx context.Context, localpart string) (memberships []authtypes.Membership, err error)
|
||||
SaveAccountData(ctx context.Context, localpart, roomID, dataType string, content json.RawMessage) error
|
||||
GetAccountData(ctx context.Context, localpart string) (global map[string]json.RawMessage, rooms map[string]map[string]json.RawMessage, err error)
|
||||
// GetAccountDataByType returns account data matching a given
|
||||
|
|
|
|||
|
|
@ -1,159 +0,0 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
)
|
||||
|
||||
const membershipSchema = `
|
||||
-- Stores data about users memberships to rooms.
|
||||
CREATE TABLE IF NOT EXISTS account_memberships (
|
||||
-- The Matrix user ID localpart for the member
|
||||
localpart TEXT NOT NULL,
|
||||
-- The room this user is a member of
|
||||
room_id TEXT NOT NULL,
|
||||
-- The ID of the join membership event
|
||||
event_id TEXT NOT NULL,
|
||||
|
||||
-- A user can only be member of a room once
|
||||
PRIMARY KEY (localpart, room_id)
|
||||
);
|
||||
|
||||
-- Use index to process deletion by ID more efficiently
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS account_membership_event_id ON account_memberships(event_id);
|
||||
`
|
||||
|
||||
const insertMembershipSQL = `
|
||||
INSERT INTO account_memberships(localpart, room_id, event_id) VALUES ($1, $2, $3)
|
||||
ON CONFLICT (localpart, room_id) DO UPDATE SET event_id = EXCLUDED.event_id
|
||||
`
|
||||
|
||||
const selectMembershipsByLocalpartSQL = "" +
|
||||
"SELECT room_id, event_id FROM account_memberships WHERE localpart = $1"
|
||||
|
||||
const selectMembershipInRoomByLocalpartSQL = "" +
|
||||
"SELECT event_id FROM account_memberships WHERE localpart = $1 AND room_id = $2"
|
||||
|
||||
const selectRoomIDsByLocalPartSQL = "" +
|
||||
"SELECT room_id FROM account_memberships WHERE localpart = $1"
|
||||
|
||||
const deleteMembershipsByEventIDsSQL = "" +
|
||||
"DELETE FROM account_memberships WHERE event_id = ANY($1)"
|
||||
|
||||
type membershipStatements struct {
|
||||
deleteMembershipsByEventIDsStmt *sql.Stmt
|
||||
insertMembershipStmt *sql.Stmt
|
||||
selectMembershipInRoomByLocalpartStmt *sql.Stmt
|
||||
selectMembershipsByLocalpartStmt *sql.Stmt
|
||||
selectRoomIDsByLocalPartStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *membershipStatements) prepare(db *sql.DB) (err error) {
|
||||
_, err = db.Exec(membershipSchema)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if s.deleteMembershipsByEventIDsStmt, err = db.Prepare(deleteMembershipsByEventIDsSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.insertMembershipStmt, err = db.Prepare(insertMembershipSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectMembershipInRoomByLocalpartStmt, err = db.Prepare(selectMembershipInRoomByLocalpartSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectMembershipsByLocalpartStmt, err = db.Prepare(selectMembershipsByLocalpartSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectRoomIDsByLocalPartStmt, err = db.Prepare(selectRoomIDsByLocalPartSQL); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *membershipStatements) insertMembership(
|
||||
ctx context.Context, txn *sql.Tx, localpart, roomID, eventID string,
|
||||
) (err error) {
|
||||
stmt := txn.Stmt(s.insertMembershipStmt)
|
||||
_, err = stmt.ExecContext(ctx, localpart, roomID, eventID)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *membershipStatements) deleteMembershipsByEventIDs(
|
||||
ctx context.Context, txn *sql.Tx, eventIDs []string,
|
||||
) (err error) {
|
||||
stmt := txn.Stmt(s.deleteMembershipsByEventIDsStmt)
|
||||
_, err = stmt.ExecContext(ctx, pq.StringArray(eventIDs))
|
||||
return
|
||||
}
|
||||
|
||||
func (s *membershipStatements) selectMembershipInRoomByLocalpart(
|
||||
ctx context.Context, localpart, roomID string,
|
||||
) (authtypes.Membership, error) {
|
||||
membership := authtypes.Membership{Localpart: localpart, RoomID: roomID}
|
||||
stmt := s.selectMembershipInRoomByLocalpartStmt
|
||||
err := stmt.QueryRowContext(ctx, localpart, roomID).Scan(&membership.EventID)
|
||||
|
||||
return membership, err
|
||||
}
|
||||
|
||||
func (s *membershipStatements) selectMembershipsByLocalpart(
|
||||
ctx context.Context, localpart string,
|
||||
) (memberships []authtypes.Membership, err error) {
|
||||
stmt := s.selectMembershipsByLocalpartStmt
|
||||
rows, err := stmt.QueryContext(ctx, localpart)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
memberships = []authtypes.Membership{}
|
||||
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "selectMembershipsByLocalpart: rows.close() failed")
|
||||
for rows.Next() {
|
||||
var m authtypes.Membership
|
||||
m.Localpart = localpart
|
||||
if err = rows.Scan(&m.RoomID, &m.EventID); err != nil {
|
||||
return
|
||||
}
|
||||
memberships = append(memberships, m)
|
||||
}
|
||||
return memberships, rows.Err()
|
||||
}
|
||||
|
||||
func (s *membershipStatements) selectRoomIDsByLocalPart(
|
||||
ctx context.Context, localPart string,
|
||||
) ([]string, error) {
|
||||
stmt := s.selectRoomIDsByLocalPartStmt
|
||||
rows, err := stmt.QueryContext(ctx, localPart)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
roomIDs := []string{}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
for rows.Next() {
|
||||
var roomID string
|
||||
if err = rows.Scan(&roomID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
roomIDs = append(roomIDs, roomID)
|
||||
}
|
||||
return roomIDs, rows.Err()
|
||||
}
|
||||
|
|
@ -37,7 +37,6 @@ type Database struct {
|
|||
sqlutil.PartitionOffsetStatements
|
||||
accounts accountsStatements
|
||||
profiles profilesStatements
|
||||
memberships membershipStatements
|
||||
accountDatas accountDataStatements
|
||||
threepids threepidStatements
|
||||
serverName gomatrixserverlib.ServerName
|
||||
|
|
@ -62,10 +61,6 @@ func NewDatabase(dataSourceName string, dbProperties sqlutil.DbProperties, serve
|
|||
if err = p.prepare(db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
m := membershipStatements{}
|
||||
if err = m.prepare(db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ac := accountDataStatements{}
|
||||
if err = ac.prepare(db); err != nil {
|
||||
return nil, err
|
||||
|
|
@ -74,7 +69,7 @@ func NewDatabase(dataSourceName string, dbProperties sqlutil.DbProperties, serve
|
|||
if err = t.prepare(db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Database{db, partitions, a, p, m, ac, t, serverName}, nil
|
||||
return &Database{db, partitions, a, p, ac, t, serverName}, nil
|
||||
}
|
||||
|
||||
// GetAccountByPassword returns the account associated with the given localpart and password.
|
||||
|
|
@ -179,112 +174,6 @@ func (d *Database) createAccount(
|
|||
return d.accounts.insertAccount(ctx, txn, localpart, hash, appserviceID)
|
||||
}
|
||||
|
||||
// SaveMembership saves the user matching a given localpart as a member of a given
|
||||
// room. It also stores the ID of the membership event.
|
||||
// If a membership already exists between the user and the room, or if the
|
||||
// insert fails, returns the SQL error
|
||||
func (d *Database) saveMembership(
|
||||
ctx context.Context, txn *sql.Tx, localpart, roomID, eventID string,
|
||||
) error {
|
||||
return d.memberships.insertMembership(ctx, txn, localpart, roomID, eventID)
|
||||
}
|
||||
|
||||
// removeMembershipsByEventIDs removes the memberships corresponding to the
|
||||
// `join` membership events IDs in the eventIDs slice.
|
||||
// If the removal fails, or if there is no membership to remove, returns an error
|
||||
func (d *Database) removeMembershipsByEventIDs(
|
||||
ctx context.Context, txn *sql.Tx, eventIDs []string,
|
||||
) error {
|
||||
return d.memberships.deleteMembershipsByEventIDs(ctx, txn, eventIDs)
|
||||
}
|
||||
|
||||
// UpdateMemberships adds the "join" membership events included in a given state
|
||||
// events array, and removes those which ID is included in a given array of events
|
||||
// IDs. All of the process is run in a transaction, which commits only once/if every
|
||||
// insertion and deletion has been successfully processed.
|
||||
// Returns a SQL error if there was an issue with any part of the process
|
||||
func (d *Database) UpdateMemberships(
|
||||
ctx context.Context, eventsToAdd []gomatrixserverlib.Event, idsToRemove []string,
|
||||
) error {
|
||||
return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||
if err := d.removeMembershipsByEventIDs(ctx, txn, idsToRemove); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, event := range eventsToAdd {
|
||||
if err := d.newMembership(ctx, txn, event); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// GetMembershipInRoomByLocalpart returns the membership for an user
|
||||
// matching the given localpart if he is a member of the room matching roomID,
|
||||
// if not sql.ErrNoRows is returned.
|
||||
// If there was an issue during the retrieval, returns the SQL error
|
||||
func (d *Database) GetMembershipInRoomByLocalpart(
|
||||
ctx context.Context, localpart, roomID string,
|
||||
) (authtypes.Membership, error) {
|
||||
return d.memberships.selectMembershipInRoomByLocalpart(ctx, localpart, roomID)
|
||||
}
|
||||
|
||||
// GetRoomIDsByLocalPart returns an array containing the room ids of all
|
||||
// the rooms a user matching a given localpart is a member of
|
||||
// If no membership match the given localpart, returns an empty array
|
||||
// If there was an issue during the retrieval, returns the SQL error
|
||||
func (d *Database) GetRoomIDsByLocalPart(
|
||||
ctx context.Context, localpart string,
|
||||
) ([]string, error) {
|
||||
return d.memberships.selectRoomIDsByLocalPart(ctx, localpart)
|
||||
}
|
||||
|
||||
// GetMembershipsByLocalpart returns an array containing the memberships for all
|
||||
// the rooms a user matching a given localpart is a member of
|
||||
// If no membership match the given localpart, returns an empty array
|
||||
// If there was an issue during the retrieval, returns the SQL error
|
||||
func (d *Database) GetMembershipsByLocalpart(
|
||||
ctx context.Context, localpart string,
|
||||
) (memberships []authtypes.Membership, err error) {
|
||||
return d.memberships.selectMembershipsByLocalpart(ctx, localpart)
|
||||
}
|
||||
|
||||
// newMembership saves a new membership in the database.
|
||||
// If the event isn't a valid m.room.member event with type `join`, does nothing.
|
||||
// If an error occurred, returns the SQL error
|
||||
func (d *Database) newMembership(
|
||||
ctx context.Context, txn *sql.Tx, ev gomatrixserverlib.Event,
|
||||
) error {
|
||||
if ev.Type() == "m.room.member" && ev.StateKey() != nil {
|
||||
localpart, serverName, err := gomatrixserverlib.SplitID('@', *ev.StateKey())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// We only want state events from local users
|
||||
if string(serverName) != string(d.serverName) {
|
||||
return nil
|
||||
}
|
||||
|
||||
eventID := ev.EventID()
|
||||
roomID := ev.RoomID()
|
||||
membership, err := ev.Membership()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Only "join" membership events can be considered as new memberships
|
||||
if membership == gomatrixserverlib.Join {
|
||||
if err := d.saveMembership(ctx, txn, localpart, roomID, eventID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SaveAccountData saves new account data for a given user and a given room.
|
||||
// If the account data is not specific to a room, the room ID should be an empty string
|
||||
// If an account data already exists for a given set (user, room, data type), it will
|
||||
|
|
|
|||
|
|
@ -1,159 +0,0 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package sqlite3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"strings"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
)
|
||||
|
||||
const membershipSchema = `
|
||||
-- Stores data about users memberships to rooms.
|
||||
CREATE TABLE IF NOT EXISTS account_memberships (
|
||||
-- The Matrix user ID localpart for the member
|
||||
localpart TEXT NOT NULL,
|
||||
-- The room this user is a member of
|
||||
room_id TEXT NOT NULL,
|
||||
-- The ID of the join membership event
|
||||
event_id TEXT NOT NULL,
|
||||
|
||||
-- A user can only be member of a room once
|
||||
PRIMARY KEY (localpart, room_id),
|
||||
|
||||
UNIQUE (event_id)
|
||||
);
|
||||
`
|
||||
|
||||
const insertMembershipSQL = `
|
||||
INSERT INTO account_memberships(localpart, room_id, event_id) VALUES ($1, $2, $3)
|
||||
ON CONFLICT (localpart, room_id) DO UPDATE SET event_id = EXCLUDED.event_id
|
||||
`
|
||||
|
||||
const selectMembershipsByLocalpartSQL = "" +
|
||||
"SELECT room_id, event_id FROM account_memberships WHERE localpart = $1"
|
||||
|
||||
const selectMembershipInRoomByLocalpartSQL = "" +
|
||||
"SELECT event_id FROM account_memberships WHERE localpart = $1 AND room_id = $2"
|
||||
|
||||
const selectRoomIDsByLocalPartSQL = "" +
|
||||
"SELECT room_id FROM account_memberships WHERE localpart = $1"
|
||||
|
||||
const deleteMembershipsByEventIDsSQL = "" +
|
||||
"DELETE FROM account_memberships WHERE event_id IN ($1)"
|
||||
|
||||
type membershipStatements struct {
|
||||
insertMembershipStmt *sql.Stmt
|
||||
selectMembershipInRoomByLocalpartStmt *sql.Stmt
|
||||
selectMembershipsByLocalpartStmt *sql.Stmt
|
||||
selectRoomIDsByLocalPartStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *membershipStatements) prepare(db *sql.DB) (err error) {
|
||||
_, err = db.Exec(membershipSchema)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if s.insertMembershipStmt, err = db.Prepare(insertMembershipSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectMembershipInRoomByLocalpartStmt, err = db.Prepare(selectMembershipInRoomByLocalpartSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectMembershipsByLocalpartStmt, err = db.Prepare(selectMembershipsByLocalpartSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectRoomIDsByLocalPartStmt, err = db.Prepare(selectRoomIDsByLocalPartSQL); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *membershipStatements) insertMembership(
|
||||
ctx context.Context, txn *sql.Tx, localpart, roomID, eventID string,
|
||||
) (err error) {
|
||||
stmt := txn.Stmt(s.insertMembershipStmt)
|
||||
_, err = stmt.ExecContext(ctx, localpart, roomID, eventID)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *membershipStatements) deleteMembershipsByEventIDs(
|
||||
ctx context.Context, txn *sql.Tx, eventIDs []string,
|
||||
) (err error) {
|
||||
sqlStr := strings.Replace(deleteMembershipsByEventIDsSQL, "($1)", sqlutil.QueryVariadic(len(eventIDs)), 1)
|
||||
iEventIDs := make([]interface{}, len(eventIDs))
|
||||
for i, e := range eventIDs {
|
||||
iEventIDs[i] = e
|
||||
}
|
||||
_, err = txn.ExecContext(ctx, sqlStr, iEventIDs...)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *membershipStatements) selectMembershipInRoomByLocalpart(
|
||||
ctx context.Context, localpart, roomID string,
|
||||
) (authtypes.Membership, error) {
|
||||
membership := authtypes.Membership{Localpart: localpart, RoomID: roomID}
|
||||
stmt := s.selectMembershipInRoomByLocalpartStmt
|
||||
err := stmt.QueryRowContext(ctx, localpart, roomID).Scan(&membership.EventID)
|
||||
|
||||
return membership, err
|
||||
}
|
||||
|
||||
func (s *membershipStatements) selectMembershipsByLocalpart(
|
||||
ctx context.Context, localpart string,
|
||||
) (memberships []authtypes.Membership, err error) {
|
||||
stmt := s.selectMembershipsByLocalpartStmt
|
||||
rows, err := stmt.QueryContext(ctx, localpart)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
memberships = []authtypes.Membership{}
|
||||
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "selectMembershipsByLocalpart: rows.close() failed")
|
||||
for rows.Next() {
|
||||
var m authtypes.Membership
|
||||
m.Localpart = localpart
|
||||
if err := rows.Scan(&m.RoomID, &m.EventID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
memberships = append(memberships, m)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
func (s *membershipStatements) selectRoomIDsByLocalPart(
|
||||
ctx context.Context, localPart string,
|
||||
) ([]string, error) {
|
||||
stmt := s.selectRoomIDsByLocalPartStmt
|
||||
rows, err := stmt.QueryContext(ctx, localPart)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
roomIDs := []string{}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
for rows.Next() {
|
||||
var roomID string
|
||||
if err = rows.Scan(&roomID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
roomIDs = append(roomIDs, roomID)
|
||||
}
|
||||
return roomIDs, rows.Err()
|
||||
}
|
||||
|
|
@ -36,7 +36,6 @@ type Database struct {
|
|||
sqlutil.PartitionOffsetStatements
|
||||
accounts accountsStatements
|
||||
profiles profilesStatements
|
||||
memberships membershipStatements
|
||||
accountDatas accountDataStatements
|
||||
threepids threepidStatements
|
||||
serverName gomatrixserverlib.ServerName
|
||||
|
|
@ -67,10 +66,6 @@ func NewDatabase(dataSourceName string, serverName gomatrixserverlib.ServerName)
|
|||
if err = p.prepare(db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
m := membershipStatements{}
|
||||
if err = m.prepare(db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ac := accountDataStatements{}
|
||||
if err = ac.prepare(db); err != nil {
|
||||
return nil, err
|
||||
|
|
@ -79,7 +74,7 @@ func NewDatabase(dataSourceName string, serverName gomatrixserverlib.ServerName)
|
|||
if err = t.prepare(db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Database{db, partitions, a, p, m, ac, t, serverName, sync.Mutex{}}, nil
|
||||
return &Database{db, partitions, a, p, ac, t, serverName, sync.Mutex{}}, nil
|
||||
}
|
||||
|
||||
// GetAccountByPassword returns the account associated with the given localpart and password.
|
||||
|
|
@ -193,112 +188,6 @@ func (d *Database) createAccount(
|
|||
return d.accounts.insertAccount(ctx, txn, localpart, hash, appserviceID)
|
||||
}
|
||||
|
||||
// SaveMembership saves the user matching a given localpart as a member of a given
|
||||
// room. It also stores the ID of the membership event.
|
||||
// If a membership already exists between the user and the room, or if the
|
||||
// insert fails, returns the SQL error
|
||||
func (d *Database) saveMembership(
|
||||
ctx context.Context, txn *sql.Tx, localpart, roomID, eventID string,
|
||||
) error {
|
||||
return d.memberships.insertMembership(ctx, txn, localpart, roomID, eventID)
|
||||
}
|
||||
|
||||
// removeMembershipsByEventIDs removes the memberships corresponding to the
|
||||
// `join` membership events IDs in the eventIDs slice.
|
||||
// If the removal fails, or if there is no membership to remove, returns an error
|
||||
func (d *Database) removeMembershipsByEventIDs(
|
||||
ctx context.Context, txn *sql.Tx, eventIDs []string,
|
||||
) error {
|
||||
return d.memberships.deleteMembershipsByEventIDs(ctx, txn, eventIDs)
|
||||
}
|
||||
|
||||
// UpdateMemberships adds the "join" membership events included in a given state
|
||||
// events array, and removes those which ID is included in a given array of events
|
||||
// IDs. All of the process is run in a transaction, which commits only once/if every
|
||||
// insertion and deletion has been successfully processed.
|
||||
// Returns a SQL error if there was an issue with any part of the process
|
||||
func (d *Database) UpdateMemberships(
|
||||
ctx context.Context, eventsToAdd []gomatrixserverlib.Event, idsToRemove []string,
|
||||
) error {
|
||||
return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||
if err := d.removeMembershipsByEventIDs(ctx, txn, idsToRemove); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, event := range eventsToAdd {
|
||||
if err := d.newMembership(ctx, txn, event); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// GetMembershipInRoomByLocalpart returns the membership for an user
|
||||
// matching the given localpart if he is a member of the room matching roomID,
|
||||
// if not sql.ErrNoRows is returned.
|
||||
// If there was an issue during the retrieval, returns the SQL error
|
||||
func (d *Database) GetMembershipInRoomByLocalpart(
|
||||
ctx context.Context, localpart, roomID string,
|
||||
) (authtypes.Membership, error) {
|
||||
return d.memberships.selectMembershipInRoomByLocalpart(ctx, localpart, roomID)
|
||||
}
|
||||
|
||||
// GetMembershipsByLocalpart returns an array containing the memberships for all
|
||||
// the rooms a user matching a given localpart is a member of
|
||||
// If no membership match the given localpart, returns an empty array
|
||||
// If there was an issue during the retrieval, returns the SQL error
|
||||
func (d *Database) GetMembershipsByLocalpart(
|
||||
ctx context.Context, localpart string,
|
||||
) (memberships []authtypes.Membership, err error) {
|
||||
return d.memberships.selectMembershipsByLocalpart(ctx, localpart)
|
||||
}
|
||||
|
||||
// GetRoomIDsByLocalPart returns an array containing the room ids of all
|
||||
// the rooms a user matching a given localpart is a member of
|
||||
// If no membership match the given localpart, returns an empty array
|
||||
// If there was an issue during the retrieval, returns the SQL error
|
||||
func (d *Database) GetRoomIDsByLocalPart(
|
||||
ctx context.Context, localpart string,
|
||||
) ([]string, error) {
|
||||
return d.memberships.selectRoomIDsByLocalPart(ctx, localpart)
|
||||
}
|
||||
|
||||
// newMembership saves a new membership in the database.
|
||||
// If the event isn't a valid m.room.member event with type `join`, does nothing.
|
||||
// If an error occurred, returns the SQL error
|
||||
func (d *Database) newMembership(
|
||||
ctx context.Context, txn *sql.Tx, ev gomatrixserverlib.Event,
|
||||
) error {
|
||||
if ev.Type() == "m.room.member" && ev.StateKey() != nil {
|
||||
localpart, serverName, err := gomatrixserverlib.SplitID('@', *ev.StateKey())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// We only want state events from local users
|
||||
if string(serverName) != string(d.serverName) {
|
||||
return nil
|
||||
}
|
||||
|
||||
eventID := ev.EventID()
|
||||
roomID := ev.RoomID()
|
||||
membership, err := ev.Membership()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Only "join" membership events can be considered as new memberships
|
||||
if membership == gomatrixserverlib.Join {
|
||||
if err := d.saveMembership(ctx, txn, localpart, roomID, eventID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SaveAccountData saves new account data for a given user and a given room.
|
||||
// If the account data is not specific to a room, the room ID should be an empty string
|
||||
// If an account data already exists for a given set (user, room, data type), it will
|
||||
|
|
|
|||
Loading…
Reference in a new issue