mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-10 16:33:11 -06:00
Merge remote-tracking branch 'origin/master' into markjh/invites
This commit is contained in:
commit
749fd213a0
18
.editorconfig
Normal file
18
.editorconfig
Normal file
|
|
@ -0,0 +1,18 @@
|
|||
root = true
|
||||
|
||||
[*]
|
||||
charset = utf-8
|
||||
|
||||
end_of_line = lf
|
||||
insert_final_newline = true
|
||||
trim_trailing_whitespace = true
|
||||
|
||||
[*.go]
|
||||
indent_style = tab
|
||||
indent_size = 4
|
||||
|
||||
[*.md]
|
||||
trim_trailing_whitespace = false
|
||||
|
||||
[*.{yml,yaml}]
|
||||
indent_style = space
|
||||
|
|
@ -126,7 +126,7 @@ so that the readers can avoid querying the room server unnecessarily.
|
|||
* Checks if it has already processed or is processing a request with the
|
||||
same `txnID`.
|
||||
* Checks the signatures for the events.
|
||||
Fetches the ed2519 keys for the event senders if necessary.
|
||||
Fetches the ed25519 keys for the event senders if necessary.
|
||||
* Queries the RoomServer for a copy of the state of the room at each event.
|
||||
* If the RoomServer doesn't know the state of the room at an event then
|
||||
query the state of the room at the event from the remote server using
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ import (
|
|||
|
||||
const accountsSchema = `
|
||||
-- Stores data about accounts.
|
||||
CREATE TABLE IF NOT EXISTS accounts (
|
||||
CREATE TABLE IF NOT EXISTS account_accounts (
|
||||
-- The Matrix user ID localpart for this account
|
||||
localpart TEXT NOT NULL PRIMARY KEY,
|
||||
-- When this account was first created, as a unix timestamp (ms resolution).
|
||||
|
|
@ -38,13 +38,13 @@ CREATE TABLE IF NOT EXISTS accounts (
|
|||
`
|
||||
|
||||
const insertAccountSQL = "" +
|
||||
"INSERT INTO accounts(localpart, created_ts, password_hash) VALUES ($1, $2, $3)"
|
||||
"INSERT INTO account_accounts(localpart, created_ts, password_hash) VALUES ($1, $2, $3)"
|
||||
|
||||
const selectAccountByLocalpartSQL = "" +
|
||||
"SELECT localpart FROM accounts WHERE localpart = $1"
|
||||
"SELECT localpart FROM account_accounts WHERE localpart = $1"
|
||||
|
||||
const selectPasswordHashSQL = "" +
|
||||
"SELECT password_hash FROM accounts WHERE localpart = $1"
|
||||
"SELECT password_hash FROM account_accounts WHERE localpart = $1"
|
||||
|
||||
// TODO: Update password
|
||||
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ import (
|
|||
|
||||
const membershipSchema = `
|
||||
-- Stores data about users memberships to rooms.
|
||||
CREATE TABLE IF NOT EXISTS memberships (
|
||||
CREATE TABLE IF NOT EXISTS account_memberships (
|
||||
-- The Matrix user ID localpart for the member
|
||||
localpart TEXT NOT NULL,
|
||||
-- The room this user is a member of
|
||||
|
|
@ -36,25 +36,25 @@ CREATE TABLE IF NOT EXISTS memberships (
|
|||
);
|
||||
|
||||
-- Use index to process deletion by ID more efficiently
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS membership_event_id ON memberships(event_id);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS account_membership_event_id ON account_memberships(event_id);
|
||||
`
|
||||
|
||||
const insertMembershipSQL = `
|
||||
INSERT INTO memberships(localpart, room_id, event_id) VALUES ($1, $2, $3)
|
||||
INSERT INTO account_memberships(localpart, room_id, event_id) VALUES ($1, $2, $3)
|
||||
ON CONFLICT (localpart, room_id) DO UPDATE SET event_id = EXCLUDED.event_id
|
||||
`
|
||||
|
||||
const selectMembershipSQL = "" +
|
||||
"SELECT * from memberships WHERE localpart = $1 AND room_id = $2"
|
||||
"SELECT * from account_memberships WHERE localpart = $1 AND room_id = $2"
|
||||
|
||||
const selectMembershipsByLocalpartSQL = "" +
|
||||
"SELECT room_id, event_id FROM memberships WHERE localpart = $1"
|
||||
"SELECT room_id, event_id FROM account_memberships WHERE localpart = $1"
|
||||
|
||||
const deleteMembershipsByEventIDsSQL = "" +
|
||||
"DELETE FROM memberships WHERE event_id = ANY($1)"
|
||||
"DELETE FROM account_memberships WHERE event_id = ANY($1)"
|
||||
|
||||
const updateMembershipByEventIDSQL = "" +
|
||||
"UPDATE memberships SET event_id = $2 WHERE event_id = $1"
|
||||
"UPDATE account_memberships SET event_id = $2 WHERE event_id = $1"
|
||||
|
||||
type membershipStatements struct {
|
||||
deleteMembershipsByEventIDsStmt *sql.Stmt
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ import (
|
|||
|
||||
const profilesSchema = `
|
||||
-- Stores data about accounts profiles.
|
||||
CREATE TABLE IF NOT EXISTS profiles (
|
||||
CREATE TABLE IF NOT EXISTS account_profiles (
|
||||
-- The Matrix user ID localpart for this account
|
||||
localpart TEXT NOT NULL PRIMARY KEY,
|
||||
-- The display name for this account
|
||||
|
|
@ -33,16 +33,16 @@ CREATE TABLE IF NOT EXISTS profiles (
|
|||
`
|
||||
|
||||
const insertProfileSQL = "" +
|
||||
"INSERT INTO profiles(localpart, display_name, avatar_url) VALUES ($1, $2, $3)"
|
||||
"INSERT INTO account_profiles(localpart, display_name, avatar_url) VALUES ($1, $2, $3)"
|
||||
|
||||
const selectProfileByLocalpartSQL = "" +
|
||||
"SELECT localpart, display_name, avatar_url FROM profiles WHERE localpart = $1"
|
||||
"SELECT localpart, display_name, avatar_url FROM account_profiles WHERE localpart = $1"
|
||||
|
||||
const setAvatarURLSQL = "" +
|
||||
"UPDATE profiles SET avatar_url = $1 WHERE localpart = $2"
|
||||
"UPDATE account_profiles SET avatar_url = $1 WHERE localpart = $2"
|
||||
|
||||
const setDisplayNameSQL = "" +
|
||||
"UPDATE profiles SET display_name = $1 WHERE localpart = $2"
|
||||
"UPDATE account_profiles SET display_name = $1 WHERE localpart = $2"
|
||||
|
||||
type profilesStatements struct {
|
||||
insertProfileStmt *sql.Stmt
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@ func NewDatabase(dataSourceName string, serverName gomatrixserverlib.ServerName)
|
|||
return nil, err
|
||||
}
|
||||
partitions := common.PartitionOffsetStatements{}
|
||||
if err = partitions.Prepare(db); err != nil {
|
||||
if err = partitions.Prepare(db, "account"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
a := accountsStatements{}
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ import (
|
|||
|
||||
const devicesSchema = `
|
||||
-- Stores data about devices.
|
||||
CREATE TABLE IF NOT EXISTS devices (
|
||||
CREATE TABLE IF NOT EXISTS device_devices (
|
||||
-- The access token granted to this device. This has to be the primary key
|
||||
-- so we can distinguish which device is making a given request.
|
||||
access_token TEXT NOT NULL PRIMARY KEY,
|
||||
|
|
@ -42,17 +42,17 @@ CREATE TABLE IF NOT EXISTS devices (
|
|||
);
|
||||
|
||||
-- Device IDs must be unique for a given user.
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS localpart_id_idx ON devices(localpart, device_id);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS device_localpart_id_idx ON device_devices(localpart, device_id);
|
||||
`
|
||||
|
||||
const insertDeviceSQL = "" +
|
||||
"INSERT INTO devices(device_id, localpart, access_token, created_ts) VALUES ($1, $2, $3, $4)"
|
||||
"INSERT INTO device_devices(device_id, localpart, access_token, created_ts) VALUES ($1, $2, $3, $4)"
|
||||
|
||||
const selectDeviceByTokenSQL = "" +
|
||||
"SELECT device_id, localpart FROM devices WHERE access_token = $1"
|
||||
"SELECT device_id, localpart FROM device_devices WHERE access_token = $1"
|
||||
|
||||
const deleteDeviceSQL = "" +
|
||||
"DELETE FROM devices WHERE device_id = $1 AND localpart = $2"
|
||||
"DELETE FROM device_devices WHERE device_id = $1 AND localpart = $2"
|
||||
|
||||
// TODO: List devices?
|
||||
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ type MemberContent struct {
|
|||
Membership string `json:"membership"`
|
||||
DisplayName string `json:"displayname,omitempty"`
|
||||
AvatarURL string `json:"avatar_url,omitempty"`
|
||||
Reason string `json:"reason,omitempty"`
|
||||
// TODO: ThirdPartyInvite string `json:"third_party_invite,omitempty"`
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,87 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package events
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/common/config"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
)
|
||||
|
||||
// ErrRoomNoExists is returned when trying to lookup the state of a room that
|
||||
// doesn't exist
|
||||
var ErrRoomNoExists = errors.New("Room does not exist")
|
||||
|
||||
// BuildEvent builds a Matrix event using the event builder and roomserver query
|
||||
// API client provided. If also fills roomserver query API response (if provided)
|
||||
// in case the function calling FillBuilder needs to use it.
|
||||
// Returns ErrRoomNoExists if the state of the room could not be retrieved because
|
||||
// the room doesn't exist
|
||||
// Returns an error if something else went wrong
|
||||
func BuildEvent(
|
||||
builder *gomatrixserverlib.EventBuilder, cfg config.Dendrite,
|
||||
queryAPI api.RoomserverQueryAPI, queryRes *api.QueryLatestEventsAndStateResponse,
|
||||
) (*gomatrixserverlib.Event, error) {
|
||||
eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Ask the roomserver for information about this room
|
||||
queryReq := api.QueryLatestEventsAndStateRequest{
|
||||
RoomID: builder.RoomID,
|
||||
StateToFetch: eventsNeeded.Tuples(),
|
||||
}
|
||||
if queryRes == nil {
|
||||
queryRes = &api.QueryLatestEventsAndStateResponse{}
|
||||
}
|
||||
if queryErr := queryAPI.QueryLatestEventsAndState(&queryReq, queryRes); queryErr != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !queryRes.RoomExists {
|
||||
return nil, ErrRoomNoExists
|
||||
}
|
||||
|
||||
builder.Depth = queryRes.Depth
|
||||
builder.PrevEvents = queryRes.LatestEvents
|
||||
|
||||
authEvents := gomatrixserverlib.NewAuthEvents(nil)
|
||||
|
||||
for i := range queryRes.StateEvents {
|
||||
authEvents.AddEvent(&queryRes.StateEvents[i])
|
||||
}
|
||||
|
||||
refs, err := eventsNeeded.AuthEventReferences(&authEvents)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
builder.AuthEvents = refs
|
||||
|
||||
eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), cfg.Matrix.ServerName)
|
||||
now := time.Now()
|
||||
event, err := builder.Build(eventID, now, cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &event, nil
|
||||
}
|
||||
|
|
@ -25,9 +25,9 @@ type RoomserverProducer struct {
|
|||
}
|
||||
|
||||
// NewRoomserverProducer creates a new RoomserverProducer
|
||||
func NewRoomserverProducer(roomserverURI string) *RoomserverProducer {
|
||||
func NewRoomserverProducer(inputAPI api.RoomserverInputAPI) *RoomserverProducer {
|
||||
return &RoomserverProducer{
|
||||
InputAPI: api.NewRoomserverInputAPIHTTP(roomserverURI, nil),
|
||||
InputAPI: inputAPI,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -15,9 +15,7 @@
|
|||
package readers
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||
|
|
@ -284,44 +282,12 @@ func buildMembershipEvents(
|
|||
return nil, err
|
||||
}
|
||||
|
||||
eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(&builder)
|
||||
event, err := events.BuildEvent(&builder, *cfg, queryAPI, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Ask the roomserver for information about this room
|
||||
queryReq := api.QueryLatestEventsAndStateRequest{
|
||||
RoomID: membership.RoomID,
|
||||
StateToFetch: eventsNeeded.Tuples(),
|
||||
}
|
||||
var queryRes api.QueryLatestEventsAndStateResponse
|
||||
if queryErr := queryAPI.QueryLatestEventsAndState(&queryReq, &queryRes); queryErr != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
builder.Depth = queryRes.Depth
|
||||
builder.PrevEvents = queryRes.LatestEvents
|
||||
|
||||
authEvents := gomatrixserverlib.NewAuthEvents(nil)
|
||||
|
||||
for i := range queryRes.StateEvents {
|
||||
authEvents.AddEvent(&queryRes.StateEvents[i])
|
||||
}
|
||||
|
||||
refs, err := eventsNeeded.AuthEventReferences(&authEvents)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
builder.AuthEvents = refs
|
||||
|
||||
eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), cfg.Matrix.ServerName)
|
||||
now := time.Now()
|
||||
event, err := builder.Build(eventID, now, cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
evs = append(evs, event)
|
||||
evs = append(evs, *event)
|
||||
}
|
||||
|
||||
return evs, nil
|
||||
|
|
|
|||
|
|
@ -31,7 +31,6 @@ import (
|
|||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const pathPrefixR0 = "/_matrix/client/r0"
|
||||
|
|
@ -40,7 +39,7 @@ const pathPrefixUnstable = "/_matrix/client/unstable"
|
|||
// Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client
|
||||
// to clients which need to make outbound HTTP requests.
|
||||
func Setup(
|
||||
servMux *http.ServeMux, httpClient *http.Client, cfg config.Dendrite,
|
||||
apiMux *mux.Router, httpClient *http.Client, cfg config.Dendrite,
|
||||
producer *producers.RoomserverProducer, queryAPI api.RoomserverQueryAPI,
|
||||
aliasAPI api.RoomserverAliasAPI,
|
||||
accountDB *accounts.Database,
|
||||
|
|
@ -50,7 +49,6 @@ func Setup(
|
|||
userUpdateProducer *producers.UserUpdateProducer,
|
||||
syncProducer *producers.SyncAPIProducer,
|
||||
) {
|
||||
apiMux := mux.NewRouter()
|
||||
|
||||
apiMux.Handle("/_matrix/client/versions",
|
||||
common.MakeAPI("versions", func(req *http.Request) util.JSONResponse {
|
||||
|
|
@ -83,6 +81,12 @@ func Setup(
|
|||
)
|
||||
}),
|
||||
)
|
||||
r0mux.Handle("/rooms/{roomID}/{membership:(?:join|kick|ban|unban|leave|invite)}",
|
||||
common.MakeAuthAPI("membership", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||
vars := mux.Vars(req)
|
||||
return writers.SendMembership(req, accountDB, device, vars["roomID"], vars["membership"], cfg, queryAPI, producer)
|
||||
}),
|
||||
).Methods("POST", "OPTIONS")
|
||||
r0mux.Handle("/rooms/{roomID}/send/{eventType}/{txnID}",
|
||||
common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||
vars := mux.Vars(req)
|
||||
|
|
@ -316,7 +320,4 @@ func Setup(
|
|||
return util.JSONResponse{Code: 200, JSON: struct{}{}}
|
||||
}),
|
||||
)
|
||||
|
||||
servMux.Handle("/metrics", prometheus.Handler())
|
||||
servMux.Handle("/api/", http.StripPrefix("/api", apiMux))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,139 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package writers
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||
"github.com/matrix-org/dendrite/clientapi/events"
|
||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||
"github.com/matrix-org/dendrite/common/config"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
||||
"github.com/matrix-org/util"
|
||||
)
|
||||
|
||||
// SendMembership implements PUT /rooms/{roomID}/(join|kick|ban|unban|leave|invite)
|
||||
// by building a m.room.member event then sending it to the room server
|
||||
func SendMembership(
|
||||
req *http.Request, accountDB *accounts.Database, device *authtypes.Device,
|
||||
roomID string, membership string, cfg config.Dendrite,
|
||||
queryAPI api.RoomserverQueryAPI, producer *producers.RoomserverProducer,
|
||||
) util.JSONResponse {
|
||||
stateKey, reason, reqErr := getMembershipStateKey(req, device, membership)
|
||||
if reqErr != nil {
|
||||
return *reqErr
|
||||
}
|
||||
|
||||
localpart, serverName, err := gomatrixserverlib.SplitID('@', stateKey)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
var profile *authtypes.Profile
|
||||
if serverName == cfg.Matrix.ServerName {
|
||||
profile, err = accountDB.GetProfileByLocalpart(localpart)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
} else {
|
||||
profile = &authtypes.Profile{}
|
||||
}
|
||||
|
||||
builder := gomatrixserverlib.EventBuilder{
|
||||
Sender: device.UserID,
|
||||
RoomID: roomID,
|
||||
Type: "m.room.member",
|
||||
StateKey: &stateKey,
|
||||
}
|
||||
|
||||
// "unban" or "kick" isn't a valid membership value, change it to "leave"
|
||||
if membership == "unban" || membership == "kick" {
|
||||
membership = "leave"
|
||||
}
|
||||
|
||||
content := events.MemberContent{
|
||||
Membership: membership,
|
||||
DisplayName: profile.DisplayName,
|
||||
AvatarURL: profile.AvatarURL,
|
||||
Reason: reason,
|
||||
}
|
||||
|
||||
if err = builder.SetContent(content); err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
event, err := events.BuildEvent(&builder, cfg, queryAPI, nil)
|
||||
if err == events.ErrRoomNoExists {
|
||||
return util.JSONResponse{
|
||||
Code: 404,
|
||||
JSON: jsonerror.NotFound(err.Error()),
|
||||
}
|
||||
} else if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
if err := producer.SendEvents([]gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName); err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
return util.JSONResponse{
|
||||
Code: 200,
|
||||
JSON: struct{}{},
|
||||
}
|
||||
}
|
||||
|
||||
// getMembershipStateKey extracts the target user ID of a membership change.
|
||||
// For "join" and "leave" this will be the ID of the user making the change.
|
||||
// For "ban", "unban", "kick" and "invite" the target user ID will be in the JSON request body.
|
||||
// In the latter case, if there was an issue retrieving the user ID from the request body,
|
||||
// returns a JSONResponse with a corresponding error code and message.
|
||||
func getMembershipStateKey(
|
||||
req *http.Request, device *authtypes.Device, membership string,
|
||||
) (stateKey string, reason string, response *util.JSONResponse) {
|
||||
if membership == "ban" || membership == "unban" || membership == "kick" || membership == "invite" {
|
||||
// If we're in this case, the state key is contained in the request body,
|
||||
// possibly along with a reason (for "kick" and "ban") so we need to parse
|
||||
// it
|
||||
var requestBody struct {
|
||||
UserID string `json:"user_id"`
|
||||
Reason string `json:"reason"`
|
||||
}
|
||||
|
||||
if reqErr := httputil.UnmarshalJSONRequest(req, &requestBody); reqErr != nil {
|
||||
response = reqErr
|
||||
return
|
||||
}
|
||||
if requestBody.UserID == "" {
|
||||
response = &util.JSONResponse{
|
||||
Code: 400,
|
||||
JSON: jsonerror.BadJSON("'user_id' must be supplied."),
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
stateKey = requestBody.UserID
|
||||
reason = requestBody.Reason
|
||||
} else {
|
||||
stateKey = device.UserID
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
|
@ -17,10 +17,8 @@ package writers
|
|||
import (
|
||||
"net/http"
|
||||
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||
"github.com/matrix-org/dendrite/clientapi/events"
|
||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||
|
|
@ -64,41 +62,14 @@ func SendEvent(
|
|||
}
|
||||
builder.SetContent(r)
|
||||
|
||||
// work out what will be required in order to send this event
|
||||
needed, err := gomatrixserverlib.StateNeededForEventBuilder(&builder)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
// Ask the roomserver for information about this room
|
||||
queryReq := api.QueryLatestEventsAndStateRequest{
|
||||
RoomID: roomID,
|
||||
StateToFetch: needed.Tuples(),
|
||||
}
|
||||
var queryRes api.QueryLatestEventsAndStateResponse
|
||||
if queryErr := queryAPI.QueryLatestEventsAndState(&queryReq, &queryRes); queryErr != nil {
|
||||
return httputil.LogThenError(req, queryErr)
|
||||
}
|
||||
if !queryRes.RoomExists {
|
||||
e, err := events.BuildEvent(&builder, cfg, queryAPI, &queryRes)
|
||||
if err == events.ErrRoomNoExists {
|
||||
return util.JSONResponse{
|
||||
Code: 404,
|
||||
JSON: jsonerror.NotFound("Room does not exist"),
|
||||
}
|
||||
}
|
||||
|
||||
// set the fields we previously couldn't do and build the event
|
||||
builder.PrevEvents = queryRes.LatestEvents // the current events will be the prev events of the new event
|
||||
var refs []gomatrixserverlib.EventReference
|
||||
for _, e := range queryRes.StateEvents {
|
||||
refs = append(refs, e.EventReference())
|
||||
}
|
||||
builder.AuthEvents = refs
|
||||
builder.Depth = queryRes.Depth
|
||||
eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), cfg.Matrix.ServerName)
|
||||
e, err := builder.Build(
|
||||
eventID, time.Now(), cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey,
|
||||
)
|
||||
if err != nil {
|
||||
} else if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
|
|
@ -108,7 +79,7 @@ func SendEvent(
|
|||
stateEvents[i] = &queryRes.StateEvents[i]
|
||||
}
|
||||
provider := gomatrixserverlib.NewAuthEvents(stateEvents)
|
||||
if err = gomatrixserverlib.Allowed(e, &provider); err != nil {
|
||||
if err = gomatrixserverlib.Allowed(*e, &provider); err != nil {
|
||||
return util.JSONResponse{
|
||||
Code: 403,
|
||||
JSON: jsonerror.Forbidden(err.Error()), // TODO: Is this error string comprehensible to the client?
|
||||
|
|
@ -116,7 +87,7 @@ func SendEvent(
|
|||
}
|
||||
|
||||
// pass the new event to the roomserver
|
||||
if err := producer.SendEvents([]gomatrixserverlib.Event{e}, cfg.Matrix.ServerName); err != nil {
|
||||
if err := producer.SendEvents([]gomatrixserverlib.Event{*e}, cfg.Matrix.ServerName); err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import (
|
|||
"net/http"
|
||||
"os"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||
"github.com/matrix-org/dendrite/clientapi/consumers"
|
||||
|
|
@ -53,8 +54,9 @@ func main() {
|
|||
|
||||
queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)
|
||||
aliasAPI := api.NewRoomserverAliasAPIHTTP(cfg.RoomServerURL(), nil)
|
||||
inputAPI := api.NewRoomserverInputAPIHTTP(cfg.RoomServerURL(), nil)
|
||||
|
||||
roomserverProducer := producers.NewRoomserverProducer(cfg.RoomServerURL())
|
||||
roomserverProducer := producers.NewRoomserverProducer(inputAPI)
|
||||
userUpdateProducer, err := producers.NewUserUpdateProducer(
|
||||
cfg.Kafka.Addresses, string(cfg.Kafka.Topics.UserUpdates),
|
||||
)
|
||||
|
|
@ -102,10 +104,14 @@ func main() {
|
|||
}
|
||||
|
||||
log.Info("Starting client API server on ", cfg.Listen.ClientAPI)
|
||||
|
||||
api := mux.NewRouter()
|
||||
routing.Setup(
|
||||
http.DefaultServeMux, http.DefaultClient, *cfg, roomserverProducer,
|
||||
api, http.DefaultClient, *cfg, roomserverProducer,
|
||||
queryAPI, aliasAPI, accountDB, deviceDB, federation, keyRing,
|
||||
userUpdateProducer, syncProducer,
|
||||
)
|
||||
common.SetupHTTPAPI(http.DefaultServeMux, api)
|
||||
|
||||
log.Fatal(http.ListenAndServe(string(cfg.Listen.ClientAPI), nil))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import (
|
|||
"net/http"
|
||||
"os"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/common/config"
|
||||
|
|
@ -66,8 +67,9 @@ func main() {
|
|||
}
|
||||
|
||||
queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)
|
||||
inputAPI := api.NewRoomserverInputAPIHTTP(cfg.RoomServerURL(), nil)
|
||||
|
||||
roomserverProducer := producers.NewRoomserverProducer(cfg.RoomServerURL())
|
||||
roomserverProducer := producers.NewRoomserverProducer(inputAPI)
|
||||
|
||||
if err != nil {
|
||||
log.Panicf("Failed to setup kafka producers(%s): %s", cfg.Kafka.Addresses, err)
|
||||
|
|
@ -75,6 +77,9 @@ func main() {
|
|||
|
||||
log.Info("Starting federation API server on ", cfg.Listen.FederationAPI)
|
||||
|
||||
routing.Setup(http.DefaultServeMux, *cfg, queryAPI, roomserverProducer, keyRing, federation)
|
||||
api := mux.NewRouter()
|
||||
routing.Setup(api, *cfg, queryAPI, roomserverProducer, keyRing, federation)
|
||||
common.SetupHTTPAPI(http.DefaultServeMux, api)
|
||||
|
||||
log.Fatal(http.ListenAndServe(string(cfg.Listen.FederationAPI), nil))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,13 +19,13 @@ import (
|
|||
"net/http"
|
||||
"os"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/common/config"
|
||||
"github.com/matrix-org/dendrite/federationsender/consumers"
|
||||
"github.com/matrix-org/dendrite/federationsender/queue"
|
||||
"github.com/matrix-org/dendrite/federationsender/storage"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
)
|
||||
|
|
@ -66,7 +66,8 @@ func main() {
|
|||
log.WithError(err).Panicf("startup: failed to start room server consumer")
|
||||
}
|
||||
|
||||
http.DefaultServeMux.Handle("/metrics", prometheus.Handler())
|
||||
api := mux.NewRouter()
|
||||
common.SetupHTTPAPI(http.DefaultServeMux, api)
|
||||
|
||||
if err := http.ListenAndServe(string(cfg.Listen.FederationSender), nil); err != nil {
|
||||
panic(err)
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import (
|
|||
"net/http"
|
||||
"os"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/common/config"
|
||||
"github.com/matrix-org/dendrite/mediaapi/routing"
|
||||
|
|
@ -52,6 +53,9 @@ func main() {
|
|||
|
||||
log.Info("Starting media API server on ", cfg.Listen.MediaAPI)
|
||||
|
||||
routing.Setup(http.DefaultServeMux, http.DefaultClient, cfg, db)
|
||||
api := mux.NewRouter()
|
||||
routing.Setup(api, http.DefaultClient, cfg, db)
|
||||
common.SetupHTTPAPI(http.DefaultServeMux, api)
|
||||
|
||||
log.Fatal(http.ListenAndServe(string(cfg.Listen.MediaAPI), nil))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,298 @@
|
|||
// Copyright 2017 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"net/http"
|
||||
"os"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/common/config"
|
||||
"github.com/matrix-org/dendrite/common/keydb"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
||||
mediaapi_routing "github.com/matrix-org/dendrite/mediaapi/routing"
|
||||
mediaapi_storage "github.com/matrix-org/dendrite/mediaapi/storage"
|
||||
|
||||
roomserver_alias "github.com/matrix-org/dendrite/roomserver/alias"
|
||||
roomserver_input "github.com/matrix-org/dendrite/roomserver/input"
|
||||
roomserver_query "github.com/matrix-org/dendrite/roomserver/query"
|
||||
roomserver_storage "github.com/matrix-org/dendrite/roomserver/storage"
|
||||
|
||||
clientapi_consumers "github.com/matrix-org/dendrite/clientapi/consumers"
|
||||
clientapi_routing "github.com/matrix-org/dendrite/clientapi/routing"
|
||||
|
||||
syncapi_consumers "github.com/matrix-org/dendrite/syncapi/consumers"
|
||||
syncapi_routing "github.com/matrix-org/dendrite/syncapi/routing"
|
||||
syncapi_storage "github.com/matrix-org/dendrite/syncapi/storage"
|
||||
syncapi_sync "github.com/matrix-org/dendrite/syncapi/sync"
|
||||
syncapi_types "github.com/matrix-org/dendrite/syncapi/types"
|
||||
|
||||
federationapi_routing "github.com/matrix-org/dendrite/federationapi/routing"
|
||||
|
||||
federationsender_consumers "github.com/matrix-org/dendrite/federationsender/consumers"
|
||||
"github.com/matrix-org/dendrite/federationsender/queue"
|
||||
federationsender_storage "github.com/matrix-org/dendrite/federationsender/storage"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
sarama "gopkg.in/Shopify/sarama.v1"
|
||||
)
|
||||
|
||||
var (
|
||||
logDir = os.Getenv("LOG_DIR")
|
||||
configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.")
|
||||
httpBindAddr = flag.String("http-bind-address", ":8008", "The HTTP listening port for the server")
|
||||
httpsBindAddr = flag.String("https-bind-address", ":8448", "The HTTPS listening port for the server")
|
||||
certFile = flag.String("tls-cert", "", "The PEM formatted X509 certificate to use for TLS")
|
||||
keyFile = flag.String("tls-key", "", "The PEM private key to use for TLS")
|
||||
)
|
||||
|
||||
func main() {
|
||||
common.SetupLogging(logDir)
|
||||
|
||||
flag.Parse()
|
||||
|
||||
if *configPath == "" {
|
||||
log.Fatal("--config must be supplied")
|
||||
}
|
||||
cfg, err := config.Load(*configPath)
|
||||
if err != nil {
|
||||
log.Fatalf("Invalid config file: %s", err)
|
||||
}
|
||||
|
||||
m := newMonolith(cfg)
|
||||
m.setupDatabases()
|
||||
m.setupFederation()
|
||||
m.setupRoomServer()
|
||||
m.setupProducers()
|
||||
m.setupNotifiers()
|
||||
m.setupConsumers()
|
||||
m.setupAPIs()
|
||||
|
||||
// Expose the matrix APIs directly rather than putting them under a /api path.
|
||||
go func() {
|
||||
log.Info("Listening on ", *httpBindAddr)
|
||||
log.Fatal(http.ListenAndServe(*httpBindAddr, m.api))
|
||||
}()
|
||||
// Handle HTTPS if certificate and key are provided
|
||||
go func() {
|
||||
if *certFile != "" && *keyFile != "" {
|
||||
log.Info("Listening on ", *httpsBindAddr)
|
||||
log.Fatal(http.ListenAndServeTLS(*httpsBindAddr, *certFile, *keyFile, m.api))
|
||||
}
|
||||
}()
|
||||
|
||||
// We want to block forever to let the HTTP and HTTPS handler serve the APIs
|
||||
select {}
|
||||
}
|
||||
|
||||
// A monolith contains all the dendrite components.
|
||||
// Some of the setup functions depend on previous setup functions, so they must
|
||||
// be called in the same order as they are defined in the file.
|
||||
type monolith struct {
|
||||
cfg *config.Dendrite
|
||||
api *mux.Router
|
||||
|
||||
roomServerDB *roomserver_storage.Database
|
||||
accountDB *accounts.Database
|
||||
deviceDB *devices.Database
|
||||
keyDB *keydb.Database
|
||||
mediaAPIDB *mediaapi_storage.Database
|
||||
syncAPIDB *syncapi_storage.SyncServerDatabase
|
||||
federationSenderDB *federationsender_storage.Database
|
||||
|
||||
federation *gomatrixserverlib.FederationClient
|
||||
keyRing gomatrixserverlib.KeyRing
|
||||
|
||||
inputAPI *roomserver_input.RoomserverInputAPI
|
||||
queryAPI *roomserver_query.RoomserverQueryAPI
|
||||
aliasAPI *roomserver_alias.RoomserverAliasAPI
|
||||
|
||||
roomServerProducer *producers.RoomserverProducer
|
||||
userUpdateProducer *producers.UserUpdateProducer
|
||||
syncProducer *producers.SyncAPIProducer
|
||||
|
||||
syncAPINotifier *syncapi_sync.Notifier
|
||||
}
|
||||
|
||||
func newMonolith(cfg *config.Dendrite) *monolith {
|
||||
return &monolith{cfg: cfg, api: mux.NewRouter()}
|
||||
}
|
||||
|
||||
func (m *monolith) setupDatabases() {
|
||||
var err error
|
||||
m.roomServerDB, err = roomserver_storage.Open(string(m.cfg.Database.RoomServer))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
m.accountDB, err = accounts.NewDatabase(string(m.cfg.Database.Account), m.cfg.Matrix.ServerName)
|
||||
if err != nil {
|
||||
log.Panicf("Failed to setup account database(%q): %s", m.cfg.Database.Account, err.Error())
|
||||
}
|
||||
m.deviceDB, err = devices.NewDatabase(string(m.cfg.Database.Device), m.cfg.Matrix.ServerName)
|
||||
if err != nil {
|
||||
log.Panicf("Failed to setup device database(%q): %s", m.cfg.Database.Device, err.Error())
|
||||
}
|
||||
m.keyDB, err = keydb.NewDatabase(string(m.cfg.Database.ServerKey))
|
||||
if err != nil {
|
||||
log.Panicf("Failed to setup key database(%q): %s", m.cfg.Database.ServerKey, err.Error())
|
||||
}
|
||||
m.mediaAPIDB, err = mediaapi_storage.Open(string(m.cfg.Database.MediaAPI))
|
||||
if err != nil {
|
||||
log.Panicf("Failed to setup sync api database(%q): %s", m.cfg.Database.MediaAPI, err.Error())
|
||||
}
|
||||
m.syncAPIDB, err = syncapi_storage.NewSyncServerDatabase(string(m.cfg.Database.SyncAPI))
|
||||
if err != nil {
|
||||
log.Panicf("Failed to setup sync api database(%q): %s", m.cfg.Database.SyncAPI, err.Error())
|
||||
}
|
||||
m.federationSenderDB, err = federationsender_storage.NewDatabase(string(m.cfg.Database.FederationSender))
|
||||
if err != nil {
|
||||
log.Panicf("startup: failed to create federation sender database with data source %s : %s", m.cfg.Database.FederationSender, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *monolith) setupFederation() {
|
||||
m.federation = gomatrixserverlib.NewFederationClient(
|
||||
m.cfg.Matrix.ServerName, m.cfg.Matrix.KeyID, m.cfg.Matrix.PrivateKey,
|
||||
)
|
||||
|
||||
m.keyRing = gomatrixserverlib.KeyRing{
|
||||
KeyFetchers: []gomatrixserverlib.KeyFetcher{
|
||||
// TODO: Use perspective key fetchers for production.
|
||||
&gomatrixserverlib.DirectKeyFetcher{Client: m.federation.Client},
|
||||
},
|
||||
KeyDatabase: m.keyDB,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *monolith) setupRoomServer() {
|
||||
kafkaProducer, err := sarama.NewSyncProducer(m.cfg.Kafka.Addresses, nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
m.inputAPI = &roomserver_input.RoomserverInputAPI{
|
||||
DB: m.roomServerDB,
|
||||
Producer: kafkaProducer,
|
||||
OutputRoomEventTopic: string(m.cfg.Kafka.Topics.OutputRoomEvent),
|
||||
}
|
||||
|
||||
m.queryAPI = &roomserver_query.RoomserverQueryAPI{
|
||||
DB: m.roomServerDB,
|
||||
}
|
||||
|
||||
m.aliasAPI = &roomserver_alias.RoomserverAliasAPI{
|
||||
DB: m.roomServerDB,
|
||||
Cfg: m.cfg,
|
||||
InputAPI: m.inputAPI,
|
||||
QueryAPI: m.queryAPI,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *monolith) setupProducers() {
|
||||
var err error
|
||||
m.roomServerProducer = producers.NewRoomserverProducer(m.inputAPI)
|
||||
m.userUpdateProducer, err = producers.NewUserUpdateProducer(
|
||||
m.cfg.Kafka.Addresses, string(m.cfg.Kafka.Topics.UserUpdates),
|
||||
)
|
||||
if err != nil {
|
||||
log.Panicf("Failed to setup kafka producers(%q): %s", m.cfg.Kafka.Addresses, err)
|
||||
}
|
||||
m.syncProducer, err = producers.NewSyncAPIProducer(
|
||||
m.cfg.Kafka.Addresses, string(m.cfg.Kafka.Topics.OutputClientData),
|
||||
)
|
||||
if err != nil {
|
||||
log.Panicf("Failed to setup kafka producers(%q): %s", m.cfg.Kafka.Addresses, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *monolith) setupNotifiers() {
|
||||
pos, err := m.syncAPIDB.SyncStreamPosition()
|
||||
if err != nil {
|
||||
log.Panicf("startup: failed to get latest sync stream position : %s", err)
|
||||
}
|
||||
|
||||
m.syncAPINotifier = syncapi_sync.NewNotifier(syncapi_types.StreamPosition(pos))
|
||||
if err = m.syncAPINotifier.Load(m.syncAPIDB); err != nil {
|
||||
log.Panicf("startup: failed to set up notifier: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *monolith) setupConsumers() {
|
||||
clientAPIConsumer, err := clientapi_consumers.NewOutputRoomEvent(m.cfg, m.accountDB)
|
||||
if err != nil {
|
||||
log.Panicf("startup: failed to create room server consumer: %s", err)
|
||||
}
|
||||
if err = clientAPIConsumer.Start(); err != nil {
|
||||
log.Panicf("startup: failed to start room server consumer")
|
||||
}
|
||||
|
||||
syncAPIRoomConsumer, err := syncapi_consumers.NewOutputRoomEvent(
|
||||
m.cfg, m.syncAPINotifier, m.syncAPIDB,
|
||||
)
|
||||
if err != nil {
|
||||
log.Panicf("startup: failed to create room server consumer: %s", err)
|
||||
}
|
||||
if err = syncAPIRoomConsumer.Start(); err != nil {
|
||||
log.Panicf("startup: failed to start room server consumer: %s", err)
|
||||
}
|
||||
|
||||
syncAPIClientConsumer, err := syncapi_consumers.NewOutputClientData(
|
||||
m.cfg, m.syncAPINotifier, m.syncAPIDB,
|
||||
)
|
||||
if err != nil {
|
||||
log.Panicf("startup: failed to create client API server consumer: %s", err)
|
||||
}
|
||||
if err = syncAPIClientConsumer.Start(); err != nil {
|
||||
log.Panicf("startup: failed to start client API server consumer: %s", err)
|
||||
}
|
||||
|
||||
federationSenderQueues := queue.NewOutgoingQueues(m.cfg.Matrix.ServerName, m.federation)
|
||||
|
||||
federationSenderRoomConsumer, err := federationsender_consumers.NewOutputRoomEvent(
|
||||
m.cfg, federationSenderQueues, m.federationSenderDB,
|
||||
)
|
||||
if err != nil {
|
||||
log.WithError(err).Panicf("startup: failed to create room server consumer")
|
||||
}
|
||||
if err = federationSenderRoomConsumer.Start(); err != nil {
|
||||
log.WithError(err).Panicf("startup: failed to start room server consumer")
|
||||
}
|
||||
}
|
||||
|
||||
func (m *monolith) setupAPIs() {
|
||||
clientapi_routing.Setup(
|
||||
m.api, http.DefaultClient, *m.cfg, m.roomServerProducer,
|
||||
m.queryAPI, m.aliasAPI, m.accountDB, m.deviceDB, m.federation, m.keyRing,
|
||||
m.userUpdateProducer, m.syncProducer,
|
||||
)
|
||||
|
||||
mediaapi_routing.Setup(
|
||||
m.api, http.DefaultClient, m.cfg, m.mediaAPIDB,
|
||||
)
|
||||
|
||||
syncapi_routing.Setup(m.api, syncapi_sync.NewRequestPool(
|
||||
m.syncAPIDB, m.syncAPINotifier, m.accountDB,
|
||||
), m.deviceDB)
|
||||
|
||||
federationapi_routing.Setup(
|
||||
m.api, *m.cfg, m.queryAPI, m.roomServerProducer, m.keyRing, m.federation,
|
||||
)
|
||||
}
|
||||
|
|
@ -67,15 +67,15 @@ func main() {
|
|||
|
||||
inputAPI.SetupHTTP(http.DefaultServeMux)
|
||||
|
||||
queryAPI := query.RoomserverQueryAPI{db}
|
||||
queryAPI := query.RoomserverQueryAPI{DB: db}
|
||||
|
||||
queryAPI.SetupHTTP(http.DefaultServeMux)
|
||||
|
||||
aliasAPI := alias.RoomserverAliasAPI{
|
||||
DB: db,
|
||||
Cfg: cfg,
|
||||
InputAPI: inputAPI,
|
||||
QueryAPI: queryAPI,
|
||||
InputAPI: &inputAPI,
|
||||
QueryAPI: &queryAPI,
|
||||
}
|
||||
|
||||
aliasAPI.SetupHTTP(http.DefaultServeMux)
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import (
|
|||
"net/http"
|
||||
"os"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
|
|
@ -89,6 +90,10 @@ func main() {
|
|||
}
|
||||
|
||||
log.Info("Starting sync server on ", cfg.Listen.SyncAPI)
|
||||
routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, sync.NewRequestPool(db, n, adb), deviceDB)
|
||||
|
||||
api := mux.NewRouter()
|
||||
routing.Setup(api, sync.NewRequestPool(db, n, adb), deviceDB)
|
||||
common.SetupHTTPAPI(http.DefaultServeMux, api)
|
||||
|
||||
log.Fatal(http.ListenAndServe(string(cfg.Listen.SyncAPI), nil))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ package common
|
|||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||
"github.com/matrix-org/util"
|
||||
|
|
@ -26,3 +27,10 @@ func MakeAPI(metricsName string, f func(*http.Request) util.JSONResponse) http.H
|
|||
h := util.NewJSONRequestHandler(f)
|
||||
return prometheus.InstrumentHandler(metricsName, util.MakeJSONAPI(h))
|
||||
}
|
||||
|
||||
// SetupHTTPAPI registers an HTTP API mux under /api and sets up a metrics
|
||||
// listener.
|
||||
func SetupHTTPAPI(servMux *http.ServeMux, apiMux *mux.Router) {
|
||||
servMux.Handle("/metrics", prometheus.Handler())
|
||||
servMux.Handle("/api/", http.StripPrefix("/api", apiMux))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,13 +17,14 @@ package keydb
|
|||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
const serverKeysSchema = `
|
||||
-- A cache of server keys downloaded from remote servers.
|
||||
CREATE TABLE IF NOT EXISTS server_keys (
|
||||
CREATE TABLE IF NOT EXISTS keydb_server_keys (
|
||||
-- The name of the matrix server the key is for.
|
||||
server_name TEXT NOT NULL,
|
||||
-- The ID of the server key.
|
||||
|
|
@ -35,21 +36,21 @@ CREATE TABLE IF NOT EXISTS server_keys (
|
|||
valid_until_ts BIGINT NOT NULL,
|
||||
-- The raw JSON for the server key.
|
||||
server_key_json TEXT NOT NULL,
|
||||
CONSTRAINT server_keys_unique UNIQUE (server_name, server_key_id)
|
||||
CONSTRAINT keydb_server_keys_unique UNIQUE (server_name, server_key_id)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS server_name_and_key_id ON server_keys (server_name_and_key_id);
|
||||
CREATE INDEX IF NOT EXISTS keydb_server_name_and_key_id ON keydb_server_keys (server_name_and_key_id);
|
||||
`
|
||||
|
||||
const bulkSelectServerKeysSQL = "" +
|
||||
"SELECT server_name, server_key_id, server_key_json FROM server_keys" +
|
||||
"SELECT server_name, server_key_id, server_key_json FROM keydb_server_keys" +
|
||||
" WHERE server_name_and_key_id = ANY($1)"
|
||||
|
||||
const upsertServerKeysSQL = "" +
|
||||
"INSERT INTO server_keys (server_name, server_key_id," +
|
||||
"INSERT INTO keydb_server_keys (server_name, server_key_id," +
|
||||
" server_name_and_key_id, valid_until_ts, server_key_json)" +
|
||||
" VALUES ($1, $2, $3, $4, $5)" +
|
||||
" ON CONFLICT ON CONSTRAINT server_keys_unique" +
|
||||
" ON CONFLICT ON CONSTRAINT keydb_server_keys_unique" +
|
||||
" DO UPDATE SET valid_until_ts = $4, server_key_json = $5"
|
||||
|
||||
type serverKeyStatements struct {
|
||||
|
|
|
|||
|
|
@ -15,26 +15,27 @@
|
|||
package common
|
||||
|
||||
import "database/sql"
|
||||
import "strings"
|
||||
|
||||
const partitionOffsetsSchema = `
|
||||
-- The offsets that the server has processed up to.
|
||||
CREATE TABLE IF NOT EXISTS partition_offsets (
|
||||
CREATE TABLE IF NOT EXISTS ${prefix}_partition_offsets (
|
||||
-- The name of the topic.
|
||||
topic TEXT NOT NULL,
|
||||
-- The 32-bit partition ID
|
||||
partition INTEGER NOT NULL,
|
||||
-- The 64-bit offset.
|
||||
partition_offset BIGINT NOT NULL,
|
||||
CONSTRAINT topic_partition_unique UNIQUE (topic, partition)
|
||||
CONSTRAINT ${prefix}_topic_partition_unique UNIQUE (topic, partition)
|
||||
);
|
||||
`
|
||||
|
||||
const selectPartitionOffsetsSQL = "" +
|
||||
"SELECT partition, partition_offset FROM partition_offsets WHERE topic = $1"
|
||||
"SELECT partition, partition_offset FROM ${prefix}_partition_offsets WHERE topic = $1"
|
||||
|
||||
const upsertPartitionOffsetsSQL = "" +
|
||||
"INSERT INTO partition_offsets (topic, partition, partition_offset) VALUES ($1, $2, $3)" +
|
||||
" ON CONFLICT ON CONSTRAINT topic_partition_unique" +
|
||||
"INSERT INTO ${prefix}_partition_offsets (topic, partition, partition_offset) VALUES ($1, $2, $3)" +
|
||||
" ON CONFLICT ON CONSTRAINT ${prefix}_topic_partition_unique" +
|
||||
" DO UPDATE SET partition_offset = $3"
|
||||
|
||||
// PartitionOffsetStatements represents a set of statements that can be run on a partition_offsets table.
|
||||
|
|
@ -44,15 +45,21 @@ type PartitionOffsetStatements struct {
|
|||
}
|
||||
|
||||
// Prepare converts the raw SQL statements into prepared statements.
|
||||
func (s *PartitionOffsetStatements) Prepare(db *sql.DB) (err error) {
|
||||
_, err = db.Exec(partitionOffsetsSchema)
|
||||
// Takes a prefix to prepend to the table name used to store the partition offsets.
|
||||
// This allows multiple components to share the same database schema.
|
||||
func (s *PartitionOffsetStatements) Prepare(db *sql.DB, prefix string) (err error) {
|
||||
_, err = db.Exec(strings.Replace(partitionOffsetsSchema, "${prefix}", prefix, -1))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectPartitionOffsetsStmt, err = db.Prepare(selectPartitionOffsetsSQL); err != nil {
|
||||
if s.selectPartitionOffsetsStmt, err = db.Prepare(
|
||||
strings.Replace(selectPartitionOffsetsSQL, "${prefix}", prefix, -1),
|
||||
); err != nil {
|
||||
return
|
||||
}
|
||||
if s.upsertPartitionOffsetStmt, err = db.Prepare(upsertPartitionOffsetsSQL); err != nil {
|
||||
if s.upsertPartitionOffsetStmt, err = db.Prepare(
|
||||
strings.Replace(upsertPartitionOffsetsSQL, "${prefix}", prefix, -1),
|
||||
); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
|
|
|
|||
|
|
@ -15,6 +15,9 @@
|
|||
package routing
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||
"github.com/matrix-org/dendrite/common/config"
|
||||
|
|
@ -24,8 +27,6 @@ import (
|
|||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
@ -35,14 +36,13 @@ const (
|
|||
|
||||
// Setup registers HTTP handlers with the given ServeMux.
|
||||
func Setup(
|
||||
servMux *http.ServeMux,
|
||||
apiMux *mux.Router,
|
||||
cfg config.Dendrite,
|
||||
query api.RoomserverQueryAPI,
|
||||
producer *producers.RoomserverProducer,
|
||||
keys gomatrixserverlib.KeyRing,
|
||||
federation *gomatrixserverlib.FederationClient,
|
||||
) {
|
||||
apiMux := mux.NewRouter()
|
||||
v2keysmux := apiMux.PathPrefix(pathPrefixV2Keys).Subrouter()
|
||||
v1fedmux := apiMux.PathPrefix(pathPrefixV1Federation).Subrouter()
|
||||
|
||||
|
|
@ -67,9 +67,6 @@ func Setup(
|
|||
)
|
||||
},
|
||||
))
|
||||
|
||||
servMux.Handle("/metrics", prometheus.Handler())
|
||||
servMux.Handle("/api/", http.StripPrefix("/api", apiMux))
|
||||
}
|
||||
|
||||
func makeAPI(metricsName string, f func(*http.Request) util.JSONResponse) http.Handler {
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ const joinedHostsSchema = `
|
|||
-- The joined_hosts table stores a list of m.room.member event ids in the
|
||||
-- current state for each room where the membership is "join".
|
||||
-- There will be an entry for every user that is joined to the room.
|
||||
CREATE TABLE IF NOT EXISTS joined_hosts (
|
||||
CREATE TABLE IF NOT EXISTS federationsender_joined_hosts (
|
||||
-- The string ID of the room.
|
||||
room_id TEXT NOT NULL,
|
||||
-- The event ID of the m.room.member join event.
|
||||
|
|
@ -35,22 +35,22 @@ CREATE TABLE IF NOT EXISTS joined_hosts (
|
|||
server_name TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS joined_hosts_event_id_idx
|
||||
ON joined_hosts (event_id);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS federatonsender_joined_hosts_event_id_idx
|
||||
ON federationsender_joined_hosts (event_id);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS joined_hosts_room_id_idx
|
||||
ON joined_hosts (room_id)
|
||||
CREATE INDEX IF NOT EXISTS federatonsender_joined_hosts_room_id_idx
|
||||
ON federationsender_joined_hosts (room_id)
|
||||
`
|
||||
|
||||
const insertJoinedHostsSQL = "" +
|
||||
"INSERT INTO joined_hosts (room_id, event_id, server_name)" +
|
||||
"INSERT INTO federationsender_joined_hosts (room_id, event_id, server_name)" +
|
||||
" VALUES ($1, $2, $3)"
|
||||
|
||||
const deleteJoinedHostsSQL = "" +
|
||||
"DELETE FROM joined_hosts WHERE event_id = ANY($1)"
|
||||
"DELETE FROM federationsender_joined_hosts WHERE event_id = ANY($1)"
|
||||
|
||||
const selectJoinedHostsSQL = "" +
|
||||
"SELECT event_id, server_name FROM joined_hosts" +
|
||||
"SELECT event_id, server_name FROM federationsender_joined_hosts" +
|
||||
" WHERE room_id = $1"
|
||||
|
||||
type joinedHostsStatements struct {
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ import (
|
|||
)
|
||||
|
||||
const roomSchema = `
|
||||
CREATE TABLE IF NOT EXISTS rooms (
|
||||
CREATE TABLE IF NOT EXISTS federationsender_rooms (
|
||||
-- The string ID of the room
|
||||
room_id TEXT PRIMARY KEY,
|
||||
-- The most recent event state by the room server.
|
||||
|
|
@ -29,14 +29,14 @@ CREATE TABLE IF NOT EXISTS rooms (
|
|||
);`
|
||||
|
||||
const insertRoomSQL = "" +
|
||||
"INSERT INTO rooms (room_id, last_event_id) VALUES ($1, '')" +
|
||||
"INSERT INTO federationsender_rooms (room_id, last_event_id) VALUES ($1, '')" +
|
||||
" ON CONFLICT DO NOTHING"
|
||||
|
||||
const selectRoomForUpdateSQL = "" +
|
||||
"SELECT last_event_id FROM rooms WHERE room_id = $1 FOR UPDATE"
|
||||
"SELECT last_event_id FROM federationsender_rooms WHERE room_id = $1 FOR UPDATE"
|
||||
|
||||
const updateRoomSQL = "" +
|
||||
"UPDATE rooms SET last_event_id = $2 WHERE room_id = $1"
|
||||
"UPDATE federationsender_rooms SET last_event_id = $2 WHERE room_id = $1"
|
||||
|
||||
type roomStatements struct {
|
||||
insertRoomStmt *sql.Stmt
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ func (d *Database) prepare() error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err = d.PartitionOffsetStatements.Prepare(d.db); err != nil {
|
||||
if err = d.PartitionOffsetStatements.Prepare(d.db, "federationsender"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -30,10 +30,8 @@ import (
|
|||
|
||||
const pathPrefixR0 = "/_matrix/media/v1"
|
||||
|
||||
// Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client
|
||||
// to clients which need to make outbound HTTP requests.
|
||||
func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg *config.Dendrite, db *storage.Database) {
|
||||
apiMux := mux.NewRouter()
|
||||
// Setup registers the media API HTTP handlers
|
||||
func Setup(apiMux *mux.Router, httpClient *http.Client, cfg *config.Dendrite, db *storage.Database) {
|
||||
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
|
||||
|
||||
activeThumbnailGeneration := &types.ActiveThumbnailGeneration{
|
||||
|
|
@ -54,9 +52,6 @@ func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg *config.Dendrite
|
|||
r0mux.Handle("/thumbnail/{serverName}/{mediaId}",
|
||||
makeDownloadAPI("thumbnail", cfg, db, activeRemoteRequests, activeThumbnailGeneration),
|
||||
)
|
||||
|
||||
servMux.Handle("/metrics", prometheus.Handler())
|
||||
servMux.Handle("/api/", http.StripPrefix("/api", apiMux))
|
||||
}
|
||||
|
||||
func makeDownloadAPI(name string, cfg *config.Dendrite, db *storage.Database, activeRemoteRequests *types.ActiveRemoteRequests, activeThumbnailGeneration *types.ActiveThumbnailGeneration) http.HandlerFunc {
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ import (
|
|||
const mediaSchema = `
|
||||
-- The media_repository table holds metadata for each media file stored and accessible to the local server,
|
||||
-- the actual file is stored separately.
|
||||
CREATE TABLE IF NOT EXISTS media_repository (
|
||||
CREATE TABLE IF NOT EXISTS mediaapi_media_repository (
|
||||
-- The id used to refer to the media.
|
||||
-- For uploads to this server this is a base64-encoded sha256 hash of the file data
|
||||
-- For media from remote servers, this can be any unique identifier string
|
||||
|
|
@ -45,16 +45,16 @@ CREATE TABLE IF NOT EXISTS media_repository (
|
|||
-- The user who uploaded the file. Should be a Matrix user ID.
|
||||
user_id TEXT NOT NULL
|
||||
);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS media_repository_index ON media_repository (media_id, media_origin);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS mediaapi_media_repository_index ON mediaapi_media_repository (media_id, media_origin);
|
||||
`
|
||||
|
||||
const insertMediaSQL = `
|
||||
INSERT INTO media_repository (media_id, media_origin, content_type, file_size_bytes, creation_ts, upload_name, base64hash, user_id)
|
||||
INSERT INTO mediaapi_media_repository (media_id, media_origin, content_type, file_size_bytes, creation_ts, upload_name, base64hash, user_id)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
`
|
||||
|
||||
const selectMediaSQL = `
|
||||
SELECT content_type, file_size_bytes, creation_ts, upload_name, base64hash, user_id FROM media_repository WHERE media_id = $1 AND media_origin = $2
|
||||
SELECT content_type, file_size_bytes, creation_ts, upload_name, base64hash, user_id FROM mediaapi_media_repository WHERE media_id = $1 AND media_origin = $2
|
||||
`
|
||||
|
||||
type mediaStatements struct {
|
||||
|
|
|
|||
|
|
@ -23,9 +23,9 @@ import (
|
|||
)
|
||||
|
||||
const thumbnailSchema = `
|
||||
-- The thumbnail table holds metadata for each thumbnail file stored and accessible to the local server,
|
||||
-- The mediaapi_thumbnail table holds metadata for each thumbnail file stored and accessible to the local server,
|
||||
-- the actual file is stored separately.
|
||||
CREATE TABLE IF NOT EXISTS thumbnail (
|
||||
CREATE TABLE IF NOT EXISTS mediaapi_thumbnail (
|
||||
-- The id used to refer to the media.
|
||||
-- For uploads to this server this is a base64-encoded sha256 hash of the file data
|
||||
-- For media from remote servers, this can be any unique identifier string
|
||||
|
|
@ -45,22 +45,22 @@ CREATE TABLE IF NOT EXISTS thumbnail (
|
|||
-- The resize method used to generate the thumbnail. Can be crop or scale.
|
||||
resize_method TEXT NOT NULL
|
||||
);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS thumbnail_index ON thumbnail (media_id, media_origin, width, height, resize_method);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS mediaapi_thumbnail_index ON mediaapi_thumbnail (media_id, media_origin, width, height, resize_method);
|
||||
`
|
||||
|
||||
const insertThumbnailSQL = `
|
||||
INSERT INTO thumbnail (media_id, media_origin, content_type, file_size_bytes, creation_ts, width, height, resize_method)
|
||||
INSERT INTO mediaapi_thumbnail (media_id, media_origin, content_type, file_size_bytes, creation_ts, width, height, resize_method)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
`
|
||||
|
||||
// Note: this selects one specific thumbnail
|
||||
const selectThumbnailSQL = `
|
||||
SELECT content_type, file_size_bytes, creation_ts FROM thumbnail WHERE media_id = $1 AND media_origin = $2 AND width = $3 AND height = $4 AND resize_method = $5
|
||||
SELECT content_type, file_size_bytes, creation_ts FROM mediaapi_thumbnail WHERE media_id = $1 AND media_origin = $2 AND width = $3 AND height = $4 AND resize_method = $5
|
||||
`
|
||||
|
||||
// Note: this selects all thumbnails for a media_origin and media_id
|
||||
const selectThumbnailsSQL = `
|
||||
SELECT content_type, file_size_bytes, creation_ts, width, height, resize_method FROM thumbnail WHERE media_id = $1 AND media_origin = $2
|
||||
SELECT content_type, file_size_bytes, creation_ts, width, height, resize_method FROM mediaapi_thumbnail WHERE media_id = $1 AND media_origin = $2
|
||||
`
|
||||
|
||||
type thumbnailStatements struct {
|
||||
|
|
|
|||
|
|
@ -23,8 +23,6 @@ import (
|
|||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/common/config"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/input"
|
||||
"github.com/matrix-org/dendrite/roomserver/query"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
)
|
||||
|
|
@ -49,8 +47,8 @@ type RoomserverAliasAPIDatabase interface {
|
|||
type RoomserverAliasAPI struct {
|
||||
DB RoomserverAliasAPIDatabase
|
||||
Cfg *config.Dendrite
|
||||
InputAPI input.RoomserverInputAPI
|
||||
QueryAPI query.RoomserverQueryAPI
|
||||
InputAPI api.RoomserverInputAPI
|
||||
QueryAPI api.RoomserverQueryAPI
|
||||
}
|
||||
|
||||
// SetRoomAlias implements api.RoomserverAliasAPI
|
||||
|
|
|
|||
|
|
@ -16,13 +16,14 @@ package storage
|
|||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
)
|
||||
|
||||
const eventJSONSchema = `
|
||||
-- Stores the JSON for each event. This kept separate from the main events
|
||||
-- table to keep the rows in the main events table small.
|
||||
CREATE TABLE IF NOT EXISTS event_json (
|
||||
CREATE TABLE IF NOT EXISTS roomserver_event_json (
|
||||
-- Local numeric ID for the event.
|
||||
event_nid BIGINT NOT NULL PRIMARY KEY,
|
||||
-- The JSON for the event.
|
||||
|
|
@ -37,14 +38,14 @@ CREATE TABLE IF NOT EXISTS event_json (
|
|||
`
|
||||
|
||||
const insertEventJSONSQL = "" +
|
||||
"INSERT INTO event_json (event_nid, event_json) VALUES ($1, $2)" +
|
||||
"INSERT INTO roomserver_event_json (event_nid, event_json) VALUES ($1, $2)" +
|
||||
" ON CONFLICT DO NOTHING"
|
||||
|
||||
// Bulk event JSON lookup by numeric event ID.
|
||||
// Sort by the numeric event ID.
|
||||
// This means that we can use binary search to lookup by numeric event ID.
|
||||
const bulkSelectEventJSONSQL = "" +
|
||||
"SELECT event_nid, event_json FROM event_json" +
|
||||
"SELECT event_nid, event_json FROM roomserver_event_json" +
|
||||
" WHERE event_nid = ANY($1)" +
|
||||
" ORDER BY event_nid ASC"
|
||||
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ package storage
|
|||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
)
|
||||
|
|
@ -31,29 +32,30 @@ const eventStateKeysSchema = `
|
|||
-- Other state keys are automatically assigned numeric IDs starting from 2**16.
|
||||
-- This leaves room to add more pre-assigned numeric IDs and clearly separates
|
||||
-- the automatically assigned IDs from the pre-assigned IDs.
|
||||
CREATE SEQUENCE IF NOT EXISTS event_state_key_nid_seq START 65536;
|
||||
CREATE TABLE IF NOT EXISTS event_state_keys (
|
||||
CREATE SEQUENCE IF NOT EXISTS roomserver_event_state_key_nid_seq START 65536;
|
||||
CREATE TABLE IF NOT EXISTS roomserver_event_state_keys (
|
||||
-- Local numeric ID for the state key.
|
||||
event_state_key_nid BIGINT PRIMARY KEY DEFAULT nextval('event_state_key_nid_seq'),
|
||||
event_state_key TEXT NOT NULL CONSTRAINT event_state_key_unique UNIQUE
|
||||
event_state_key_nid BIGINT PRIMARY KEY DEFAULT nextval('roomserver_event_state_key_nid_seq'),
|
||||
event_state_key TEXT NOT NULL CONSTRAINT roomserver_event_state_key_unique UNIQUE
|
||||
);
|
||||
INSERT INTO event_state_keys (event_state_key_nid, event_state_key) VALUES
|
||||
INSERT INTO roomserver_event_state_keys (event_state_key_nid, event_state_key) VALUES
|
||||
(1, '') ON CONFLICT DO NOTHING;
|
||||
`
|
||||
|
||||
// Same as insertEventTypeNIDSQL
|
||||
const insertEventStateKeyNIDSQL = "" +
|
||||
"INSERT INTO event_state_keys (event_state_key) VALUES ($1)" +
|
||||
" ON CONFLICT ON CONSTRAINT event_state_key_unique" +
|
||||
"INSERT INTO roomserver_event_state_keys (event_state_key) VALUES ($1)" +
|
||||
" ON CONFLICT ON CONSTRAINT roomserver_event_state_key_unique" +
|
||||
" DO NOTHING RETURNING (event_state_key_nid)"
|
||||
|
||||
const selectEventStateKeyNIDSQL = "" +
|
||||
"SELECT event_state_key_nid FROM event_state_keys WHERE event_state_key = $1"
|
||||
"SELECT event_state_key_nid FROM roomserver_event_state_keys" +
|
||||
" WHERE event_state_key = $1"
|
||||
|
||||
// Bulk lookup from string state key to numeric ID for that state key.
|
||||
// Takes an array of strings as the query parameter.
|
||||
const bulkSelectEventStateKeyNIDSQL = "" +
|
||||
"SELECT event_state_key, event_state_key_nid FROM event_state_keys" +
|
||||
"SELECT event_state_key, event_state_key_nid FROM roomserver_event_state_keys" +
|
||||
" WHERE event_state_key = ANY($1)"
|
||||
|
||||
type eventStateKeyStatements struct {
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ package storage
|
|||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
)
|
||||
|
|
@ -43,14 +44,14 @@ const eventTypesSchema = `
|
|||
-- Other event types are automatically assigned numeric IDs starting from 2**16.
|
||||
-- This leaves room to add more pre-assigned numeric IDs and clearly separates
|
||||
-- the automatically assigned IDs from the pre-assigned IDs.
|
||||
CREATE SEQUENCE IF NOT EXISTS event_type_nid_seq START 65536;
|
||||
CREATE TABLE IF NOT EXISTS event_types (
|
||||
CREATE SEQUENCE IF NOT EXISTS roomserver_event_type_nid_seq START 65536;
|
||||
CREATE TABLE IF NOT EXISTS roomserver_event_types (
|
||||
-- Local numeric ID for the event type.
|
||||
event_type_nid BIGINT PRIMARY KEY DEFAULT nextval('event_type_nid_seq'),
|
||||
event_type_nid BIGINT PRIMARY KEY DEFAULT nextval('roomserver_event_type_nid_seq'),
|
||||
-- The string event_type.
|
||||
event_type TEXT NOT NULL CONSTRAINT event_type_unique UNIQUE
|
||||
event_type TEXT NOT NULL CONSTRAINT roomserver_event_type_unique UNIQUE
|
||||
);
|
||||
INSERT INTO event_types (event_type_nid, event_type) VALUES
|
||||
INSERT INTO roomserver_event_types (event_type_nid, event_type) VALUES
|
||||
(1, 'm.room.create'),
|
||||
(2, 'm.room.power_levels'),
|
||||
(3, 'm.room.join_rules'),
|
||||
|
|
@ -74,17 +75,17 @@ INSERT INTO event_types (event_type_nid, event_type) VALUES
|
|||
// row even though the data doesn't change resulting in unncesssary modifications
|
||||
// to the indexes.
|
||||
const insertEventTypeNIDSQL = "" +
|
||||
"INSERT INTO event_types (event_type) VALUES ($1)" +
|
||||
" ON CONFLICT ON CONSTRAINT event_type_unique" +
|
||||
"INSERT INTO roomserver_event_types (event_type) VALUES ($1)" +
|
||||
" ON CONFLICT ON CONSTRAINT roomserver_event_type_unique" +
|
||||
" DO NOTHING RETURNING (event_type_nid)"
|
||||
|
||||
const selectEventTypeNIDSQL = "" +
|
||||
"SELECT event_type_nid FROM event_types WHERE event_type = $1"
|
||||
"SELECT event_type_nid FROM roomserver_event_types WHERE event_type = $1"
|
||||
|
||||
// Bulk lookup from string event type to numeric ID for that event type.
|
||||
// Takes an array of strings as the query parameter.
|
||||
const bulkSelectEventTypeNIDSQL = "" +
|
||||
"SELECT event_type, event_type_nid FROM event_types" +
|
||||
"SELECT event_type, event_type_nid FROM roomserver_event_types" +
|
||||
" WHERE event_type = ANY($1)"
|
||||
|
||||
type eventTypeStatements struct {
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ package storage
|
|||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
|
@ -25,10 +26,10 @@ import (
|
|||
const eventsSchema = `
|
||||
-- The events table holds metadata for each event, the actual JSON is stored
|
||||
-- separately to keep the size of the rows small.
|
||||
CREATE SEQUENCE IF NOT EXISTS event_nid_seq;
|
||||
CREATE TABLE IF NOT EXISTS events (
|
||||
CREATE SEQUENCE IF NOT EXISTS roomserver_event_nid_seq;
|
||||
CREATE TABLE IF NOT EXISTS roomserver_events (
|
||||
-- Local numeric ID for the event.
|
||||
event_nid BIGINT PRIMARY KEY DEFAULT nextval('event_nid_seq'),
|
||||
event_nid BIGINT PRIMARY KEY DEFAULT nextval('roomserver_event_nid_seq'),
|
||||
-- Local numeric ID for the room the event is in.
|
||||
-- This is never 0.
|
||||
room_nid BIGINT NOT NULL,
|
||||
|
|
@ -53,7 +54,7 @@ CREATE TABLE IF NOT EXISTS events (
|
|||
-- Used to lookup the numeric ID when processing requests.
|
||||
-- Needed for state resolution.
|
||||
-- An event may only appear in this table once.
|
||||
event_id TEXT NOT NULL CONSTRAINT event_id_unique UNIQUE,
|
||||
event_id TEXT NOT NULL CONSTRAINT roomserver_event_id_unique UNIQUE,
|
||||
-- The sha256 reference hash for the event.
|
||||
-- Needed for setting reference hashes when sending new events.
|
||||
reference_sha256 BYTEA NOT NULL,
|
||||
|
|
@ -63,54 +64,54 @@ CREATE TABLE IF NOT EXISTS events (
|
|||
`
|
||||
|
||||
const insertEventSQL = "" +
|
||||
"INSERT INTO events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth)" +
|
||||
"INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth)" +
|
||||
" VALUES ($1, $2, $3, $4, $5, $6, $7)" +
|
||||
" ON CONFLICT ON CONSTRAINT event_id_unique" +
|
||||
" ON CONFLICT ON CONSTRAINT roomserver_event_id_unique" +
|
||||
" DO NOTHING" +
|
||||
" RETURNING event_nid, state_snapshot_nid"
|
||||
|
||||
const selectEventSQL = "" +
|
||||
"SELECT event_nid, state_snapshot_nid FROM events WHERE event_id = $1"
|
||||
"SELECT event_nid, state_snapshot_nid FROM roomserver_events WHERE event_id = $1"
|
||||
|
||||
// Bulk lookup of events by string ID.
|
||||
// Sort by the numeric IDs for event type and state key.
|
||||
// This means we can use binary search to lookup entries by type and state key.
|
||||
const bulkSelectStateEventByIDSQL = "" +
|
||||
"SELECT event_type_nid, event_state_key_nid, event_nid FROM events" +
|
||||
"SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" +
|
||||
" WHERE event_id = ANY($1)" +
|
||||
" ORDER BY event_type_nid, event_state_key_nid ASC"
|
||||
|
||||
const bulkSelectStateAtEventByIDSQL = "" +
|
||||
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid FROM events" +
|
||||
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid FROM roomserver_events" +
|
||||
" WHERE event_id = ANY($1)"
|
||||
|
||||
const updateEventStateSQL = "" +
|
||||
"UPDATE events SET state_snapshot_nid = $2 WHERE event_nid = $1"
|
||||
"UPDATE roomserver_events SET state_snapshot_nid = $2 WHERE event_nid = $1"
|
||||
|
||||
const selectEventSentToOutputSQL = "" +
|
||||
"SELECT sent_to_output FROM events WHERE event_nid = $1"
|
||||
"SELECT sent_to_output FROM roomserver_events WHERE event_nid = $1"
|
||||
|
||||
const updateEventSentToOutputSQL = "" +
|
||||
"UPDATE events SET sent_to_output = TRUE WHERE event_nid = $1"
|
||||
"UPDATE roomserver_events SET sent_to_output = TRUE WHERE event_nid = $1"
|
||||
|
||||
const selectEventIDSQL = "" +
|
||||
"SELECT event_id FROM events WHERE event_nid = $1"
|
||||
"SELECT event_id FROM roomserver_events WHERE event_nid = $1"
|
||||
|
||||
const bulkSelectStateAtEventAndReferenceSQL = "" +
|
||||
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, event_id, reference_sha256" +
|
||||
" FROM events WHERE event_nid = ANY($1)"
|
||||
" FROM roomserver_events WHERE event_nid = ANY($1)"
|
||||
|
||||
const bulkSelectEventReferenceSQL = "" +
|
||||
"SELECT event_id, reference_sha256 FROM events WHERE event_nid = ANY($1)"
|
||||
"SELECT event_id, reference_sha256 FROM roomserver_events WHERE event_nid = ANY($1)"
|
||||
|
||||
const bulkSelectEventIDSQL = "" +
|
||||
"SELECT event_nid, event_id FROM events WHERE event_nid = ANY($1)"
|
||||
"SELECT event_nid, event_id FROM roomserver_events WHERE event_nid = ANY($1)"
|
||||
|
||||
const bulkSelectEventNIDSQL = "" +
|
||||
"SELECT event_id, event_nid FROM events WHERE event_id = ANY($1)"
|
||||
"SELECT event_id, event_nid FROM roomserver_events WHERE event_id = ANY($1)"
|
||||
|
||||
const selectMaxEventDepthSQL = "" +
|
||||
"SELECT COALESCE(MAX(depth) + 1, 0) FROM events WHERE event_nid = ANY($1)"
|
||||
"SELECT COALESCE(MAX(depth) + 1, 0) FROM roomserver_events WHERE event_nid = ANY($1)"
|
||||
|
||||
type eventStatements struct {
|
||||
insertEventStmt *sql.Stmt
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ package storage
|
|||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
)
|
||||
|
||||
|
|
@ -24,14 +25,14 @@ const previousEventSchema = `
|
|||
-- stored in the events table.
|
||||
-- This is used to tell if a new event is already referenced by an event in
|
||||
-- the database.
|
||||
CREATE TABLE IF NOT EXISTS previous_events (
|
||||
CREATE TABLE IF NOT EXISTS roomserver_previous_events (
|
||||
-- The string event ID taken from the prev_events key of an event.
|
||||
previous_event_id TEXT NOT NULL,
|
||||
-- The SHA256 reference hash taken from the prev_events key of an event.
|
||||
previous_reference_sha256 BYTEA NOT NULL,
|
||||
-- A list of numeric event IDs of events that reference this prev_event.
|
||||
event_nids BIGINT[] NOT NULL,
|
||||
CONSTRAINT previous_event_id_unique UNIQUE (previous_event_id, previous_reference_sha256)
|
||||
CONSTRAINT roomserver_previous_event_id_unique UNIQUE (previous_event_id, previous_reference_sha256)
|
||||
);
|
||||
`
|
||||
|
||||
|
|
@ -41,17 +42,17 @@ CREATE TABLE IF NOT EXISTS previous_events (
|
|||
// This should only be modified while holding a "FOR UPDATE" lock on the row in the rooms table for this room.
|
||||
// The lock is necessary to avoid data races when checking whether an event is already referenced by another event.
|
||||
const insertPreviousEventSQL = "" +
|
||||
"INSERT INTO previous_events" +
|
||||
"INSERT INTO roomserver_previous_events" +
|
||||
" (previous_event_id, previous_reference_sha256, event_nids)" +
|
||||
" VALUES ($1, $2, array_append('{}'::bigint[], $3))" +
|
||||
" ON CONFLICT ON CONSTRAINT previous_event_id_unique" +
|
||||
" DO UPDATE SET event_nids = array_append(previous_events.event_nids, $3)" +
|
||||
" WHERE $3 != ALL(previous_events.event_nids)"
|
||||
" ON CONFLICT ON CONSTRAINT roomserver_previous_event_id_unique" +
|
||||
" DO UPDATE SET event_nids = array_append(roomserver_previous_events.event_nids, $3)" +
|
||||
" WHERE $3 != ALL(roomserver_previous_events.event_nids)"
|
||||
|
||||
// Check if the event is referenced by another event in the table.
|
||||
// This should only be done while holding a "FOR UPDATE" lock on the row in the rooms table for this room.
|
||||
const selectPreviousEventExistsSQL = "" +
|
||||
"SELECT 1 FROM previous_events" +
|
||||
"SELECT 1 FROM roomserver_previous_events" +
|
||||
" WHERE previous_event_id = $1 AND previous_reference_sha256 = $2"
|
||||
|
||||
type previousEventStatements struct {
|
||||
|
|
|
|||
|
|
@ -20,27 +20,27 @@ import (
|
|||
|
||||
const roomAliasesSchema = `
|
||||
-- Stores room aliases and room IDs they refer to
|
||||
CREATE TABLE IF NOT EXISTS room_aliases (
|
||||
CREATE TABLE IF NOT EXISTS roomserver_room_aliases (
|
||||
-- Alias of the room
|
||||
alias TEXT NOT NULL PRIMARY KEY,
|
||||
-- Room ID the alias refers to
|
||||
room_id TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS room_id_idx ON room_aliases(room_id);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS roomserver_room_id_idx ON roomserver_room_aliases(room_id);
|
||||
`
|
||||
|
||||
const insertRoomAliasSQL = "" +
|
||||
"INSERT INTO room_aliases (alias, room_id) VALUES ($1, $2)"
|
||||
"INSERT INTO roomserver_room_aliases (alias, room_id) VALUES ($1, $2)"
|
||||
|
||||
const selectRoomIDFromAliasSQL = "" +
|
||||
"SELECT room_id FROM room_aliases WHERE alias = $1"
|
||||
"SELECT room_id FROM roomserver_room_aliases WHERE alias = $1"
|
||||
|
||||
const selectAliasesFromRoomIDSQL = "" +
|
||||
"SELECT alias FROM room_aliases WHERE room_id = $1"
|
||||
"SELECT alias FROM roomserver_room_aliases WHERE room_id = $1"
|
||||
|
||||
const deleteRoomAliasSQL = "" +
|
||||
"DELETE FROM room_aliases WHERE alias = $1"
|
||||
"DELETE FROM roomserver_room_aliases WHERE alias = $1"
|
||||
|
||||
type roomAliasesStatements struct {
|
||||
insertRoomAliasStmt *sql.Stmt
|
||||
|
|
|
|||
|
|
@ -16,17 +16,18 @@ package storage
|
|||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
)
|
||||
|
||||
const roomsSchema = `
|
||||
CREATE SEQUENCE IF NOT EXISTS room_nid_seq;
|
||||
CREATE TABLE IF NOT EXISTS rooms (
|
||||
CREATE SEQUENCE IF NOT EXISTS roomserver_room_nid_seq;
|
||||
CREATE TABLE IF NOT EXISTS roomserver_rooms (
|
||||
-- Local numeric ID for the room.
|
||||
room_nid BIGINT PRIMARY KEY DEFAULT nextval('room_nid_seq'),
|
||||
room_nid BIGINT PRIMARY KEY DEFAULT nextval('roomserver_room_nid_seq'),
|
||||
-- Textual ID for the room.
|
||||
room_id TEXT NOT NULL CONSTRAINT room_id_unique UNIQUE,
|
||||
room_id TEXT NOT NULL CONSTRAINT roomserver_room_id_unique UNIQUE,
|
||||
-- The most recent events in the room that aren't referenced by another event.
|
||||
-- This list may empty if the server hasn't joined the room yet.
|
||||
-- (The server will be in that state while it stores the events for the initial state of the room)
|
||||
|
|
@ -41,21 +42,21 @@ CREATE TABLE IF NOT EXISTS rooms (
|
|||
|
||||
// Same as insertEventTypeNIDSQL
|
||||
const insertRoomNIDSQL = "" +
|
||||
"INSERT INTO rooms (room_id) VALUES ($1)" +
|
||||
" ON CONFLICT ON CONSTRAINT room_id_unique" +
|
||||
"INSERT INTO roomserver_rooms (room_id) VALUES ($1)" +
|
||||
" ON CONFLICT ON CONSTRAINT roomserver_room_id_unique" +
|
||||
" DO NOTHING RETURNING (room_nid)"
|
||||
|
||||
const selectRoomNIDSQL = "" +
|
||||
"SELECT room_nid FROM rooms WHERE room_id = $1"
|
||||
"SELECT room_nid FROM roomserver_rooms WHERE room_id = $1"
|
||||
|
||||
const selectLatestEventNIDsSQL = "" +
|
||||
"SELECT latest_event_nids, state_snapshot_nid FROM rooms WHERE room_nid = $1"
|
||||
"SELECT latest_event_nids, state_snapshot_nid FROM roomserver_rooms WHERE room_nid = $1"
|
||||
|
||||
const selectLatestEventNIDsForUpdateSQL = "" +
|
||||
"SELECT latest_event_nids, last_event_sent_nid, state_snapshot_nid FROM rooms WHERE room_nid = $1 FOR UPDATE"
|
||||
"SELECT latest_event_nids, last_event_sent_nid, state_snapshot_nid FROM roomserver_rooms WHERE room_nid = $1 FOR UPDATE"
|
||||
|
||||
const updateLatestEventNIDsSQL = "" +
|
||||
"UPDATE rooms SET latest_event_nids = $2, last_event_sent_nid = $3, state_snapshot_nid = $4 WHERE room_nid = $1"
|
||||
"UPDATE roomserver_rooms SET latest_event_nids = $2, last_event_sent_nid = $3, state_snapshot_nid = $4 WHERE room_nid = $1"
|
||||
|
||||
type roomStatements struct {
|
||||
insertRoomNIDStmt *sql.Stmt
|
||||
|
|
|
|||
|
|
@ -17,10 +17,11 @@ package storage
|
|||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"sort"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/util"
|
||||
"sort"
|
||||
)
|
||||
|
||||
const stateDataSchema = `
|
||||
|
|
@ -33,8 +34,8 @@ const stateDataSchema = `
|
|||
-- lookup a specific (type, state_key) pair for an event. It also makes it easy
|
||||
-- to read the state for a given state_block_nid ordered by (type, state_key)
|
||||
-- which in turn makes it easier to merge state data blocks.
|
||||
CREATE SEQUENCE IF NOT EXISTS state_block_nid_seq;
|
||||
CREATE TABLE IF NOT EXISTS state_block (
|
||||
CREATE SEQUENCE IF NOT EXISTS roomserver_state_block_nid_seq;
|
||||
CREATE TABLE IF NOT EXISTS roomserver_state_block (
|
||||
-- Local numeric ID for this state data.
|
||||
state_block_nid bigint NOT NULL,
|
||||
event_type_nid bigint NOT NULL,
|
||||
|
|
@ -45,11 +46,11 @@ CREATE TABLE IF NOT EXISTS state_block (
|
|||
`
|
||||
|
||||
const insertStateDataSQL = "" +
|
||||
"INSERT INTO state_block (state_block_nid, event_type_nid, event_state_key_nid, event_nid)" +
|
||||
"INSERT INTO roomserver_state_block (state_block_nid, event_type_nid, event_state_key_nid, event_nid)" +
|
||||
" VALUES ($1, $2, $3, $4)"
|
||||
|
||||
const selectNextStateBlockNIDSQL = "" +
|
||||
"SELECT nextval('state_block_nid_seq')"
|
||||
"SELECT nextval('roomserver_state_block_nid_seq')"
|
||||
|
||||
// Bulk state lookup by numeric state block ID.
|
||||
// Sort by the state_block_nid, event_type_nid, event_state_key_nid
|
||||
|
|
@ -59,7 +60,7 @@ const selectNextStateBlockNIDSQL = "" +
|
|||
// state data blocks together.
|
||||
const bulkSelectStateBlockEntriesSQL = "" +
|
||||
"SELECT state_block_nid, event_type_nid, event_state_key_nid, event_nid" +
|
||||
" FROM state_block WHERE state_block_nid = ANY($1)" +
|
||||
" FROM roomserver_state_block WHERE state_block_nid = ANY($1)" +
|
||||
" ORDER BY state_block_nid, event_type_nid, event_state_key_nid"
|
||||
|
||||
// Bulk state lookup by numeric state block ID.
|
||||
|
|
@ -71,7 +72,7 @@ const bulkSelectStateBlockEntriesSQL = "" +
|
|||
// actually wanted.
|
||||
const bulkSelectFilteredStateBlockEntriesSQL = "" +
|
||||
"SELECT state_block_nid, event_type_nid, event_state_key_nid, event_nid" +
|
||||
" FROM state_block WHERE state_block_nid = ANY($1)" +
|
||||
" FROM roomserver_state_block WHERE state_block_nid = ANY($1)" +
|
||||
" AND event_type_nid = ANY($2) AND event_state_key_nid = ANY($3)" +
|
||||
" ORDER BY state_block_nid, event_type_nid, event_state_key_nid"
|
||||
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ package storage
|
|||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
)
|
||||
|
|
@ -32,10 +33,10 @@ const stateSnapshotSchema = `
|
|||
-- because room state tends to accumulate small changes over time. Although if
|
||||
-- the list of deltas becomes too long it becomes more efficient to encode
|
||||
-- the full state under single state_block_nid.
|
||||
CREATE SEQUENCE IF NOT EXISTS state_snapshot_nid_seq;
|
||||
CREATE TABLE IF NOT EXISTS state_snapshots (
|
||||
CREATE SEQUENCE IF NOT EXISTS roomserver_state_snapshot_nid_seq;
|
||||
CREATE TABLE IF NOT EXISTS roomserver_state_snapshots (
|
||||
-- Local numeric ID for the state.
|
||||
state_snapshot_nid bigint PRIMARY KEY DEFAULT nextval('state_snapshot_nid_seq'),
|
||||
state_snapshot_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_snapshot_nid_seq'),
|
||||
-- Local numeric ID of the room this state is for.
|
||||
-- Unused in normal operation, but useful for background work or ad-hoc debugging.
|
||||
room_nid bigint NOT NULL,
|
||||
|
|
@ -45,7 +46,7 @@ CREATE TABLE IF NOT EXISTS state_snapshots (
|
|||
`
|
||||
|
||||
const insertStateSQL = "" +
|
||||
"INSERT INTO state_snapshots (room_nid, state_block_nids)" +
|
||||
"INSERT INTO roomserver_state_snapshots (room_nid, state_block_nids)" +
|
||||
" VALUES ($1, $2)" +
|
||||
" RETURNING state_snapshot_nid"
|
||||
|
||||
|
|
@ -53,7 +54,7 @@ const insertStateSQL = "" +
|
|||
// Sorting by state_snapshot_nid means we can use binary search over the result
|
||||
// to lookup the state data NIDs for a state snapshot NID.
|
||||
const bulkSelectStateBlockNIDsSQL = "" +
|
||||
"SELECT state_snapshot_nid, state_block_nids FROM state_snapshots" +
|
||||
"SELECT state_snapshot_nid, state_block_nids FROM roomserver_state_snapshots" +
|
||||
" WHERE state_snapshot_nid = ANY($1) ORDER BY state_snapshot_nid ASC"
|
||||
|
||||
type stateSnapshotStatements struct {
|
||||
|
|
|
|||
|
|
@ -23,18 +23,14 @@ import (
|
|||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/syncapi/sync"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const pathPrefixR0 = "/_matrix/client/r0"
|
||||
|
||||
// SetupSyncServerListeners configures the given mux with sync-server listeners
|
||||
func SetupSyncServerListeners(servMux *http.ServeMux, httpClient *http.Client, srp *sync.RequestPool, deviceDB *devices.Database) {
|
||||
apiMux := mux.NewRouter()
|
||||
// Setup configures the given mux with sync-server listeners
|
||||
func Setup(apiMux *mux.Router, srp *sync.RequestPool, deviceDB *devices.Database) {
|
||||
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
|
||||
r0mux.Handle("/sync", common.MakeAuthAPI("sync", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||
return srp.OnIncomingSyncRequest(req, device)
|
||||
}))
|
||||
servMux.Handle("/metrics", prometheus.Handler())
|
||||
servMux.Handle("/api/", http.StripPrefix("/api", apiMux))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ import (
|
|||
|
||||
const accountDataSchema = `
|
||||
-- Stores the users account data
|
||||
CREATE TABLE IF NOT EXISTS account_data_type (
|
||||
CREATE TABLE IF NOT EXISTS syncapi_account_data_type (
|
||||
-- The highest numeric ID from the output_room_events at the time of saving the data
|
||||
id BIGINT,
|
||||
-- ID of the user the data belongs to
|
||||
|
|
@ -35,19 +35,19 @@ CREATE TABLE IF NOT EXISTS account_data_type (
|
|||
PRIMARY KEY(user_id, room_id, type),
|
||||
|
||||
-- We don't want two entries of the same type for the same user
|
||||
CONSTRAINT account_data_unique UNIQUE (user_id, room_id, type)
|
||||
CONSTRAINT syncapi_account_data_unique UNIQUE (user_id, room_id, type)
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS account_data_id_idx ON account_data_type(id);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_account_data_id_idx ON syncapi_account_data_type(id);
|
||||
`
|
||||
|
||||
const insertAccountDataSQL = "" +
|
||||
"INSERT INTO account_data_type (id, user_id, room_id, type) VALUES ($1, $2, $3, $4)" +
|
||||
" ON CONFLICT ON CONSTRAINT account_data_unique" +
|
||||
"INSERT INTO syncapi_account_data_type (id, user_id, room_id, type) VALUES ($1, $2, $3, $4)" +
|
||||
" ON CONFLICT ON CONSTRAINT syncapi_account_data_unique" +
|
||||
" DO UPDATE SET id = EXCLUDED.id"
|
||||
|
||||
const selectAccountDataInRangeSQL = "" +
|
||||
"SELECT room_id, type FROM account_data_type" +
|
||||
"SELECT room_id, type FROM syncapi_account_data_type" +
|
||||
" WHERE user_id = $1 AND id > $2 AND id <= $3" +
|
||||
" ORDER BY id ASC"
|
||||
|
||||
|
|
|
|||
|
|
@ -16,13 +16,14 @@ package storage
|
|||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
const currentRoomStateSchema = `
|
||||
-- Stores the current room state for every room.
|
||||
CREATE TABLE IF NOT EXISTS current_room_state (
|
||||
CREATE TABLE IF NOT EXISTS syncapi_current_room_state (
|
||||
-- The 'room_id' key for the state event.
|
||||
room_id TEXT NOT NULL,
|
||||
-- The state event ID
|
||||
|
|
@ -40,37 +41,37 @@ CREATE TABLE IF NOT EXISTS current_room_state (
|
|||
-- part of the current state of the room.
|
||||
added_at BIGINT,
|
||||
-- Clobber based on 3-uple of room_id, type and state_key
|
||||
CONSTRAINT room_state_unique UNIQUE (room_id, type, state_key)
|
||||
CONSTRAINT syncapi_room_state_unique UNIQUE (room_id, type, state_key)
|
||||
);
|
||||
-- for event deletion
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS event_id_idx ON current_room_state(event_id);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_current_room_state(event_id);
|
||||
-- for querying membership states of users
|
||||
CREATE INDEX IF NOT EXISTS membership_idx ON current_room_state(type, state_key, membership) WHERE membership IS NOT NULL AND membership != 'leave';
|
||||
CREATE INDEX IF NOT EXISTS syncapi_membership_idx ON syncapi_current_room_state(type, state_key, membership) WHERE membership IS NOT NULL AND membership != 'leave';
|
||||
`
|
||||
|
||||
const upsertRoomStateSQL = "" +
|
||||
"INSERT INTO current_room_state (room_id, event_id, type, state_key, event_json, membership, added_at)" +
|
||||
"INSERT INTO syncapi_current_room_state (room_id, event_id, type, state_key, event_json, membership, added_at)" +
|
||||
" VALUES ($1, $2, $3, $4, $5, $6, $7)" +
|
||||
" ON CONFLICT ON CONSTRAINT room_state_unique" +
|
||||
" ON CONFLICT ON CONSTRAINT syncapi_room_state_unique" +
|
||||
" DO UPDATE SET event_id = $2, event_json = $5, membership = $6, added_at = $7"
|
||||
|
||||
const deleteRoomStateByEventIDSQL = "" +
|
||||
"DELETE FROM current_room_state WHERE event_id = $1"
|
||||
"DELETE FROM syncapi_current_room_state WHERE event_id = $1"
|
||||
|
||||
const selectRoomIDsWithMembershipSQL = "" +
|
||||
"SELECT room_id FROM current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2"
|
||||
"SELECT room_id FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2"
|
||||
|
||||
const selectCurrentStateSQL = "" +
|
||||
"SELECT event_json FROM current_room_state WHERE room_id = $1"
|
||||
"SELECT event_json FROM syncapi_current_room_state WHERE room_id = $1"
|
||||
|
||||
const selectJoinedUsersSQL = "" +
|
||||
"SELECT room_id, state_key FROM current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
|
||||
"SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
|
||||
|
||||
const selectStateEventSQL = "" +
|
||||
"SELECT event_json FROM current_room_state WHERE type = $1 AND room_id = $2 AND state_key = $3"
|
||||
"SELECT event_json FROM syncapi_current_room_state WHERE type = $1 AND room_id = $2 AND state_key = $3"
|
||||
|
||||
const selectEventsWithEventIDsSQL = "" +
|
||||
"SELECT added_at, event_json FROM current_room_state WHERE event_id = ANY($1)"
|
||||
"SELECT added_at, event_json FROM syncapi_current_room_state WHERE event_id = ANY($1)"
|
||||
|
||||
type currentRoomStateStatements struct {
|
||||
upsertRoomStateStmt *sql.Stmt
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ import (
|
|||
|
||||
const outputRoomEventsSchema = `
|
||||
-- Stores output room events received from the roomserver.
|
||||
CREATE TABLE IF NOT EXISTS output_room_events (
|
||||
CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
|
||||
-- An incrementing ID which denotes the position in the log that this event resides at.
|
||||
-- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments.
|
||||
-- This isn't a problem for us since we just want to order by this field.
|
||||
|
|
@ -42,24 +42,29 @@ CREATE TABLE IF NOT EXISTS output_room_events (
|
|||
remove_state_ids TEXT[]
|
||||
);
|
||||
-- for event selection
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS event_id_idx ON output_room_events(event_id);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events(event_id);
|
||||
`
|
||||
|
||||
const insertEventSQL = "" +
|
||||
"INSERT INTO output_room_events (room_id, event_id, event_json, add_state_ids, remove_state_ids) VALUES ($1, $2, $3, $4, $5) RETURNING id"
|
||||
"INSERT INTO syncapi_output_room_events (" +
|
||||
" room_id, event_id, event_json, add_state_ids, remove_state_ids" +
|
||||
") VALUES ($1, $2, $3, $4, $5) RETURNING id"
|
||||
|
||||
const selectEventsSQL = "" +
|
||||
"SELECT id, event_json FROM output_room_events WHERE event_id = ANY($1)"
|
||||
"SELECT id, event_json FROM syncapi_output_room_events WHERE event_id = ANY($1)"
|
||||
|
||||
const selectRecentEventsSQL = "" +
|
||||
"SELECT id, event_json FROM output_room_events WHERE room_id = $1 AND id > $2 AND id <= $3 ORDER BY id DESC LIMIT $4"
|
||||
"SELECT id, event_json FROM syncapi_output_room_events" +
|
||||
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
|
||||
" ORDER BY id DESC LIMIT $4"
|
||||
|
||||
const selectMaxIDSQL = "" +
|
||||
"SELECT MAX(id) FROM output_room_events"
|
||||
"SELECT MAX(id) FROM syncapi_output_room_events"
|
||||
|
||||
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
|
||||
const selectStateInRangeSQL = "" +
|
||||
"SELECT id, event_json, add_state_ids, remove_state_ids FROM output_room_events" +
|
||||
"SELECT id, event_json, add_state_ids, remove_state_ids" +
|
||||
" FROM syncapi_output_room_events" +
|
||||
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
|
||||
" ORDER BY id ASC"
|
||||
|
||||
|
|
|
|||
|
|
@ -56,7 +56,7 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) {
|
|||
return nil, err
|
||||
}
|
||||
partitions := common.PartitionOffsetStatements{}
|
||||
if err = partitions.Prepare(db); err != nil {
|
||||
if err = partitions.Prepare(db, "syncapi"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
accountData := accountDataStatements{}
|
||||
|
|
|
|||
Loading…
Reference in a new issue