Merge branch 'master' into neilalexander/httpapi

This commit is contained in:
Neil Alexander 2020-10-07 15:31:12 +01:00 committed by GitHub
commit 4454dde876
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 174 additions and 111 deletions

View file

@ -16,9 +16,11 @@ package routing
import ( import (
"net/http" "net/http"
"time"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/accounts" "github.com/matrix-org/dendrite/userapi/storage/accounts"
@ -74,16 +76,32 @@ func JoinRoomByIDOrAlias(
} }
// Ask the roomserver to perform the join. // Ask the roomserver to perform the join.
rsAPI.PerformJoin(req.Context(), &joinReq, &joinRes) done := make(chan util.JSONResponse, 1)
if joinRes.Error != nil { go func() {
return joinRes.Error.JSONResponse() defer close(done)
} rsAPI.PerformJoin(req.Context(), &joinReq, &joinRes)
if joinRes.Error != nil {
done <- joinRes.Error.JSONResponse()
} else {
done <- util.JSONResponse{
Code: http.StatusOK,
// TODO: Put the response struct somewhere internal.
JSON: struct {
RoomID string `json:"room_id"`
}{joinRes.RoomID},
}
}
}()
return util.JSONResponse{ // Wait either for the join to finish, or for us to hit a reasonable
Code: http.StatusOK, // timeout, at which point we'll just return a 200 to placate clients.
// TODO: Put the response struct somewhere internal. select {
JSON: struct { case <-time.After(time.Second * 20):
RoomID string `json:"room_id"` return util.JSONResponse{
}{joinRes.RoomID}, Code: http.StatusAccepted,
JSON: jsonerror.Unknown("The room join will continue in the background."),
}
case result := <-done:
return result
} }
} }

View file

@ -20,6 +20,7 @@ var requestFrom = flag.String("from", "", "the server name that the request shou
var requestKey = flag.String("key", "matrix_key.pem", "the private key to use when signing the request") var requestKey = flag.String("key", "matrix_key.pem", "the private key to use when signing the request")
var requestPost = flag.Bool("post", false, "send a POST request instead of GET (pipe input into stdin or type followed by Ctrl-D)") var requestPost = flag.Bool("post", false, "send a POST request instead of GET (pipe input into stdin or type followed by Ctrl-D)")
// nolint:gocyclo
func main() { func main() {
flag.Parse() flag.Parse()

View file

@ -32,9 +32,6 @@ const (
// there was a new event that references an event that we don't // there was a new event that references an event that we don't
// have a copy of. // have a copy of.
KindNew = 2 KindNew = 2
// KindBackfill event extend the contiguous graph going backwards.
// They always have state.
KindBackfill = 3
) )
// DoNotSendToOtherServers tells us not to send the event to other matrix // DoNotSendToOtherServers tells us not to send the event to other matrix

View file

@ -26,28 +26,30 @@ type RoomserverInternalAPI struct {
*perform.Leaver *perform.Leaver
*perform.Publisher *perform.Publisher
*perform.Backfiller *perform.Backfiller
DB storage.Database DB storage.Database
Cfg *config.RoomServer Cfg *config.RoomServer
Producer sarama.SyncProducer Producer sarama.SyncProducer
Cache caching.RoomServerCaches Cache caching.RoomServerCaches
ServerName gomatrixserverlib.ServerName ServerName gomatrixserverlib.ServerName
KeyRing gomatrixserverlib.JSONVerifier KeyRing gomatrixserverlib.JSONVerifier
fsAPI fsAPI.FederationSenderInternalAPI fsAPI fsAPI.FederationSenderInternalAPI
OutputRoomEventTopic string // Kafka topic for new output room events OutputRoomEventTopic string // Kafka topic for new output room events
PerspectiveServerNames []gomatrixserverlib.ServerName
} }
func NewRoomserverAPI( func NewRoomserverAPI(
cfg *config.RoomServer, roomserverDB storage.Database, producer sarama.SyncProducer, cfg *config.RoomServer, roomserverDB storage.Database, producer sarama.SyncProducer,
outputRoomEventTopic string, caches caching.RoomServerCaches, outputRoomEventTopic string, caches caching.RoomServerCaches,
keyRing gomatrixserverlib.JSONVerifier, keyRing gomatrixserverlib.JSONVerifier, perspectiveServerNames []gomatrixserverlib.ServerName,
) *RoomserverInternalAPI { ) *RoomserverInternalAPI {
serverACLs := acls.NewServerACLs(roomserverDB) serverACLs := acls.NewServerACLs(roomserverDB)
a := &RoomserverInternalAPI{ a := &RoomserverInternalAPI{
DB: roomserverDB, DB: roomserverDB,
Cfg: cfg, Cfg: cfg,
Cache: caches, Cache: caches,
ServerName: cfg.Matrix.ServerName, ServerName: cfg.Matrix.ServerName,
KeyRing: keyRing, PerspectiveServerNames: perspectiveServerNames,
KeyRing: keyRing,
Queryer: &query.Queryer{ Queryer: &query.Queryer{
DB: roomserverDB, DB: roomserverDB,
Cache: caches, Cache: caches,
@ -105,6 +107,10 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen
DB: r.DB, DB: r.DB,
FSAPI: r.fsAPI, FSAPI: r.fsAPI,
KeyRing: r.KeyRing, KeyRing: r.KeyRing,
// Perspective servers are trusted to not lie about server keys, so we will also
// prefer these servers when backfilling (assuming they are in the room) rather
// than trying random servers
PreferServers: r.PerspectiveServerNames,
} }
} }

View file

@ -2,6 +2,8 @@ package helpers
import ( import (
"context" "context"
"database/sql"
"errors"
"fmt" "fmt"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
@ -217,6 +219,9 @@ func CheckServerAllowedToSeeEvent(
roomState := state.NewStateResolution(db, info) roomState := state.NewStateResolution(db, info)
stateEntries, err := roomState.LoadStateAtEvent(ctx, eventID) stateEntries, err := roomState.LoadStateAtEvent(ctx, eventID)
if err != nil { if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return false, nil
}
return false, err return false, err
} }

View file

@ -54,7 +54,7 @@ func (r *Inputer) processRoomEvent(
} }
var softfail bool var softfail bool
if input.Kind == api.KindBackfill || input.Kind == api.KindNew { if input.Kind == api.KindNew {
// Check that the event passes authentication checks based on the // Check that the event passes authentication checks based on the
// current room state. // current room state.
softfail, err = helpers.CheckForSoftFail(ctx, r.DB, headered, input.StateEventIDs) softfail, err = helpers.CheckForSoftFail(ctx, r.DB, headered, input.StateEventIDs)

View file

@ -28,6 +28,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/sirupsen/logrus"
) )
// updateLatestEvents updates the list of latest events for this room in the database and writes the // updateLatestEvents updates the list of latest events for this room in the database and writes the
@ -116,7 +117,6 @@ type latestEventsUpdater struct {
} }
func (u *latestEventsUpdater) doUpdateLatestEvents() error { func (u *latestEventsUpdater) doUpdateLatestEvents() error {
prevEvents := u.event.PrevEvents()
u.lastEventIDSent = u.updater.LastEventIDSent() u.lastEventIDSent = u.updater.LastEventIDSent()
u.oldStateNID = u.updater.CurrentStateSnapshotNID() u.oldStateNID = u.updater.CurrentStateSnapshotNID()
@ -140,30 +140,12 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
return nil return nil
} }
// Update the roomserver_previous_events table with references. This // Work out what the latest events are. This will include the new
// is effectively tracking the structure of the DAG. // event if it is not already referenced.
if err = u.updater.StorePreviousEvents(u.stateAtEvent.EventNID, prevEvents); err != nil { u.calculateLatest(
return fmt.Errorf("u.updater.StorePreviousEvents: %w", err)
}
// Get the event reference for our new event. This will be used when
// determining if the event is referenced by an existing event.
eventReference := u.event.EventReference()
// Check if our new event is already referenced by an existing event
// in the room. If it is then it isn't a latest event.
alreadyReferenced, err := u.updater.IsReferenced(eventReference)
if err != nil {
return fmt.Errorf("u.updater.IsReferenced: %w", err)
}
// Work out what the latest events are.
u.latest = calculateLatest(
oldLatest, oldLatest,
alreadyReferenced,
prevEvents,
types.StateAtEventAndReference{ types.StateAtEventAndReference{
EventReference: eventReference, EventReference: u.event.EventReference(),
StateAtEvent: u.stateAtEvent, StateAtEvent: u.stateAtEvent,
}, },
) )
@ -215,27 +197,12 @@ func (u *latestEventsUpdater) latestState() error {
var err error var err error
roomState := state.NewStateResolution(u.api.DB, *u.roomInfo) roomState := state.NewStateResolution(u.api.DB, *u.roomInfo)
// Get a list of the current room state events if available. // Get a list of the current latest events. This may or may not
var currentState []types.StateEntry // include the new event from the input path, depending on whether
if u.roomInfo.StateSnapshotNID != 0 { // it is a forward extremity or not.
currentState, _ = roomState.LoadStateAtSnapshot(u.ctx, u.roomInfo.StateSnapshotNID) latestStateAtEvents := make([]types.StateAtEvent, len(u.latest))
}
// Get a list of the current latest events. This will include both
// the current room state and the latest events after the input event.
// The idea is that we will perform state resolution on this set and
// any conflicting events will be resolved properly.
latestStateAtEvents := make([]types.StateAtEvent, len(u.latest)+len(currentState))
offset := 0
for i := range currentState {
latestStateAtEvents[i] = types.StateAtEvent{
BeforeStateSnapshotNID: u.roomInfo.StateSnapshotNID,
StateEntry: currentState[i],
}
offset++
}
for i := range u.latest { for i := range u.latest {
latestStateAtEvents[offset+i] = u.latest[i].StateAtEvent latestStateAtEvents[i] = u.latest[i].StateAtEvent
} }
// Takes the NIDs of the latest events and creates a state snapshot // Takes the NIDs of the latest events and creates a state snapshot
@ -266,6 +233,14 @@ func (u *latestEventsUpdater) latestState() error {
if err != nil { if err != nil {
return fmt.Errorf("roomState.DifferenceBetweenStateSnapshots: %w", err) return fmt.Errorf("roomState.DifferenceBetweenStateSnapshots: %w", err)
} }
if len(u.removed) > len(u.added) {
// This really shouldn't happen.
// TODO: What is ultimately the best way to handle this situation?
return fmt.Errorf(
"invalid state delta wants to remove %d state but only add %d state (between state snapshots %d and %d)",
len(u.removed), len(u.added), u.oldStateNID, u.newStateNID,
)
}
// Also work out the state before the event removes and the event // Also work out the state before the event removes and the event
// adds. // adds.
@ -279,42 +254,49 @@ func (u *latestEventsUpdater) latestState() error {
return nil return nil
} }
func calculateLatest( func (u *latestEventsUpdater) calculateLatest(
oldLatest []types.StateAtEventAndReference, oldLatest []types.StateAtEventAndReference,
alreadyReferenced bool,
prevEvents []gomatrixserverlib.EventReference,
newEvent types.StateAtEventAndReference, newEvent types.StateAtEventAndReference,
) []types.StateAtEventAndReference { ) {
var alreadyInLatest bool
var newLatest []types.StateAtEventAndReference var newLatest []types.StateAtEventAndReference
// First of all, let's see if any of the existing forward extremities
// now have entries in the previous events table. If they do then we
// will no longer include them as forward extremities.
for _, l := range oldLatest { for _, l := range oldLatest {
keep := true referenced, err := u.updater.IsReferenced(l.EventReference)
for _, prevEvent := range prevEvents { if err != nil {
if l.EventID == prevEvent.EventID && bytes.Equal(l.EventSHA256, prevEvent.EventSHA256) { logrus.WithError(err).Errorf("Failed to retrieve event reference for %q", l.EventID)
// This event can be removed from the latest events cause we've found an event that references it. } else if !referenced {
// (If an event is referenced by another event then it can't be one of the latest events in the room
// because we have an event that comes after it)
keep = false
break
}
}
if l.EventNID == newEvent.EventNID {
alreadyInLatest = true
}
if keep {
// Keep the event in the latest events.
newLatest = append(newLatest, l) newLatest = append(newLatest, l)
} }
} }
if !alreadyReferenced && !alreadyInLatest { // Then check and see if our new event is already included in that set.
// This event is not referenced by any of the events in the room // This ordinarily won't happen but it covers the edge-case that we've
// and the event is not already in the latest events. // already seen this event before and it's a forward extremity, so rather
// Add it to the latest events // than adding a duplicate, we'll just return the set as complete.
for _, l := range newLatest {
if l.EventReference.EventID == newEvent.EventReference.EventID && bytes.Equal(l.EventReference.EventSHA256, newEvent.EventReference.EventSHA256) {
// We've already referenced this new event so we can just return
// the newly completed extremities at this point.
u.latest = newLatest
return
}
}
// At this point we've processed the old extremities, and we've checked
// that our new event isn't already in that set. Therefore now we can
// check if our *new* event is a forward extremity, and if it is, add
// it in.
referenced, err := u.updater.IsReferenced(newEvent.EventReference)
if err != nil {
logrus.WithError(err).Errorf("Failed to retrieve event reference for %q", newEvent.EventReference.EventID)
} else if !referenced {
newLatest = append(newLatest, newEvent) newLatest = append(newLatest, newEvent)
} }
return newLatest u.latest = newLatest
} }
func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) { func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) {

View file

@ -30,11 +30,19 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
// the max number of servers to backfill from per request. If this is too low we may fail to backfill when
// we could've from another server. If this is too high we may take far too long to successfully backfill
// as we try dead servers.
const maxBackfillServers = 5
type Backfiller struct { type Backfiller struct {
ServerName gomatrixserverlib.ServerName ServerName gomatrixserverlib.ServerName
DB storage.Database DB storage.Database
FSAPI federationSenderAPI.FederationSenderInternalAPI FSAPI federationSenderAPI.FederationSenderInternalAPI
KeyRing gomatrixserverlib.JSONVerifier KeyRing gomatrixserverlib.JSONVerifier
// The servers which should be preferred above other servers when backfilling
PreferServers []gomatrixserverlib.ServerName
} }
// PerformBackfill implements api.RoomServerQueryAPI // PerformBackfill implements api.RoomServerQueryAPI
@ -96,7 +104,7 @@ func (r *Backfiller) backfillViaFederation(ctx context.Context, req *api.Perform
if info == nil || info.IsStub { if info == nil || info.IsStub {
return fmt.Errorf("backfillViaFederation: missing room info for room %s", req.RoomID) return fmt.Errorf("backfillViaFederation: missing room info for room %s", req.RoomID)
} }
requester := newBackfillRequester(r.DB, r.FSAPI, r.ServerName, req.BackwardsExtremities) requester := newBackfillRequester(r.DB, r.FSAPI, r.ServerName, req.BackwardsExtremities, r.PreferServers)
// Request 100 items regardless of what the query asks for. // Request 100 items regardless of what the query asks for.
// We don't want to go much higher than this. // We don't want to go much higher than this.
// We can't honour exactly the limit as some sytests rely on requesting more for tests to pass // We can't honour exactly the limit as some sytests rely on requesting more for tests to pass
@ -195,7 +203,7 @@ func (r *Backfiller) fetchAndStoreMissingEvents(ctx context.Context, roomVer gom
logger.Infof("returned %d PDUs which made events %+v", len(res.PDUs), result) logger.Infof("returned %d PDUs which made events %+v", len(res.PDUs), result)
for _, res := range result { for _, res := range result {
if res.Error != nil { if res.Error != nil {
logger.WithError(err).Warn("event failed PDU checks") logger.WithError(res.Error).Warn("event failed PDU checks")
continue continue
} }
missingMap[id] = res.Event missingMap[id] = res.Event
@ -215,10 +223,11 @@ func (r *Backfiller) fetchAndStoreMissingEvents(ctx context.Context, roomVer gom
// backfillRequester implements gomatrixserverlib.BackfillRequester // backfillRequester implements gomatrixserverlib.BackfillRequester
type backfillRequester struct { type backfillRequester struct {
db storage.Database db storage.Database
fsAPI federationSenderAPI.FederationSenderInternalAPI fsAPI federationSenderAPI.FederationSenderInternalAPI
thisServer gomatrixserverlib.ServerName thisServer gomatrixserverlib.ServerName
bwExtrems map[string][]string preferServer map[gomatrixserverlib.ServerName]bool
bwExtrems map[string][]string
// per-request state // per-request state
servers []gomatrixserverlib.ServerName servers []gomatrixserverlib.ServerName
@ -226,7 +235,14 @@ type backfillRequester struct {
eventIDMap map[string]gomatrixserverlib.Event eventIDMap map[string]gomatrixserverlib.Event
} }
func newBackfillRequester(db storage.Database, fsAPI federationSenderAPI.FederationSenderInternalAPI, thisServer gomatrixserverlib.ServerName, bwExtrems map[string][]string) *backfillRequester { func newBackfillRequester(
db storage.Database, fsAPI federationSenderAPI.FederationSenderInternalAPI, thisServer gomatrixserverlib.ServerName,
bwExtrems map[string][]string, preferServers []gomatrixserverlib.ServerName,
) *backfillRequester {
preferServer := make(map[gomatrixserverlib.ServerName]bool)
for _, p := range preferServers {
preferServer[p] = true
}
return &backfillRequester{ return &backfillRequester{
db: db, db: db,
fsAPI: fsAPI, fsAPI: fsAPI,
@ -234,6 +250,7 @@ func newBackfillRequester(db storage.Database, fsAPI federationSenderAPI.Federat
eventIDToBeforeStateIDs: make(map[string][]string), eventIDToBeforeStateIDs: make(map[string][]string),
eventIDMap: make(map[string]gomatrixserverlib.Event), eventIDMap: make(map[string]gomatrixserverlib.Event),
bwExtrems: bwExtrems, bwExtrems: bwExtrems,
preferServer: preferServer,
} }
} }
@ -436,8 +453,16 @@ FindSuccessor:
if server == b.thisServer { if server == b.thisServer {
continue continue
} }
servers = append(servers, server) if b.preferServer[server] { // insert at the front
servers = append([]gomatrixserverlib.ServerName{server}, servers...)
} else { // insert at the back
servers = append(servers, server)
}
} }
if len(servers) > maxBackfillServers {
servers = servers[:maxBackfillServers]
}
b.servers = servers b.servers = servers
return servers return servers
} }

View file

@ -41,6 +41,11 @@ func NewInternalAPI(
) api.RoomserverInternalAPI { ) api.RoomserverInternalAPI {
cfg := &base.Cfg.RoomServer cfg := &base.Cfg.RoomServer
var perspectiveServerNames []gomatrixserverlib.ServerName
for _, kp := range base.Cfg.ServerKeyAPI.KeyPerspectives {
perspectiveServerNames = append(perspectiveServerNames, kp.ServerName)
}
roomserverDB, err := storage.Open(&cfg.Database, base.Caches) roomserverDB, err := storage.Open(&cfg.Database, base.Caches)
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to connect to room server db") logrus.WithError(err).Panicf("failed to connect to room server db")
@ -48,6 +53,6 @@ func NewInternalAPI(
return internal.NewRoomserverAPI( return internal.NewRoomserverAPI(
cfg, roomserverDB, base.KafkaProducer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), cfg, roomserverDB, base.KafkaProducer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
base.Caches, keyRing, base.Caches, keyRing, perspectiveServerNames,
) )
} }

View file

@ -474,6 +474,32 @@ func (d *Database) StoreEvent(
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.Writer.Do: %w", err) return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.Writer.Do: %w", err)
} }
// We should attempt to update the previous events table with any
// references that this new event makes. We do this using a latest
// events updater because it somewhat works as a mutex, ensuring
// that there's a row-level lock on the latest room events (well,
// on Postgres at least).
var roomInfo *types.RoomInfo
var updater *LatestEventsUpdater
if prevEvents := event.PrevEvents(); len(prevEvents) > 0 {
roomInfo, err = d.RoomInfo(ctx, event.RoomID())
if err != nil {
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.RoomInfo: %w", err)
}
if roomInfo == nil && len(prevEvents) > 0 {
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("expected room %q to exist", event.RoomID())
}
updater, err = d.GetLatestEventsForUpdate(ctx, *roomInfo)
if err != nil {
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("NewLatestEventsUpdater: %w", err)
}
if err = updater.StorePreviousEvents(eventNID, prevEvents); err != nil {
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("updater.StorePreviousEvents: %w", err)
}
succeeded := true
err = sqlutil.EndTransaction(updater, &succeeded)
}
return roomNID, types.StateAtEvent{ return roomNID, types.StateAtEvent{
BeforeStateSnapshotNID: stateNID, BeforeStateSnapshotNID: stateNID,
StateEntry: types.StateEntry{ StateEntry: types.StateEntry{
@ -483,7 +509,7 @@ func (d *Database) StoreEvent(
}, },
EventNID: eventNID, EventNID: eventNID,
}, },
}, redactionEvent, redactedEventID, nil }, redactionEvent, redactedEventID, err
} }
func (d *Database) PublishRoom(ctx context.Context, roomID string, publish bool) error { func (d *Database) PublishRoom(ctx context.Context, roomID string, publish bool) error {

View file

@ -98,10 +98,6 @@ func (s *ServerKeyAPI) FetchKeys(
// we've failed to satisfy it from local keys, database keys or from // we've failed to satisfy it from local keys, database keys or from
// all of the fetchers. Report an error. // all of the fetchers. Report an error.
logrus.Warnf("Failed to retrieve key %q for server %q", req.KeyID, req.ServerName) logrus.Warnf("Failed to retrieve key %q for server %q", req.KeyID, req.ServerName)
return results, fmt.Errorf(
"server key API failed to satisfy key request for server %q key ID %q",
req.ServerName, req.KeyID,
)
} }
} }

View file

@ -473,4 +473,6 @@ Inbound federation rejects invites which include invalid JSON for room version 6
Inbound federation rejects invite rejections which include invalid JSON for room version 6 Inbound federation rejects invite rejections which include invalid JSON for room version 6
GET /capabilities is present and well formed for registered user GET /capabilities is present and well formed for registered user
m.room.history_visibility == "joined" allows/forbids appropriately for Guest users m.room.history_visibility == "joined" allows/forbids appropriately for Guest users
m.room.history_visibility == "joined" allows/forbids appropriately for Real users m.room.history_visibility == "joined" allows/forbids appropriately for Real users
Users cannot kick users who have already left a room
A prev_batch token from incremental sync can be used in the v1 messages API