mirror of
https://github.com/matrix-org/dendrite.git
synced 2024-11-22 22:31:56 -06:00
1414922026
* sytest: Make 'Inbound federation can backfill events' pass This breaks 'Outbound federation can backfill events' because now we are returning the right number of events, which the previous test was relying on. Previously, /messages was backfilling the membership event, causing the test to pass. Now we are no longer backfilling the membership event due to the change in this commit, causing the test to fail. The test should instead be returning the membership event locally from synacpis database, but it doesn't do it fast enough, resulting in a no-op /sync response with a next_batch=s0_0 which will never pick up the local membership event when it rolls in. The test does attempt to retry, but doesn't take the new next_batch=s1_0 resulting in it missing from the /messages response. * Linting
967 lines
29 KiB
Go
967 lines
29 KiB
Go
// Copyright 2017 Vector Creations Ltd
|
|
// Copyright 2018 New Vector Ltd
|
|
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package internal
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"github.com/matrix-org/dendrite/roomserver/api"
|
|
"github.com/matrix-org/dendrite/roomserver/auth"
|
|
"github.com/matrix-org/dendrite/roomserver/state"
|
|
"github.com/matrix-org/dendrite/roomserver/storage"
|
|
"github.com/matrix-org/dendrite/roomserver/types"
|
|
"github.com/matrix-org/dendrite/roomserver/version"
|
|
"github.com/matrix-org/gomatrixserverlib"
|
|
"github.com/matrix-org/util"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// QueryLatestEventsAndState implements api.RoomserverInternalAPI
|
|
func (r *RoomserverInternalAPI) QueryLatestEventsAndState(
|
|
ctx context.Context,
|
|
request *api.QueryLatestEventsAndStateRequest,
|
|
response *api.QueryLatestEventsAndStateResponse,
|
|
) error {
|
|
roomVersion, err := r.DB.GetRoomVersionForRoom(ctx, request.RoomID)
|
|
if err != nil {
|
|
response.RoomExists = false
|
|
return nil
|
|
}
|
|
|
|
roomState := state.NewStateResolution(r.DB)
|
|
|
|
response.QueryLatestEventsAndStateRequest = *request
|
|
roomNID, err := r.DB.RoomNIDExcludingStubs(ctx, request.RoomID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if roomNID == 0 {
|
|
return nil
|
|
}
|
|
response.RoomExists = true
|
|
response.RoomVersion = roomVersion
|
|
|
|
var currentStateSnapshotNID types.StateSnapshotNID
|
|
response.LatestEvents, currentStateSnapshotNID, response.Depth, err =
|
|
r.DB.LatestEventIDs(ctx, roomNID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var stateEntries []types.StateEntry
|
|
if len(request.StateToFetch) == 0 {
|
|
// Look up all room state.
|
|
stateEntries, err = roomState.LoadStateAtSnapshot(
|
|
ctx, currentStateSnapshotNID,
|
|
)
|
|
} else {
|
|
// Look up the current state for the requested tuples.
|
|
stateEntries, err = roomState.LoadStateAtSnapshotForStringTuples(
|
|
ctx, currentStateSnapshotNID, request.StateToFetch,
|
|
)
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
stateEvents, err := r.loadStateEvents(ctx, stateEntries)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, event := range stateEvents {
|
|
response.StateEvents = append(response.StateEvents, event.Headered(roomVersion))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// QueryStateAfterEvents implements api.RoomserverInternalAPI
|
|
func (r *RoomserverInternalAPI) QueryStateAfterEvents(
|
|
ctx context.Context,
|
|
request *api.QueryStateAfterEventsRequest,
|
|
response *api.QueryStateAfterEventsResponse,
|
|
) error {
|
|
roomVersion, err := r.DB.GetRoomVersionForRoom(ctx, request.RoomID)
|
|
if err != nil {
|
|
response.RoomExists = false
|
|
return nil
|
|
}
|
|
|
|
roomState := state.NewStateResolution(r.DB)
|
|
|
|
response.QueryStateAfterEventsRequest = *request
|
|
roomNID, err := r.DB.RoomNIDExcludingStubs(ctx, request.RoomID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if roomNID == 0 {
|
|
return nil
|
|
}
|
|
response.RoomExists = true
|
|
response.RoomVersion = roomVersion
|
|
|
|
prevStates, err := r.DB.StateAtEventIDs(ctx, request.PrevEventIDs)
|
|
if err != nil {
|
|
switch err.(type) {
|
|
case types.MissingEventError:
|
|
return nil
|
|
default:
|
|
return err
|
|
}
|
|
}
|
|
response.PrevEventsExist = true
|
|
|
|
// Look up the currrent state for the requested tuples.
|
|
stateEntries, err := roomState.LoadStateAfterEventsForStringTuples(
|
|
ctx, roomNID, prevStates, request.StateToFetch,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
stateEvents, err := r.loadStateEvents(ctx, stateEntries)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, event := range stateEvents {
|
|
response.StateEvents = append(response.StateEvents, event.Headered(roomVersion))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// QueryEventsByID implements api.RoomserverInternalAPI
|
|
func (r *RoomserverInternalAPI) QueryEventsByID(
|
|
ctx context.Context,
|
|
request *api.QueryEventsByIDRequest,
|
|
response *api.QueryEventsByIDResponse,
|
|
) error {
|
|
response.QueryEventsByIDRequest = *request
|
|
|
|
eventNIDMap, err := r.DB.EventNIDs(ctx, request.EventIDs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var eventNIDs []types.EventNID
|
|
for _, nid := range eventNIDMap {
|
|
eventNIDs = append(eventNIDs, nid)
|
|
}
|
|
|
|
events, err := r.loadEvents(ctx, eventNIDs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, event := range events {
|
|
roomVersion, verr := r.DB.GetRoomVersionForRoom(ctx, event.RoomID())
|
|
if verr != nil {
|
|
return verr
|
|
}
|
|
|
|
response.Events = append(response.Events, event.Headered(roomVersion))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *RoomserverInternalAPI) loadStateEvents(
|
|
ctx context.Context, stateEntries []types.StateEntry,
|
|
) ([]gomatrixserverlib.Event, error) {
|
|
eventNIDs := make([]types.EventNID, len(stateEntries))
|
|
for i := range stateEntries {
|
|
eventNIDs[i] = stateEntries[i].EventNID
|
|
}
|
|
return r.loadEvents(ctx, eventNIDs)
|
|
}
|
|
|
|
func (r *RoomserverInternalAPI) loadEvents(
|
|
ctx context.Context, eventNIDs []types.EventNID,
|
|
) ([]gomatrixserverlib.Event, error) {
|
|
stateEvents, err := r.DB.Events(ctx, eventNIDs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result := make([]gomatrixserverlib.Event, len(stateEvents))
|
|
for i := range stateEvents {
|
|
result[i] = stateEvents[i].Event
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// QueryMembershipForUser implements api.RoomserverInternalAPI
|
|
func (r *RoomserverInternalAPI) QueryMembershipForUser(
|
|
ctx context.Context,
|
|
request *api.QueryMembershipForUserRequest,
|
|
response *api.QueryMembershipForUserResponse,
|
|
) error {
|
|
roomNID, err := r.DB.RoomNID(ctx, request.RoomID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
membershipEventNID, stillInRoom, err := r.DB.GetMembership(ctx, roomNID, request.UserID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if membershipEventNID == 0 {
|
|
response.HasBeenInRoom = false
|
|
return nil
|
|
}
|
|
|
|
response.IsInRoom = stillInRoom
|
|
eventIDMap, err := r.DB.EventIDs(ctx, []types.EventNID{membershipEventNID})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
response.EventID = eventIDMap[membershipEventNID]
|
|
return nil
|
|
}
|
|
|
|
// QueryMembershipsForRoom implements api.RoomserverInternalAPI
|
|
func (r *RoomserverInternalAPI) QueryMembershipsForRoom(
|
|
ctx context.Context,
|
|
request *api.QueryMembershipsForRoomRequest,
|
|
response *api.QueryMembershipsForRoomResponse,
|
|
) error {
|
|
roomNID, err := r.DB.RoomNID(ctx, request.RoomID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
membershipEventNID, stillInRoom, err := r.DB.GetMembership(ctx, roomNID, request.Sender)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if membershipEventNID == 0 {
|
|
response.HasBeenInRoom = false
|
|
response.JoinEvents = nil
|
|
return nil
|
|
}
|
|
|
|
response.HasBeenInRoom = true
|
|
response.JoinEvents = []gomatrixserverlib.ClientEvent{}
|
|
|
|
var events []types.Event
|
|
var stateEntries []types.StateEntry
|
|
if stillInRoom {
|
|
var eventNIDs []types.EventNID
|
|
eventNIDs, err = r.DB.GetMembershipEventNIDsForRoom(ctx, roomNID, request.JoinedOnly)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
events, err = r.DB.Events(ctx, eventNIDs)
|
|
} else {
|
|
stateEntries, err = stateBeforeEvent(ctx, r.DB, membershipEventNID)
|
|
if err != nil {
|
|
logrus.WithField("membership_event_nid", membershipEventNID).WithError(err).Error("failed to load state before event")
|
|
return err
|
|
}
|
|
events, err = getMembershipsAtState(ctx, r.DB, stateEntries, request.JoinedOnly)
|
|
}
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, event := range events {
|
|
clientEvent := gomatrixserverlib.ToClientEvent(event.Event, gomatrixserverlib.FormatAll)
|
|
response.JoinEvents = append(response.JoinEvents, clientEvent)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func stateBeforeEvent(ctx context.Context, db storage.Database, eventNID types.EventNID) ([]types.StateEntry, error) {
|
|
roomState := state.NewStateResolution(db)
|
|
// Lookup the event NID
|
|
eIDs, err := db.EventIDs(ctx, []types.EventNID{eventNID})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
eventIDs := []string{eIDs[eventNID]}
|
|
|
|
prevState, err := db.StateAtEventIDs(ctx, eventIDs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Fetch the state as it was when this event was fired
|
|
return roomState.LoadCombinedStateAfterEvents(ctx, prevState)
|
|
}
|
|
|
|
// getMembershipsAtState filters the state events to
|
|
// only keep the "m.room.member" events with a "join" membership. These events are returned.
|
|
// Returns an error if there was an issue fetching the events.
|
|
func getMembershipsAtState(
|
|
ctx context.Context, db storage.Database, stateEntries []types.StateEntry, joinedOnly bool,
|
|
) ([]types.Event, error) {
|
|
|
|
var eventNIDs []types.EventNID
|
|
for _, entry := range stateEntries {
|
|
// Filter the events to retrieve to only keep the membership events
|
|
if entry.EventTypeNID == types.MRoomMemberNID {
|
|
eventNIDs = append(eventNIDs, entry.EventNID)
|
|
}
|
|
}
|
|
|
|
// Get all of the events in this state
|
|
stateEvents, err := db.Events(ctx, eventNIDs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if !joinedOnly {
|
|
return stateEvents, nil
|
|
}
|
|
|
|
// Filter the events to only keep the "join" membership events
|
|
var events []types.Event
|
|
for _, event := range stateEvents {
|
|
membership, err := event.Membership()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if membership == gomatrixserverlib.Join {
|
|
events = append(events, event)
|
|
}
|
|
}
|
|
|
|
return events, nil
|
|
}
|
|
|
|
// QueryInvitesForUser implements api.RoomserverInternalAPI
|
|
func (r *RoomserverInternalAPI) QueryInvitesForUser(
|
|
ctx context.Context,
|
|
request *api.QueryInvitesForUserRequest,
|
|
response *api.QueryInvitesForUserResponse,
|
|
) error {
|
|
roomNID, err := r.DB.RoomNID(ctx, request.RoomID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
targetUserNIDs, err := r.DB.EventStateKeyNIDs(ctx, []string{request.TargetUserID})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
targetUserNID := targetUserNIDs[request.TargetUserID]
|
|
|
|
senderUserNIDs, err := r.DB.GetInvitesForUser(ctx, roomNID, targetUserNID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
senderUserIDs, err := r.DB.EventStateKeys(ctx, senderUserNIDs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, senderUserID := range senderUserIDs {
|
|
response.InviteSenderUserIDs = append(response.InviteSenderUserIDs, senderUserID)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// QueryServerAllowedToSeeEvent implements api.RoomserverInternalAPI
|
|
func (r *RoomserverInternalAPI) QueryServerAllowedToSeeEvent(
|
|
ctx context.Context,
|
|
request *api.QueryServerAllowedToSeeEventRequest,
|
|
response *api.QueryServerAllowedToSeeEventResponse,
|
|
) (err error) {
|
|
events, err := r.DB.EventsFromIDs(ctx, []string{request.EventID})
|
|
if err != nil {
|
|
return
|
|
}
|
|
if len(events) == 0 {
|
|
response.AllowedToSeeEvent = false // event doesn't exist so not allowed to see
|
|
return
|
|
}
|
|
isServerInRoom, err := r.isServerCurrentlyInRoom(ctx, request.ServerName, events[0].RoomID())
|
|
if err != nil {
|
|
return
|
|
}
|
|
response.AllowedToSeeEvent, err = r.checkServerAllowedToSeeEvent(
|
|
ctx, request.EventID, request.ServerName, isServerInRoom,
|
|
)
|
|
return
|
|
}
|
|
|
|
func (r *RoomserverInternalAPI) checkServerAllowedToSeeEvent(
|
|
ctx context.Context, eventID string, serverName gomatrixserverlib.ServerName, isServerInRoom bool,
|
|
) (bool, error) {
|
|
roomState := state.NewStateResolution(r.DB)
|
|
stateEntries, err := roomState.LoadStateAtEvent(ctx, eventID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
// TODO: We probably want to make it so that we don't have to pull
|
|
// out all the state if possible.
|
|
stateAtEvent, err := r.loadStateEvents(ctx, stateEntries)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return auth.IsServerAllowed(serverName, isServerInRoom, stateAtEvent), nil
|
|
}
|
|
|
|
// QueryMissingEvents implements api.RoomserverInternalAPI
|
|
func (r *RoomserverInternalAPI) QueryMissingEvents(
|
|
ctx context.Context,
|
|
request *api.QueryMissingEventsRequest,
|
|
response *api.QueryMissingEventsResponse,
|
|
) error {
|
|
var front []string
|
|
eventsToFilter := make(map[string]bool, len(request.LatestEvents))
|
|
visited := make(map[string]bool, request.Limit) // request.Limit acts as a hint to size.
|
|
for _, id := range request.EarliestEvents {
|
|
visited[id] = true
|
|
}
|
|
|
|
for _, id := range request.LatestEvents {
|
|
if !visited[id] {
|
|
front = append(front, id)
|
|
eventsToFilter[id] = true
|
|
}
|
|
}
|
|
|
|
resultNIDs, err := r.scanEventTree(ctx, front, visited, request.Limit, request.ServerName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
loadedEvents, err := r.loadEvents(ctx, resultNIDs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
response.Events = make([]gomatrixserverlib.HeaderedEvent, 0, len(loadedEvents)-len(eventsToFilter))
|
|
for _, event := range loadedEvents {
|
|
if !eventsToFilter[event.EventID()] {
|
|
roomVersion, verr := r.DB.GetRoomVersionForRoom(ctx, event.RoomID())
|
|
if verr != nil {
|
|
return verr
|
|
}
|
|
|
|
response.Events = append(response.Events, event.Headered(roomVersion))
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// QueryBackfill implements api.RoomServerQueryAPI
|
|
func (r *RoomserverInternalAPI) QueryBackfill(
|
|
ctx context.Context,
|
|
request *api.QueryBackfillRequest,
|
|
response *api.QueryBackfillResponse,
|
|
) error {
|
|
// if we are requesting the backfill then we need to do a federation hit
|
|
// TODO: we could be more sensible and fetch as many events we already have then request the rest
|
|
// which is what the syncapi does already.
|
|
if request.ServerName == r.ServerName {
|
|
return r.backfillViaFederation(ctx, request, response)
|
|
}
|
|
// someone else is requesting the backfill, try to service their request.
|
|
var err error
|
|
var front []string
|
|
|
|
// The limit defines the maximum number of events to retrieve, so it also
|
|
// defines the highest number of elements in the map below.
|
|
visited := make(map[string]bool, request.Limit)
|
|
|
|
// this will include these events which is what we want
|
|
front = request.PrevEventIDs()
|
|
|
|
// Scan the event tree for events to send back.
|
|
resultNIDs, err := r.scanEventTree(ctx, front, visited, request.Limit, request.ServerName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Retrieve events from the list that was filled previously.
|
|
var loadedEvents []gomatrixserverlib.Event
|
|
loadedEvents, err = r.loadEvents(ctx, resultNIDs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, event := range loadedEvents {
|
|
roomVersion, verr := r.DB.GetRoomVersionForRoom(ctx, event.RoomID())
|
|
if verr != nil {
|
|
return verr
|
|
}
|
|
|
|
response.Events = append(response.Events, event.Headered(roomVersion))
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (r *RoomserverInternalAPI) backfillViaFederation(ctx context.Context, req *api.QueryBackfillRequest, res *api.QueryBackfillResponse) error {
|
|
roomVer, err := r.DB.GetRoomVersionForRoom(ctx, req.RoomID)
|
|
if err != nil {
|
|
return fmt.Errorf("backfillViaFederation: unknown room version for room %s : %w", req.RoomID, err)
|
|
}
|
|
requester := newBackfillRequester(r.DB, r.FedClient, r.ServerName, req.BackwardsExtremities)
|
|
// Request 100 items regardless of what the query asks for.
|
|
// We don't want to go much higher than this.
|
|
// We can't honour exactly the limit as some sytests rely on requesting more for tests to pass
|
|
// (so we don't need to hit /state_ids which the test has no listener for)
|
|
// Specifically the test "Outbound federation can backfill events"
|
|
events, err := gomatrixserverlib.RequestBackfill(
|
|
ctx, requester,
|
|
r.KeyRing, req.RoomID, roomVer, req.PrevEventIDs(), 100)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
logrus.WithField("room_id", req.RoomID).Infof("backfilled %d events", len(events))
|
|
|
|
// persist these new events - auth checks have already been done
|
|
roomNID, backfilledEventMap := persistEvents(ctx, r.DB, events)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, ev := range backfilledEventMap {
|
|
// now add state for these events
|
|
stateIDs, ok := requester.eventIDToBeforeStateIDs[ev.EventID()]
|
|
if !ok {
|
|
// this should be impossible as all events returned must have pass Step 5 of the PDU checks
|
|
// which requires a list of state IDs.
|
|
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("backfillViaFederation: failed to find state IDs for event which passed auth checks")
|
|
continue
|
|
}
|
|
var entries []types.StateEntry
|
|
if entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs); err != nil {
|
|
// attempt to fetch the missing events
|
|
r.fetchAndStoreMissingEvents(ctx, roomVer, requester, stateIDs)
|
|
// try again
|
|
entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs)
|
|
if err != nil {
|
|
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("backfillViaFederation: failed to get state entries for event")
|
|
return err
|
|
}
|
|
}
|
|
|
|
var beforeStateSnapshotNID types.StateSnapshotNID
|
|
if beforeStateSnapshotNID, err = r.DB.AddState(ctx, roomNID, nil, entries); err != nil {
|
|
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("backfillViaFederation: failed to persist state entries to get snapshot nid")
|
|
return err
|
|
}
|
|
if err = r.DB.SetState(ctx, ev.EventNID, beforeStateSnapshotNID); err != nil {
|
|
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("backfillViaFederation: failed to persist snapshot nid")
|
|
}
|
|
}
|
|
|
|
// TODO: update backwards extremities, as that should be moved from syncapi to roomserver at some point.
|
|
|
|
res.Events = events
|
|
return nil
|
|
}
|
|
|
|
func (r *RoomserverInternalAPI) isServerCurrentlyInRoom(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID string) (bool, error) {
|
|
roomNID, err := r.DB.RoomNID(ctx, roomID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
eventNIDs, err := r.DB.GetMembershipEventNIDsForRoom(ctx, roomNID, true)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
events, err := r.DB.Events(ctx, eventNIDs)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
gmslEvents := make([]gomatrixserverlib.Event, len(events))
|
|
for i := range events {
|
|
gmslEvents[i] = events[i].Event
|
|
}
|
|
return auth.IsAnyUserOnServerWithMembership(serverName, gmslEvents, gomatrixserverlib.Join), nil
|
|
}
|
|
|
|
// fetchAndStoreMissingEvents does a best-effort fetch and store of missing events specified in stateIDs. Returns no error as it is just
|
|
// best effort.
|
|
func (r *RoomserverInternalAPI) fetchAndStoreMissingEvents(ctx context.Context, roomVer gomatrixserverlib.RoomVersion,
|
|
backfillRequester *backfillRequester, stateIDs []string) {
|
|
|
|
servers := backfillRequester.servers
|
|
|
|
// work out which are missing
|
|
nidMap, err := r.DB.EventNIDs(ctx, stateIDs)
|
|
if err != nil {
|
|
util.GetLogger(ctx).WithError(err).Warn("cannot query missing events")
|
|
return
|
|
}
|
|
missingMap := make(map[string]*gomatrixserverlib.HeaderedEvent) // id -> event
|
|
for _, id := range stateIDs {
|
|
if _, ok := nidMap[id]; !ok {
|
|
missingMap[id] = nil
|
|
}
|
|
}
|
|
util.GetLogger(ctx).Infof("Fetching %d missing state events (from %d possible servers)", len(missingMap), len(servers))
|
|
|
|
// fetch the events from federation. Loop the servers first so if we find one that works we stick with them
|
|
for _, srv := range servers {
|
|
for id, ev := range missingMap {
|
|
if ev != nil {
|
|
continue // already found
|
|
}
|
|
logger := util.GetLogger(ctx).WithField("server", srv).WithField("event_id", id)
|
|
res, err := r.FedClient.GetEvent(ctx, srv, id)
|
|
if err != nil {
|
|
logger.WithError(err).Warn("failed to get event from server")
|
|
continue
|
|
}
|
|
loader := gomatrixserverlib.NewEventsLoader(roomVer, r.KeyRing, backfillRequester, backfillRequester.ProvideEvents, false)
|
|
result, err := loader.LoadAndVerify(ctx, res.PDUs, gomatrixserverlib.TopologicalOrderByPrevEvents)
|
|
if err != nil {
|
|
logger.WithError(err).Warn("failed to load and verify event")
|
|
continue
|
|
}
|
|
logger.Infof("returned %d PDUs which made events %+v", len(res.PDUs), result)
|
|
for _, res := range result {
|
|
if res.Error != nil {
|
|
logger.WithError(err).Warn("event failed PDU checks")
|
|
continue
|
|
}
|
|
missingMap[id] = res.Event
|
|
}
|
|
}
|
|
}
|
|
|
|
var newEvents []gomatrixserverlib.HeaderedEvent
|
|
for _, ev := range missingMap {
|
|
if ev != nil {
|
|
newEvents = append(newEvents, *ev)
|
|
}
|
|
}
|
|
util.GetLogger(ctx).Infof("Persisting %d new events", len(newEvents))
|
|
persistEvents(ctx, r.DB, newEvents)
|
|
}
|
|
|
|
// TODO: Remove this when we have tests to assert correctness of this function
|
|
// nolint:gocyclo
|
|
func (r *RoomserverInternalAPI) scanEventTree(
|
|
ctx context.Context, front []string, visited map[string]bool, limit int,
|
|
serverName gomatrixserverlib.ServerName,
|
|
) ([]types.EventNID, error) {
|
|
var resultNIDs []types.EventNID
|
|
var err error
|
|
var allowed bool
|
|
var events []types.Event
|
|
var next []string
|
|
var pre string
|
|
|
|
// TODO: add tests for this function to ensure it meets the contract that callers expect (and doc what that is supposed to be)
|
|
// Currently, callers like QueryBackfill will call scanEventTree with a pre-populated `visited` map, assuming that by doing
|
|
// so means that the events in that map will NOT be returned from this function. That is not currently true, resulting in
|
|
// duplicate events being sent in response to /backfill requests.
|
|
initialIgnoreList := make(map[string]bool, len(visited))
|
|
for k, v := range visited {
|
|
initialIgnoreList[k] = v
|
|
}
|
|
|
|
resultNIDs = make([]types.EventNID, 0, limit)
|
|
|
|
var checkedServerInRoom bool
|
|
var isServerInRoom bool
|
|
|
|
// Loop through the event IDs to retrieve the requested events and go
|
|
// through the whole tree (up to the provided limit) using the events'
|
|
// "prev_event" key.
|
|
BFSLoop:
|
|
for len(front) > 0 {
|
|
// Prevent unnecessary allocations: reset the slice only when not empty.
|
|
if len(next) > 0 {
|
|
next = make([]string, 0)
|
|
}
|
|
// Retrieve the events to process from the database.
|
|
events, err = r.DB.EventsFromIDs(ctx, front)
|
|
if err != nil {
|
|
return resultNIDs, err
|
|
}
|
|
|
|
if !checkedServerInRoom && len(events) > 0 {
|
|
// It's nasty that we have to extract the room ID from an event, but many federation requests
|
|
// only talk in event IDs, no room IDs at all (!!!)
|
|
ev := events[0]
|
|
isServerInRoom, err = r.isServerCurrentlyInRoom(ctx, serverName, ev.RoomID())
|
|
if err != nil {
|
|
util.GetLogger(ctx).WithError(err).Error("Failed to check if server is currently in room, assuming not.")
|
|
}
|
|
checkedServerInRoom = true
|
|
}
|
|
|
|
for _, ev := range events {
|
|
// Break out of the loop if the provided limit is reached.
|
|
if len(resultNIDs) == limit {
|
|
break BFSLoop
|
|
}
|
|
|
|
if !initialIgnoreList[ev.EventID()] {
|
|
// Update the list of events to retrieve.
|
|
resultNIDs = append(resultNIDs, ev.EventNID)
|
|
}
|
|
// Loop through the event's parents.
|
|
for _, pre = range ev.PrevEventIDs() {
|
|
// Only add an event to the list of next events to process if it
|
|
// hasn't been seen before.
|
|
if !visited[pre] {
|
|
visited[pre] = true
|
|
allowed, err = r.checkServerAllowedToSeeEvent(ctx, pre, serverName, isServerInRoom)
|
|
if err != nil {
|
|
util.GetLogger(ctx).WithField("server", serverName).WithField("event_id", pre).WithError(err).Error(
|
|
"Error checking if allowed to see event",
|
|
)
|
|
return resultNIDs, err
|
|
}
|
|
|
|
// If the event hasn't been seen before and the HS
|
|
// requesting to retrieve it is allowed to do so, add it to
|
|
// the list of events to retrieve.
|
|
if allowed {
|
|
next = append(next, pre)
|
|
} else {
|
|
util.GetLogger(ctx).WithField("server", serverName).WithField("event_id", pre).Info("Not allowed to see event")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
// Repeat the same process with the parent events we just processed.
|
|
front = next
|
|
}
|
|
|
|
return resultNIDs, err
|
|
}
|
|
|
|
// QueryStateAndAuthChain implements api.RoomserverInternalAPI
|
|
func (r *RoomserverInternalAPI) QueryStateAndAuthChain(
|
|
ctx context.Context,
|
|
request *api.QueryStateAndAuthChainRequest,
|
|
response *api.QueryStateAndAuthChainResponse,
|
|
) error {
|
|
response.QueryStateAndAuthChainRequest = *request
|
|
roomNID, err := r.DB.RoomNIDExcludingStubs(ctx, request.RoomID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if roomNID == 0 {
|
|
return nil
|
|
}
|
|
response.RoomExists = true
|
|
|
|
roomVersion, err := r.DB.GetRoomVersionForRoom(ctx, request.RoomID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
response.RoomVersion = roomVersion
|
|
|
|
stateEvents, err := r.loadStateAtEventIDs(ctx, request.PrevEventIDs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
response.PrevEventsExist = true
|
|
|
|
// add the auth event IDs for the current state events too
|
|
var authEventIDs []string
|
|
authEventIDs = append(authEventIDs, request.AuthEventIDs...)
|
|
for _, se := range stateEvents {
|
|
authEventIDs = append(authEventIDs, se.AuthEventIDs()...)
|
|
}
|
|
authEventIDs = util.UniqueStrings(authEventIDs) // de-dupe
|
|
|
|
authEvents, err := getAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if request.ResolveState {
|
|
if stateEvents, err = state.ResolveConflictsAdhoc(
|
|
roomVersion, stateEvents, authEvents,
|
|
); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
for _, event := range stateEvents {
|
|
response.StateEvents = append(response.StateEvents, event.Headered(roomVersion))
|
|
}
|
|
|
|
for _, event := range authEvents {
|
|
response.AuthChainEvents = append(response.AuthChainEvents, event.Headered(roomVersion))
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (r *RoomserverInternalAPI) loadStateAtEventIDs(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error) {
|
|
roomState := state.NewStateResolution(r.DB)
|
|
prevStates, err := r.DB.StateAtEventIDs(ctx, eventIDs)
|
|
if err != nil {
|
|
switch err.(type) {
|
|
case types.MissingEventError:
|
|
return nil, nil
|
|
default:
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// Look up the currrent state for the requested tuples.
|
|
stateEntries, err := roomState.LoadCombinedStateAfterEvents(
|
|
ctx, prevStates,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return r.loadStateEvents(ctx, stateEntries)
|
|
}
|
|
|
|
type eventsFromIDs func(context.Context, []string) ([]types.Event, error)
|
|
|
|
// getAuthChain fetches the auth chain for the given auth events. An auth chain
|
|
// is the list of all events that are referenced in the auth_events section, and
|
|
// all their auth_events, recursively. The returned set of events contain the
|
|
// given events. Will *not* error if we don't have all auth events.
|
|
func getAuthChain(
|
|
ctx context.Context, fn eventsFromIDs, authEventIDs []string,
|
|
) ([]gomatrixserverlib.Event, error) {
|
|
// List of event IDs to fetch. On each pass, these events will be requested
|
|
// from the database and the `eventsToFetch` will be updated with any new
|
|
// events that we have learned about and need to find. When `eventsToFetch`
|
|
// is eventually empty, we should have reached the end of the chain.
|
|
eventsToFetch := authEventIDs
|
|
authEventsMap := make(map[string]gomatrixserverlib.Event)
|
|
|
|
for len(eventsToFetch) > 0 {
|
|
// Try to retrieve the events from the database.
|
|
events, err := fn(ctx, eventsToFetch)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// We've now fetched these events so clear out `eventsToFetch`. Soon we may
|
|
// add newly discovered events to this for the next pass.
|
|
eventsToFetch = eventsToFetch[:0]
|
|
|
|
for _, event := range events {
|
|
// Store the event in the event map - this prevents us from requesting it
|
|
// from the database again.
|
|
authEventsMap[event.EventID()] = event.Event
|
|
|
|
// Extract all of the auth events from the newly obtained event. If we
|
|
// don't already have a record of the event, record it in the list of
|
|
// events we want to request for the next pass.
|
|
for _, authEvent := range event.AuthEvents() {
|
|
if _, ok := authEventsMap[authEvent.EventID]; !ok {
|
|
eventsToFetch = append(eventsToFetch, authEvent.EventID)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// We've now retrieved all of the events we can. Flatten them down into an
|
|
// array and return them.
|
|
var authEvents []gomatrixserverlib.Event
|
|
for _, event := range authEventsMap {
|
|
authEvents = append(authEvents, event)
|
|
}
|
|
|
|
return authEvents, nil
|
|
}
|
|
|
|
func persistEvents(ctx context.Context, db storage.Database, events []gomatrixserverlib.HeaderedEvent) (types.RoomNID, map[string]types.Event) {
|
|
var roomNID types.RoomNID
|
|
backfilledEventMap := make(map[string]types.Event)
|
|
for _, ev := range events {
|
|
nidMap, err := db.EventNIDs(ctx, ev.AuthEventIDs())
|
|
if err != nil { // this shouldn't happen as RequestBackfill already found them
|
|
logrus.WithError(err).WithField("auth_events", ev.AuthEventIDs()).Error("Failed to find one or more auth events")
|
|
continue
|
|
}
|
|
authNids := make([]types.EventNID, len(nidMap))
|
|
i := 0
|
|
for _, nid := range nidMap {
|
|
authNids[i] = nid
|
|
i++
|
|
}
|
|
var stateAtEvent types.StateAtEvent
|
|
roomNID, stateAtEvent, err = db.StoreEvent(ctx, ev.Unwrap(), nil, authNids)
|
|
if err != nil {
|
|
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to persist event")
|
|
continue
|
|
}
|
|
backfilledEventMap[ev.EventID()] = types.Event{
|
|
EventNID: stateAtEvent.StateEntry.EventNID,
|
|
Event: ev.Unwrap(),
|
|
}
|
|
}
|
|
return roomNID, backfilledEventMap
|
|
}
|
|
|
|
// QueryRoomVersionCapabilities implements api.RoomserverInternalAPI
|
|
func (r *RoomserverInternalAPI) QueryRoomVersionCapabilities(
|
|
ctx context.Context,
|
|
request *api.QueryRoomVersionCapabilitiesRequest,
|
|
response *api.QueryRoomVersionCapabilitiesResponse,
|
|
) error {
|
|
response.DefaultRoomVersion = version.DefaultRoomVersion()
|
|
response.AvailableRoomVersions = make(map[gomatrixserverlib.RoomVersion]string)
|
|
for v, desc := range version.SupportedRoomVersions() {
|
|
if desc.Stable {
|
|
response.AvailableRoomVersions[v] = "stable"
|
|
} else {
|
|
response.AvailableRoomVersions[v] = "unstable"
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// QueryRoomVersionCapabilities implements api.RoomserverInternalAPI
|
|
func (r *RoomserverInternalAPI) QueryRoomVersionForRoom(
|
|
ctx context.Context,
|
|
request *api.QueryRoomVersionForRoomRequest,
|
|
response *api.QueryRoomVersionForRoomResponse,
|
|
) error {
|
|
if roomVersion, ok := r.ImmutableCache.GetRoomVersion(request.RoomID); ok {
|
|
response.RoomVersion = roomVersion
|
|
return nil
|
|
}
|
|
|
|
roomVersion, err := r.DB.GetRoomVersionForRoom(ctx, request.RoomID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
response.RoomVersion = roomVersion
|
|
r.ImmutableCache.StoreRoomVersion(request.RoomID, response.RoomVersion)
|
|
return nil
|
|
}
|