Make postgres work

This commit is contained in:
Kegan Dougal 2020-07-02 10:01:11 +01:00
parent 82f8afa8c9
commit 98e3e1e0aa
8 changed files with 260 additions and 10 deletions

View file

@ -33,7 +33,7 @@ func main() {
federationapi.AddPublicRoutes( federationapi.AddPublicRoutes(
base.PublicAPIMux, base.Cfg, userAPI, federation, keyRing, base.PublicAPIMux, base.Cfg, userAPI, federation, keyRing,
rsAPI, fsAPI, base.EDUServerClient(), rsAPI, fsAPI, base.EDUServerClient(), base.CurrentStateAPIClient(),
) )
base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationAPI), string(base.Cfg.Listen.FederationAPI)) base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationAPI), string(base.Cfg.Listen.FederationAPI))

View file

@ -173,6 +173,7 @@ func main() {
cfg.Database.RoomServer = "file:/idb/dendritejs_roomserver.db" cfg.Database.RoomServer = "file:/idb/dendritejs_roomserver.db"
cfg.Database.ServerKey = "file:/idb/dendritejs_serverkey.db" cfg.Database.ServerKey = "file:/idb/dendritejs_serverkey.db"
cfg.Database.SyncAPI = "file:/idb/dendritejs_syncapi.db" cfg.Database.SyncAPI = "file:/idb/dendritejs_syncapi.db"
cfg.Database.CurrentState = "file:/idb/dendritejs_currentstate.db"
cfg.Kafka.Topics.OutputTypingEvent = "output_typing_event" cfg.Kafka.Topics.OutputTypingEvent = "output_typing_event"
cfg.Kafka.Topics.OutputSendToDeviceEvent = "output_send_to_device_event" cfg.Kafka.Topics.OutputSendToDeviceEvent = "output_send_to_device_event"
cfg.Kafka.Topics.OutputClientData = "output_client_data" cfg.Kafka.Topics.OutputClientData = "output_client_data"

View file

@ -71,12 +71,20 @@ const selectStateEventSQL = "" +
const selectEventsWithEventIDsSQL = "" + const selectEventsWithEventIDsSQL = "" +
"SELECT headered_event_json FROM currentstate_current_room_state WHERE event_id = ANY($1)" "SELECT headered_event_json FROM currentstate_current_room_state WHERE event_id = ANY($1)"
const selectBulkStateContentSQL = "" +
"SELECT room_id, type, state_key, content_value FROM currentstate_current_room_state WHERE room_id = ANY($1) AND type = ANY($2) AND state_key = ANY($3)"
const selectBulkStateContentWildSQL = "" +
"SELECT room_id, type, state_key, content_value FROM currentstate_current_room_state WHERE room_id = ANY($1) AND type = ANY($2)"
type currentRoomStateStatements struct { type currentRoomStateStatements struct {
upsertRoomStateStmt *sql.Stmt upsertRoomStateStmt *sql.Stmt
deleteRoomStateByEventIDStmt *sql.Stmt deleteRoomStateByEventIDStmt *sql.Stmt
selectRoomIDsWithMembershipStmt *sql.Stmt selectRoomIDsWithMembershipStmt *sql.Stmt
selectEventsWithEventIDsStmt *sql.Stmt selectEventsWithEventIDsStmt *sql.Stmt
selectStateEventStmt *sql.Stmt selectStateEventStmt *sql.Stmt
selectBulkStateContentStmt *sql.Stmt
selectBulkStateContentWildStmt *sql.Stmt
} }
func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) { func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) {
@ -100,6 +108,12 @@ func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, erro
if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil { if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil {
return nil, err return nil, err
} }
if s.selectBulkStateContentStmt, err = db.Prepare(selectBulkStateContentSQL); err != nil {
return nil, err
}
if s.selectBulkStateContentWildStmt, err = db.Prepare(selectBulkStateContentWildSQL); err != nil {
return nil, err
}
return s, nil return s, nil
} }
@ -207,5 +221,52 @@ func (s *currentRoomStateStatements) SelectStateEvent(
func (s *currentRoomStateStatements) SelectBulkStateContent( func (s *currentRoomStateStatements) SelectBulkStateContent(
ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool, ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool,
) ([]tables.StrippedEvent, error) { ) ([]tables.StrippedEvent, error) {
return nil, nil hasWildcards := false
eventTypeSet := make(map[string]bool)
stateKeySet := make(map[string]bool)
var eventTypes []string
var stateKeys []string
for _, tuple := range tuples {
if !eventTypeSet[tuple.EventType] {
eventTypeSet[tuple.EventType] = true
eventTypes = append(eventTypes, tuple.EventType)
}
if !stateKeySet[tuple.StateKey] {
stateKeySet[tuple.StateKey] = true
stateKeys = append(stateKeys, tuple.StateKey)
}
if tuple.StateKey == "*" {
hasWildcards = true
}
}
var rows *sql.Rows
var err error
if hasWildcards && allowWildcards {
rows, err = s.selectBulkStateContentWildStmt.QueryContext(ctx, pq.StringArray(roomIDs), pq.StringArray(eventTypes))
} else {
rows, err = s.selectBulkStateContentStmt.QueryContext(
ctx, pq.StringArray(roomIDs), pq.StringArray(eventTypes), pq.StringArray(stateKeys),
)
}
if err != nil {
return nil, err
}
strippedEvents := []tables.StrippedEvent{}
defer internal.CloseAndLogIfError(ctx, rows, "SelectBulkStateContent: rows.close() failed")
for rows.Next() {
var roomID string
var eventType string
var stateKey string
var contentVal string
if err = rows.Scan(&roomID, &eventType, &stateKey, &contentVal); err != nil {
return nil, err
}
strippedEvents = append(strippedEvents, tables.StrippedEvent{
RoomID: roomID,
ContentValue: contentVal,
EventType: eventType,
StateKey: stateKey,
})
}
return strippedEvents, rows.Err()
} }

View file

@ -16,6 +16,7 @@ package federationapi
import ( import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
@ -36,11 +37,12 @@ func AddPublicRoutes(
rsAPI roomserverAPI.RoomserverInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
federationSenderAPI federationSenderAPI.FederationSenderInternalAPI, federationSenderAPI federationSenderAPI.FederationSenderInternalAPI,
eduAPI eduserverAPI.EDUServerInputAPI, eduAPI eduserverAPI.EDUServerInputAPI,
stateAPI currentstateAPI.CurrentStateInternalAPI,
) { ) {
routing.Setup( routing.Setup(
router, cfg, rsAPI, router, cfg, rsAPI,
eduAPI, federationSenderAPI, keyRing, eduAPI, federationSenderAPI, keyRing,
federation, userAPI, federation, userAPI, stateAPI,
) )
} }

View file

@ -31,7 +31,7 @@ func TestRoomsV3URLEscapeDoNot404(t *testing.T) {
fsAPI := base.FederationSenderHTTPClient() fsAPI := base.FederationSenderHTTPClient()
// TODO: This is pretty fragile, as if anything calls anything on these nils this test will break. // TODO: This is pretty fragile, as if anything calls anything on these nils this test will break.
// Unfortunately, it makes little sense to instantiate these dependencies when we just want to test routing. // Unfortunately, it makes little sense to instantiate these dependencies when we just want to test routing.
federationapi.AddPublicRoutes(base.PublicAPIMux, cfg, nil, nil, keyRing, nil, fsAPI, nil) federationapi.AddPublicRoutes(base.PublicAPIMux, cfg, nil, nil, keyRing, nil, fsAPI, nil, nil)
httputil.SetupHTTPAPI( httputil.SetupHTTPAPI(
base.BaseMux, base.BaseMux,
base.PublicAPIMux, base.PublicAPIMux,

View file

@ -0,0 +1,178 @@
package routing
import (
"context"
"net/http"
"strconv"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
type PublicRoomReq struct {
Since string `json:"since,omitempty"`
Limit int16 `json:"limit,omitempty"`
Filter filter `json:"filter,omitempty"`
}
type filter struct {
SearchTerms string `json:"generic_search_term,omitempty"`
}
// GetPostPublicRooms implements GET and POST /publicRooms
func GetPostPublicRooms(req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI) util.JSONResponse {
var request PublicRoomReq
if fillErr := fillPublicRoomsReq(req, &request); fillErr != nil {
return *fillErr
}
if request.Limit == 0 {
request.Limit = 50
}
response, err := publicRooms(req.Context(), request, rsAPI, stateAPI)
if err != nil {
return jsonerror.InternalServerError()
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: response,
}
}
func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI.RoomserverInternalAPI,
stateAPI currentstateAPI.CurrentStateInternalAPI) (*gomatrixserverlib.RespPublicRooms, error) {
var response gomatrixserverlib.RespPublicRooms
var limit int16
var offset int64
limit = request.Limit
offset, err := strconv.ParseInt(request.Since, 10, 64)
// ParseInt returns 0 and an error when trying to parse an empty string
// In that case, we want to assign 0 so we ignore the error
if err != nil && len(request.Since) > 0 {
util.GetLogger(ctx).WithError(err).Error("strconv.ParseInt failed")
return nil, err
}
var queryRes roomserverAPI.QueryPublishedRoomsResponse
err = rsAPI.QueryPublishedRooms(ctx, &roomserverAPI.QueryPublishedRoomsRequest{}, &queryRes)
if err != nil {
util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed")
return nil, err
}
response.TotalRoomCountEstimate = len(queryRes.RoomIDs)
if offset > 0 {
response.PrevBatch = strconv.Itoa(int(offset) - 1)
}
nextIndex := int(offset) + int(limit)
if response.TotalRoomCountEstimate > nextIndex {
response.NextBatch = strconv.Itoa(nextIndex)
}
if offset < 0 {
offset = 0
}
if nextIndex > len(queryRes.RoomIDs) {
nextIndex = len(queryRes.RoomIDs)
}
roomIDs := queryRes.RoomIDs[offset:nextIndex]
response.Chunk, err = fillInRooms(ctx, roomIDs, stateAPI)
return &response, err
}
// fillPublicRoomsReq fills the Limit, Since and Filter attributes of a GET or POST request
// on /publicRooms by parsing the incoming HTTP request
// Filter is only filled for POST requests
func fillPublicRoomsReq(httpReq *http.Request, request *PublicRoomReq) *util.JSONResponse {
if httpReq.Method == http.MethodGet {
limit, err := strconv.Atoi(httpReq.FormValue("limit"))
// Atoi returns 0 and an error when trying to parse an empty string
// In that case, we want to assign 0 so we ignore the error
if err != nil && len(httpReq.FormValue("limit")) > 0 {
util.GetLogger(httpReq.Context()).WithError(err).Error("strconv.Atoi failed")
reqErr := jsonerror.InternalServerError()
return &reqErr
}
request.Limit = int16(limit)
request.Since = httpReq.FormValue("since")
return nil
} else if httpReq.Method == http.MethodPost {
return httputil.UnmarshalJSONRequest(httpReq, request)
}
return &util.JSONResponse{
Code: http.StatusMethodNotAllowed,
JSON: jsonerror.NotFound("Bad method"),
}
}
// due to lots of switches
// nolint:gocyclo
func fillInRooms(ctx context.Context, roomIDs []string, stateAPI currentstateAPI.CurrentStateInternalAPI) ([]gomatrixserverlib.PublicRoom, error) {
avatarTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.avatar", StateKey: ""}
nameTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.name", StateKey: ""}
canonicalTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomCanonicalAlias, StateKey: ""}
topicTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.topic", StateKey: ""}
guestTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.guest_access", StateKey: ""}
visibilityTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomHistoryVisibility, StateKey: ""}
joinRuleTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomJoinRules, StateKey: ""}
var stateRes currentstateAPI.QueryBulkStateContentResponse
err := stateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
RoomIDs: roomIDs,
AllowWildcards: true,
StateTuples: []gomatrixserverlib.StateKeyTuple{
nameTuple, canonicalTuple, topicTuple, guestTuple, visibilityTuple, joinRuleTuple, avatarTuple,
{EventType: gomatrixserverlib.MRoomMember, StateKey: "*"},
},
}, &stateRes)
if err != nil {
util.GetLogger(ctx).WithError(err).Error("QueryBulkStateContent failed")
return nil, err
}
util.GetLogger(ctx).Infof("room IDs: %+v", roomIDs)
util.GetLogger(ctx).Infof("State res: %+v", stateRes.Rooms)
chunk := make([]gomatrixserverlib.PublicRoom, len(roomIDs))
i := 0
for roomID, data := range stateRes.Rooms {
pub := gomatrixserverlib.PublicRoom{
RoomID: roomID,
}
joinCount := 0
var joinRule, guestAccess string
for tuple, contentVal := range data {
if tuple.EventType == gomatrixserverlib.MRoomMember && contentVal == "join" {
joinCount++
continue
}
switch tuple {
case avatarTuple:
pub.AvatarURL = contentVal
case nameTuple:
pub.Name = contentVal
case topicTuple:
pub.Topic = contentVal
case canonicalTuple:
pub.CanonicalAlias = contentVal
case visibilityTuple:
pub.WorldReadable = contentVal == "world_readable"
// need both of these to determine whether guests can join
case joinRuleTuple:
joinRule = contentVal
case guestTuple:
guestAccess = contentVal
}
}
if joinRule == gomatrixserverlib.Public && guestAccess == "can_join" {
pub.GuestCanJoin = true
}
pub.JoinedMembersCount = joinCount
chunk[i] = pub
i++
}
return chunk, nil
}

View file

@ -19,6 +19,7 @@ import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
@ -52,6 +53,7 @@ func Setup(
keys gomatrixserverlib.JSONVerifier, keys gomatrixserverlib.JSONVerifier,
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
userAPI userapi.UserInternalAPI, userAPI userapi.UserInternalAPI,
stateAPI currentstateAPI.CurrentStateInternalAPI,
) { ) {
v2keysmux := publicAPIMux.PathPrefix(pathPrefixV2Keys).Subrouter() v2keysmux := publicAPIMux.PathPrefix(pathPrefixV2Keys).Subrouter()
v1fedmux := publicAPIMux.PathPrefix(pathPrefixV1Federation).Subrouter() v1fedmux := publicAPIMux.PathPrefix(pathPrefixV1Federation).Subrouter()
@ -291,4 +293,10 @@ func Setup(
return Backfill(httpReq, request, rsAPI, vars["roomID"], cfg) return Backfill(httpReq, request, rsAPI, vars["roomID"], cfg)
}, },
)).Methods(http.MethodGet) )).Methods(http.MethodGet)
v1fedmux.Handle("/publicRooms",
httputil.MakeExternalAPI("federation_public_rooms", func(req *http.Request) util.JSONResponse {
return GetPostPublicRooms(req, rsAPI, stateAPI)
}),
).Methods(http.MethodGet)
} }

View file

@ -27,7 +27,6 @@ import (
"github.com/matrix-org/dendrite/internal/transactions" "github.com/matrix-org/dendrite/internal/transactions"
"github.com/matrix-org/dendrite/keyserver" "github.com/matrix-org/dendrite/keyserver"
"github.com/matrix-org/dendrite/mediaapi" "github.com/matrix-org/dendrite/mediaapi"
"github.com/matrix-org/dendrite/publicroomsapi"
"github.com/matrix-org/dendrite/publicroomsapi/storage" "github.com/matrix-org/dendrite/publicroomsapi/storage"
"github.com/matrix-org/dendrite/publicroomsapi/types" "github.com/matrix-org/dendrite/publicroomsapi/types"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
@ -81,13 +80,14 @@ func (m *Monolith) AddAllPublicRoutes(publicMux *mux.Router) {
federationapi.AddPublicRoutes( federationapi.AddPublicRoutes(
publicMux, m.Config, m.UserAPI, m.FedClient, publicMux, m.Config, m.UserAPI, m.FedClient,
m.KeyRing, m.RoomserverAPI, m.FederationSenderAPI, m.KeyRing, m.RoomserverAPI, m.FederationSenderAPI,
m.EDUInternalAPI, m.EDUInternalAPI, m.StateAPI,
) )
mediaapi.AddPublicRoutes(publicMux, m.Config, m.UserAPI, m.Client) mediaapi.AddPublicRoutes(publicMux, m.Config, m.UserAPI, m.Client)
/*
publicroomsapi.AddPublicRoutes( publicroomsapi.AddPublicRoutes(
publicMux, m.Config, m.KafkaConsumer, m.UserAPI, m.PublicRoomsDB, m.RoomserverAPI, m.FedClient, publicMux, m.Config, m.KafkaConsumer, m.UserAPI, m.PublicRoomsDB, m.RoomserverAPI, m.FedClient,
m.ExtPublicRoomsProvider, m.ExtPublicRoomsProvider,
) ) */
syncapi.AddPublicRoutes( syncapi.AddPublicRoutes(
publicMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI, m.FedClient, m.Config, publicMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI, m.FedClient, m.Config,
) )