Look for missing auth events in RS input
This commit is contained in:
parent
9c327d431c
commit
18cef55d04
|
@ -23,6 +23,7 @@ type FederationClient interface {
|
||||||
MSC2836EventRelationships(ctx context.Context, dst gomatrixserverlib.ServerName, r gomatrixserverlib.MSC2836EventRelationshipsRequest, roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.MSC2836EventRelationshipsResponse, err error)
|
MSC2836EventRelationships(ctx context.Context, dst gomatrixserverlib.ServerName, r gomatrixserverlib.MSC2836EventRelationshipsRequest, roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.MSC2836EventRelationshipsResponse, err error)
|
||||||
MSC2946Spaces(ctx context.Context, dst gomatrixserverlib.ServerName, roomID string, r gomatrixserverlib.MSC2946SpacesRequest) (res gomatrixserverlib.MSC2946SpacesResponse, err error)
|
MSC2946Spaces(ctx context.Context, dst gomatrixserverlib.ServerName, roomID string, r gomatrixserverlib.MSC2946SpacesRequest) (res gomatrixserverlib.MSC2946SpacesResponse, err error)
|
||||||
LookupServerKeys(ctx context.Context, s gomatrixserverlib.ServerName, keyRequests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp) ([]gomatrixserverlib.ServerKeys, error)
|
LookupServerKeys(ctx context.Context, s gomatrixserverlib.ServerName, keyRequests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp) ([]gomatrixserverlib.ServerKeys, error)
|
||||||
|
GetEventAuth(ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string) (res gomatrixserverlib.RespEventAuth, err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// FederationClientError is returned from FederationClient methods in the event of a problem.
|
// FederationClientError is returned from FederationClient methods in the event of a problem.
|
||||||
|
|
|
@ -10,6 +10,20 @@ import (
|
||||||
// Functions here are "proxying" calls to the gomatrixserverlib federation
|
// Functions here are "proxying" calls to the gomatrixserverlib federation
|
||||||
// client.
|
// client.
|
||||||
|
|
||||||
|
func (a *FederationInternalAPI) GetEventAuth(
|
||||||
|
ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string,
|
||||||
|
) (res gomatrixserverlib.RespEventAuth, err error) {
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
|
||||||
|
defer cancel()
|
||||||
|
ires, err := a.doRequest(s, func() (interface{}, error) {
|
||||||
|
return a.federation.GetEventAuth(ctx, s, roomID, eventID)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return gomatrixserverlib.RespEventAuth{}, err
|
||||||
|
}
|
||||||
|
return ires.(gomatrixserverlib.RespEventAuth), nil
|
||||||
|
}
|
||||||
|
|
||||||
func (a *FederationInternalAPI) GetUserDevices(
|
func (a *FederationInternalAPI) GetUserDevices(
|
||||||
ctx context.Context, s gomatrixserverlib.ServerName, userID string,
|
ctx context.Context, s gomatrixserverlib.ServerName, userID string,
|
||||||
) (gomatrixserverlib.RespUserDevices, error) {
|
) (gomatrixserverlib.RespUserDevices, error) {
|
||||||
|
|
|
@ -36,6 +36,7 @@ const (
|
||||||
FederationAPILookupServerKeysPath = "/federationapi/client/lookupServerKeys"
|
FederationAPILookupServerKeysPath = "/federationapi/client/lookupServerKeys"
|
||||||
FederationAPIEventRelationshipsPath = "/federationapi/client/msc2836eventRelationships"
|
FederationAPIEventRelationshipsPath = "/federationapi/client/msc2836eventRelationships"
|
||||||
FederationAPISpacesSummaryPath = "/federationapi/client/msc2946spacesSummary"
|
FederationAPISpacesSummaryPath = "/federationapi/client/msc2946spacesSummary"
|
||||||
|
FederationAPIGetEventAuthPath = "/federationapi/client/getEventAuth"
|
||||||
|
|
||||||
FederationAPIInputPublicKeyPath = "/federationapi/inputPublicKey"
|
FederationAPIInputPublicKeyPath = "/federationapi/inputPublicKey"
|
||||||
FederationAPIQueryPublicKeyPath = "/federationapi/queryPublicKey"
|
FederationAPIQueryPublicKeyPath = "/federationapi/queryPublicKey"
|
||||||
|
@ -382,6 +383,37 @@ func (h *httpFederationInternalAPI) GetEvent(
|
||||||
return *response.Res, nil
|
return *response.Res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type getEventAuth struct {
|
||||||
|
S gomatrixserverlib.ServerName
|
||||||
|
RoomID string
|
||||||
|
EventID string
|
||||||
|
Res *gomatrixserverlib.RespEventAuth
|
||||||
|
Err *api.FederationClientError
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *httpFederationInternalAPI) GetEventAuth(
|
||||||
|
ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string,
|
||||||
|
) (gomatrixserverlib.RespEventAuth, error) {
|
||||||
|
span, ctx := opentracing.StartSpanFromContext(ctx, "GetEventAuth")
|
||||||
|
defer span.Finish()
|
||||||
|
|
||||||
|
request := getEventAuth{
|
||||||
|
S: s,
|
||||||
|
RoomID: roomID,
|
||||||
|
EventID: eventID,
|
||||||
|
}
|
||||||
|
var response getEventAuth
|
||||||
|
apiURL := h.federationAPIURL + FederationAPIGetEventAuthPath
|
||||||
|
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response)
|
||||||
|
if err != nil {
|
||||||
|
return gomatrixserverlib.RespEventAuth{}, err
|
||||||
|
}
|
||||||
|
if response.Err != nil {
|
||||||
|
return gomatrixserverlib.RespEventAuth{}, response.Err
|
||||||
|
}
|
||||||
|
return *response.Res, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (h *httpFederationInternalAPI) QueryServerKeys(
|
func (h *httpFederationInternalAPI) QueryServerKeys(
|
||||||
ctx context.Context, req *api.QueryServerKeysRequest, res *api.QueryServerKeysResponse,
|
ctx context.Context, req *api.QueryServerKeysRequest, res *api.QueryServerKeysResponse,
|
||||||
) error {
|
) error {
|
||||||
|
|
|
@ -263,6 +263,28 @@ func AddRoutes(intAPI api.FederationInternalAPI, internalAPIMux *mux.Router) {
|
||||||
return util.JSONResponse{Code: http.StatusOK, JSON: request}
|
return util.JSONResponse{Code: http.StatusOK, JSON: request}
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
internalAPIMux.Handle(
|
||||||
|
FederationAPIGetEventAuthPath,
|
||||||
|
httputil.MakeInternalAPI("GetEventAuth", func(req *http.Request) util.JSONResponse {
|
||||||
|
var request getEventAuth
|
||||||
|
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||||
|
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
||||||
|
}
|
||||||
|
res, err := intAPI.GetEventAuth(req.Context(), request.S, request.RoomID, request.EventID)
|
||||||
|
if err != nil {
|
||||||
|
ferr, ok := err.(*api.FederationClientError)
|
||||||
|
if ok {
|
||||||
|
request.Err = ferr
|
||||||
|
} else {
|
||||||
|
request.Err = &api.FederationClientError{
|
||||||
|
Err: err.Error(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
request.Res = &res
|
||||||
|
return util.JSONResponse{Code: http.StatusOK, JSON: request}
|
||||||
|
}),
|
||||||
|
)
|
||||||
internalAPIMux.Handle(
|
internalAPIMux.Handle(
|
||||||
FederationAPIQueryServerKeysPath,
|
FederationAPIQueryServerKeysPath,
|
||||||
httputil.MakeInternalAPI("QueryServerKeys", func(req *http.Request) util.JSONResponse {
|
httputil.MakeInternalAPI("QueryServerKeys", func(req *http.Request) util.JSONResponse {
|
||||||
|
|
|
@ -37,6 +37,7 @@ type RoomserverInternalAPI struct {
|
||||||
Cache caching.RoomServerCaches
|
Cache caching.RoomServerCaches
|
||||||
ServerName gomatrixserverlib.ServerName
|
ServerName gomatrixserverlib.ServerName
|
||||||
KeyRing gomatrixserverlib.JSONVerifier
|
KeyRing gomatrixserverlib.JSONVerifier
|
||||||
|
ServerACLs *acls.ServerACLs
|
||||||
fsAPI fsAPI.FederationInternalAPI
|
fsAPI fsAPI.FederationInternalAPI
|
||||||
asAPI asAPI.AppServiceQueryAPI
|
asAPI asAPI.AppServiceQueryAPI
|
||||||
OutputRoomEventTopic string // Kafka topic for new output room events
|
OutputRoomEventTopic string // Kafka topic for new output room events
|
||||||
|
@ -55,6 +56,9 @@ func NewRoomserverAPI(
|
||||||
Cache: caches,
|
Cache: caches,
|
||||||
ServerName: cfg.Matrix.ServerName,
|
ServerName: cfg.Matrix.ServerName,
|
||||||
PerspectiveServerNames: perspectiveServerNames,
|
PerspectiveServerNames: perspectiveServerNames,
|
||||||
|
OutputRoomEventTopic: outputRoomEventTopic,
|
||||||
|
Producer: producer,
|
||||||
|
ServerACLs: serverACLs,
|
||||||
Queryer: &query.Queryer{
|
Queryer: &query.Queryer{
|
||||||
DB: roomserverDB,
|
DB: roomserverDB,
|
||||||
Cache: caches,
|
Cache: caches,
|
||||||
|
@ -80,6 +84,15 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA
|
||||||
r.fsAPI = fsAPI
|
r.fsAPI = fsAPI
|
||||||
r.KeyRing = keyRing
|
r.KeyRing = keyRing
|
||||||
|
|
||||||
|
r.Inputer = &input.Inputer{
|
||||||
|
DB: r.DB,
|
||||||
|
OutputRoomEventTopic: r.OutputRoomEventTopic,
|
||||||
|
Producer: r.Producer,
|
||||||
|
ServerName: r.Cfg.Matrix.ServerName,
|
||||||
|
FSAPI: fsAPI,
|
||||||
|
KeyRing: keyRing,
|
||||||
|
ACLs: r.ServerACLs,
|
||||||
|
}
|
||||||
r.Inviter = &perform.Inviter{
|
r.Inviter = &perform.Inviter{
|
||||||
DB: r.DB,
|
DB: r.DB,
|
||||||
Cfg: r.Cfg,
|
Cfg: r.Cfg,
|
||||||
|
|
|
@ -23,6 +23,7 @@ import (
|
||||||
|
|
||||||
"github.com/Shopify/sarama"
|
"github.com/Shopify/sarama"
|
||||||
"github.com/getsentry/sentry-go"
|
"github.com/getsentry/sentry-go"
|
||||||
|
fedapi "github.com/matrix-org/dendrite/federationapi/api"
|
||||||
"github.com/matrix-org/dendrite/internal/hooks"
|
"github.com/matrix-org/dendrite/internal/hooks"
|
||||||
"github.com/matrix-org/dendrite/roomserver/acls"
|
"github.com/matrix-org/dendrite/roomserver/acls"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
@ -44,6 +45,8 @@ type Inputer struct {
|
||||||
DB storage.Database
|
DB storage.Database
|
||||||
Producer sarama.SyncProducer
|
Producer sarama.SyncProducer
|
||||||
ServerName gomatrixserverlib.ServerName
|
ServerName gomatrixserverlib.ServerName
|
||||||
|
FSAPI fedapi.FederationInternalAPI
|
||||||
|
KeyRing gomatrixserverlib.JSONVerifier
|
||||||
ACLs *acls.ServerACLs
|
ACLs *acls.ServerACLs
|
||||||
OutputRoomEventTopic string
|
OutputRoomEventTopic string
|
||||||
workers sync.Map // room ID -> *inputWorker
|
workers sync.Map // room ID -> *inputWorker
|
||||||
|
|
|
@ -22,6 +22,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
fedapi "github.com/matrix-org/dendrite/federationapi/api"
|
||||||
"github.com/matrix-org/dendrite/internal/eventutil"
|
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/roomserver/internal/helpers"
|
"github.com/matrix-org/dendrite/roomserver/internal/helpers"
|
||||||
|
@ -98,13 +99,28 @@ func (r *Inputer) processRoomEvent(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check that the event passes authentication checks and work out
|
// First of all, check that the auth events of the event are known.
|
||||||
// the numeric IDs for the auth events.
|
// If they aren't then we will ask the federation API for them.
|
||||||
isRejected := false
|
isRejected := false
|
||||||
authEventNIDs, rejectionErr := helpers.CheckAuthEvents(ctx, r.DB, headered, input.AuthEventIDs)
|
authEvents := gomatrixserverlib.NewAuthEvents(nil)
|
||||||
if rejectionErr != nil {
|
knownAuthEvents := map[string]types.Event{}
|
||||||
logrus.WithError(rejectionErr).WithField("event_id", event.EventID()).WithField("auth_event_ids", input.AuthEventIDs).Error("helpers.CheckAuthEvents failed for event, rejecting event")
|
if err = r.checkForMissingAuthEvents(ctx, input.Event, &authEvents, knownAuthEvents); err != nil {
|
||||||
|
return "", fmt.Errorf("r.checkForMissingAuthEvents: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the event is allowed by its auth events. If it isn't then
|
||||||
|
// we consider the event to be "rejected" — it will still be persisted.
|
||||||
|
var rejectionErr error
|
||||||
|
if rejectionErr = gomatrixserverlib.Allowed(event, &authEvents); rejectionErr != nil {
|
||||||
isRejected = true
|
isRejected = true
|
||||||
|
logrus.WithError(rejectionErr).Warnf("Event %s rejected", event.EventID())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Accumulate the auth event NIDs.
|
||||||
|
authEventIDs := event.AuthEventIDs()
|
||||||
|
authEventNIDs := make([]types.EventNID, 0, len(authEventIDs))
|
||||||
|
for _, authEventID := range authEventIDs {
|
||||||
|
authEventNIDs = append(authEventNIDs, knownAuthEvents[authEventID].EventNID)
|
||||||
}
|
}
|
||||||
|
|
||||||
var softfail bool
|
var softfail bool
|
||||||
|
@ -228,6 +244,116 @@ func (r *Inputer) processRoomEvent(
|
||||||
return event.EventID(), nil
|
return event.EventID(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Inputer) checkForMissingAuthEvents(
|
||||||
|
ctx context.Context,
|
||||||
|
event *gomatrixserverlib.HeaderedEvent,
|
||||||
|
auth *gomatrixserverlib.AuthEvents,
|
||||||
|
known map[string]types.Event,
|
||||||
|
) error {
|
||||||
|
authEventIDs := event.AuthEventIDs()
|
||||||
|
if len(authEventIDs) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
unknown := map[string]struct{}{}
|
||||||
|
|
||||||
|
authEvents, err := r.DB.EventsFromIDs(ctx, authEventIDs)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("r.DB.EventsFromIDs: %w", err)
|
||||||
|
}
|
||||||
|
for _, event := range authEvents {
|
||||||
|
if event.Event != nil {
|
||||||
|
known[event.EventID()] = event
|
||||||
|
if err = auth.AddEvent(event.Event); err != nil {
|
||||||
|
return fmt.Errorf("auth.AddEvent: %w", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
unknown[event.EventID()] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(unknown) > 0 {
|
||||||
|
serverReq := &fedapi.QueryJoinedHostServerNamesInRoomRequest{
|
||||||
|
RoomID: event.RoomID(),
|
||||||
|
}
|
||||||
|
serverRes := &fedapi.QueryJoinedHostServerNamesInRoomResponse{}
|
||||||
|
if err = r.FSAPI.QueryJoinedHostServerNamesInRoom(ctx, serverReq, serverRes); err != nil {
|
||||||
|
return fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var res gomatrixserverlib.RespEventAuth
|
||||||
|
var found bool
|
||||||
|
for _, serverName := range serverRes.ServerNames {
|
||||||
|
res, err = r.FSAPI.GetEventAuth(ctx, serverName, event.RoomID(), event.EventID())
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithError(err).Warnf("Failed to get event auth from federation for %q: %s", event.EventID(), err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
found = true
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
return fmt.Errorf("no servers provided event auth")
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, event := range gomatrixserverlib.ReverseTopologicalOrdering(
|
||||||
|
res.AuthEvents,
|
||||||
|
gomatrixserverlib.TopologicalOrderByAuthEvents,
|
||||||
|
) {
|
||||||
|
// If we already know about this event then we don't need to store
|
||||||
|
// it or do anything further with it.
|
||||||
|
if _, ok := known[event.EventID()]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the signatures of the event.
|
||||||
|
// TODO: It really makes sense for the federation API to be doing this,
|
||||||
|
// because then it can attempt another server if one serves up an event
|
||||||
|
// with an invalid signature. For now this will do.
|
||||||
|
if err := event.VerifyEventSignatures(ctx, r.FSAPI.KeyRing()); err != nil {
|
||||||
|
return fmt.Errorf("event.VerifyEventSignatures: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Otherwise, we need to store, and that means we need to know the
|
||||||
|
// auth event NIDs. Let's see if we can find those.
|
||||||
|
authEventNIDs := make([]types.EventNID, 0, len(event.AuthEventIDs()))
|
||||||
|
for _, eventID := range event.AuthEventIDs() {
|
||||||
|
knownEvent, ok := known[eventID]
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("missing auth event %s for %s", eventID, event.EventID())
|
||||||
|
}
|
||||||
|
authEventNIDs = append(authEventNIDs, knownEvent.EventNID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Let's take a note of the fact that we now know about this event.
|
||||||
|
known[event.EventID()] = types.Event{}
|
||||||
|
if err := auth.AddEvent(event); err != nil {
|
||||||
|
return fmt.Errorf("auth.AddEvent: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the auth event should be rejected.
|
||||||
|
isRejected := false
|
||||||
|
if err := gomatrixserverlib.Allowed(event, auth); err != nil {
|
||||||
|
isRejected = true
|
||||||
|
logrus.WithError(err).Warnf("Auth event %s rejected", event.EventID())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finally, store the event in the database.
|
||||||
|
eventNID, _, _, _, _, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("r.DB.StoreEvent: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now we know about this event, too.
|
||||||
|
known[event.EventID()] = types.Event{
|
||||||
|
EventNID: eventNID,
|
||||||
|
Event: event,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (r *Inputer) calculateAndSetState(
|
func (r *Inputer) calculateAndSetState(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
input *api.InputRoomEvent,
|
input *api.InputRoomEvent,
|
||||||
|
|
Loading…
Reference in a new issue