diff --git a/federationapi/api/api.go b/federationapi/api/api.go index 5d4eb8848..0cab7d5a4 100644 --- a/federationapi/api/api.go +++ b/federationapi/api/api.go @@ -23,6 +23,7 @@ type FederationClient interface { MSC2836EventRelationships(ctx context.Context, dst gomatrixserverlib.ServerName, r gomatrixserverlib.MSC2836EventRelationshipsRequest, roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.MSC2836EventRelationshipsResponse, err error) MSC2946Spaces(ctx context.Context, dst gomatrixserverlib.ServerName, roomID string, r gomatrixserverlib.MSC2946SpacesRequest) (res gomatrixserverlib.MSC2946SpacesResponse, err error) LookupServerKeys(ctx context.Context, s gomatrixserverlib.ServerName, keyRequests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp) ([]gomatrixserverlib.ServerKeys, error) + GetEventAuth(ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string) (res gomatrixserverlib.RespEventAuth, err error) } // FederationClientError is returned from FederationClient methods in the event of a problem. diff --git a/federationapi/internal/federationclient.go b/federationapi/internal/federationclient.go index f7acfb10b..66611fe29 100644 --- a/federationapi/internal/federationclient.go +++ b/federationapi/internal/federationclient.go @@ -10,6 +10,20 @@ import ( // Functions here are "proxying" calls to the gomatrixserverlib federation // client. +func (a *FederationInternalAPI) GetEventAuth( + ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string, +) (res gomatrixserverlib.RespEventAuth, err error) { + ctx, cancel := context.WithTimeout(ctx, time.Second*30) + defer cancel() + ires, err := a.doRequest(s, func() (interface{}, error) { + return a.federation.GetEventAuth(ctx, s, roomID, eventID) + }) + if err != nil { + return gomatrixserverlib.RespEventAuth{}, err + } + return ires.(gomatrixserverlib.RespEventAuth), nil +} + func (a *FederationInternalAPI) GetUserDevices( ctx context.Context, s gomatrixserverlib.ServerName, userID string, ) (gomatrixserverlib.RespUserDevices, error) { diff --git a/federationapi/inthttp/client.go b/federationapi/inthttp/client.go index af6b801b3..fa9fae343 100644 --- a/federationapi/inthttp/client.go +++ b/federationapi/inthttp/client.go @@ -36,6 +36,7 @@ const ( FederationAPILookupServerKeysPath = "/federationapi/client/lookupServerKeys" FederationAPIEventRelationshipsPath = "/federationapi/client/msc2836eventRelationships" FederationAPISpacesSummaryPath = "/federationapi/client/msc2946spacesSummary" + FederationAPIGetEventAuthPath = "/federationapi/client/getEventAuth" FederationAPIInputPublicKeyPath = "/federationapi/inputPublicKey" FederationAPIQueryPublicKeyPath = "/federationapi/queryPublicKey" @@ -382,6 +383,37 @@ func (h *httpFederationInternalAPI) GetEvent( return *response.Res, nil } +type getEventAuth struct { + S gomatrixserverlib.ServerName + RoomID string + EventID string + Res *gomatrixserverlib.RespEventAuth + Err *api.FederationClientError +} + +func (h *httpFederationInternalAPI) GetEventAuth( + ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string, +) (gomatrixserverlib.RespEventAuth, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "GetEventAuth") + defer span.Finish() + + request := getEventAuth{ + S: s, + RoomID: roomID, + EventID: eventID, + } + var response getEventAuth + apiURL := h.federationAPIURL + FederationAPIGetEventAuthPath + err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response) + if err != nil { + return gomatrixserverlib.RespEventAuth{}, err + } + if response.Err != nil { + return gomatrixserverlib.RespEventAuth{}, response.Err + } + return *response.Res, nil +} + func (h *httpFederationInternalAPI) QueryServerKeys( ctx context.Context, req *api.QueryServerKeysRequest, res *api.QueryServerKeysResponse, ) error { diff --git a/federationapi/inthttp/server.go b/federationapi/inthttp/server.go index 7133eddd0..4a5af0706 100644 --- a/federationapi/inthttp/server.go +++ b/federationapi/inthttp/server.go @@ -263,6 +263,28 @@ func AddRoutes(intAPI api.FederationInternalAPI, internalAPIMux *mux.Router) { return util.JSONResponse{Code: http.StatusOK, JSON: request} }), ) + internalAPIMux.Handle( + FederationAPIGetEventAuthPath, + httputil.MakeInternalAPI("GetEventAuth", func(req *http.Request) util.JSONResponse { + var request getEventAuth + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + res, err := intAPI.GetEventAuth(req.Context(), request.S, request.RoomID, request.EventID) + if err != nil { + ferr, ok := err.(*api.FederationClientError) + if ok { + request.Err = ferr + } else { + request.Err = &api.FederationClientError{ + Err: err.Error(), + } + } + } + request.Res = &res + return util.JSONResponse{Code: http.StatusOK, JSON: request} + }), + ) internalAPIMux.Handle( FederationAPIQueryServerKeysPath, httputil.MakeInternalAPI("QueryServerKeys", func(req *http.Request) util.JSONResponse { diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index 67bbc7aba..c1de3c9cb 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -37,6 +37,7 @@ type RoomserverInternalAPI struct { Cache caching.RoomServerCaches ServerName gomatrixserverlib.ServerName KeyRing gomatrixserverlib.JSONVerifier + ServerACLs *acls.ServerACLs fsAPI fsAPI.FederationInternalAPI asAPI asAPI.AppServiceQueryAPI OutputRoomEventTopic string // Kafka topic for new output room events @@ -55,6 +56,9 @@ func NewRoomserverAPI( Cache: caches, ServerName: cfg.Matrix.ServerName, PerspectiveServerNames: perspectiveServerNames, + OutputRoomEventTopic: outputRoomEventTopic, + Producer: producer, + ServerACLs: serverACLs, Queryer: &query.Queryer{ DB: roomserverDB, Cache: caches, @@ -80,6 +84,15 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA r.fsAPI = fsAPI r.KeyRing = keyRing + r.Inputer = &input.Inputer{ + DB: r.DB, + OutputRoomEventTopic: r.OutputRoomEventTopic, + Producer: r.Producer, + ServerName: r.Cfg.Matrix.ServerName, + FSAPI: fsAPI, + KeyRing: keyRing, + ACLs: r.ServerACLs, + } r.Inviter = &perform.Inviter{ DB: r.DB, Cfg: r.Cfg, diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index de40e133d..bc9046cf5 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -23,6 +23,7 @@ import ( "github.com/Shopify/sarama" "github.com/getsentry/sentry-go" + fedapi "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/hooks" "github.com/matrix-org/dendrite/roomserver/acls" "github.com/matrix-org/dendrite/roomserver/api" @@ -44,6 +45,8 @@ type Inputer struct { DB storage.Database Producer sarama.SyncProducer ServerName gomatrixserverlib.ServerName + FSAPI fedapi.FederationInternalAPI + KeyRing gomatrixserverlib.JSONVerifier ACLs *acls.ServerACLs OutputRoomEventTopic string workers sync.Map // room ID -> *inputWorker diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index fc712f47b..c38de7368 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -22,6 +22,7 @@ import ( "fmt" "time" + fedapi "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/helpers" @@ -98,13 +99,28 @@ func (r *Inputer) processRoomEvent( } } - // Check that the event passes authentication checks and work out - // the numeric IDs for the auth events. + // First of all, check that the auth events of the event are known. + // If they aren't then we will ask the federation API for them. isRejected := false - authEventNIDs, rejectionErr := helpers.CheckAuthEvents(ctx, r.DB, headered, input.AuthEventIDs) - if rejectionErr != nil { - logrus.WithError(rejectionErr).WithField("event_id", event.EventID()).WithField("auth_event_ids", input.AuthEventIDs).Error("helpers.CheckAuthEvents failed for event, rejecting event") + authEvents := gomatrixserverlib.NewAuthEvents(nil) + knownAuthEvents := map[string]types.Event{} + if err = r.checkForMissingAuthEvents(ctx, input.Event, &authEvents, knownAuthEvents); err != nil { + return "", fmt.Errorf("r.checkForMissingAuthEvents: %w", err) + } + + // Check if the event is allowed by its auth events. If it isn't then + // we consider the event to be "rejected" — it will still be persisted. + var rejectionErr error + if rejectionErr = gomatrixserverlib.Allowed(event, &authEvents); rejectionErr != nil { isRejected = true + logrus.WithError(rejectionErr).Warnf("Event %s rejected", event.EventID()) + } + + // Accumulate the auth event NIDs. + authEventIDs := event.AuthEventIDs() + authEventNIDs := make([]types.EventNID, 0, len(authEventIDs)) + for _, authEventID := range authEventIDs { + authEventNIDs = append(authEventNIDs, knownAuthEvents[authEventID].EventNID) } var softfail bool @@ -228,6 +244,116 @@ func (r *Inputer) processRoomEvent( return event.EventID(), nil } +func (r *Inputer) checkForMissingAuthEvents( + ctx context.Context, + event *gomatrixserverlib.HeaderedEvent, + auth *gomatrixserverlib.AuthEvents, + known map[string]types.Event, +) error { + authEventIDs := event.AuthEventIDs() + if len(authEventIDs) == 0 { + return nil + } + + unknown := map[string]struct{}{} + + authEvents, err := r.DB.EventsFromIDs(ctx, authEventIDs) + if err != nil { + return fmt.Errorf("r.DB.EventsFromIDs: %w", err) + } + for _, event := range authEvents { + if event.Event != nil { + known[event.EventID()] = event + if err = auth.AddEvent(event.Event); err != nil { + return fmt.Errorf("auth.AddEvent: %w", err) + } + } else { + unknown[event.EventID()] = struct{}{} + } + } + + if len(unknown) > 0 { + serverReq := &fedapi.QueryJoinedHostServerNamesInRoomRequest{ + RoomID: event.RoomID(), + } + serverRes := &fedapi.QueryJoinedHostServerNamesInRoomResponse{} + if err = r.FSAPI.QueryJoinedHostServerNamesInRoom(ctx, serverReq, serverRes); err != nil { + return fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err) + } + + var res gomatrixserverlib.RespEventAuth + var found bool + for _, serverName := range serverRes.ServerNames { + res, err = r.FSAPI.GetEventAuth(ctx, serverName, event.RoomID(), event.EventID()) + if err != nil { + logrus.WithError(err).Warnf("Failed to get event auth from federation for %q: %s", event.EventID(), err) + continue + } + found = true + } + if !found { + return fmt.Errorf("no servers provided event auth") + } + + for _, event := range gomatrixserverlib.ReverseTopologicalOrdering( + res.AuthEvents, + gomatrixserverlib.TopologicalOrderByAuthEvents, + ) { + // If we already know about this event then we don't need to store + // it or do anything further with it. + if _, ok := known[event.EventID()]; ok { + continue + } + + // Check the signatures of the event. + // TODO: It really makes sense for the federation API to be doing this, + // because then it can attempt another server if one serves up an event + // with an invalid signature. For now this will do. + if err := event.VerifyEventSignatures(ctx, r.FSAPI.KeyRing()); err != nil { + return fmt.Errorf("event.VerifyEventSignatures: %w", err) + } + + // Otherwise, we need to store, and that means we need to know the + // auth event NIDs. Let's see if we can find those. + authEventNIDs := make([]types.EventNID, 0, len(event.AuthEventIDs())) + for _, eventID := range event.AuthEventIDs() { + knownEvent, ok := known[eventID] + if !ok { + return fmt.Errorf("missing auth event %s for %s", eventID, event.EventID()) + } + authEventNIDs = append(authEventNIDs, knownEvent.EventNID) + } + + // Let's take a note of the fact that we now know about this event. + known[event.EventID()] = types.Event{} + if err := auth.AddEvent(event); err != nil { + return fmt.Errorf("auth.AddEvent: %w", err) + } + + // Check if the auth event should be rejected. + isRejected := false + if err := gomatrixserverlib.Allowed(event, auth); err != nil { + isRejected = true + logrus.WithError(err).Warnf("Auth event %s rejected", event.EventID()) + } + + // Finally, store the event in the database. + eventNID, _, _, _, _, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected) + if err != nil { + return fmt.Errorf("r.DB.StoreEvent: %w", err) + } + + // Now we know about this event, too. + known[event.EventID()] = types.Event{ + EventNID: eventNID, + Event: event, + } + } + } + + return nil +} + func (r *Inputer) calculateAndSetState( ctx context.Context, input *api.InputRoomEvent,