diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index 7a116e300..5b87e623d 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -41,6 +41,7 @@ type RoomserverInternalAPI struct { fsAPI fsAPI.FederationInternalAPI asAPI asAPI.AppServiceQueryAPI JetStream nats.JetStreamContext + Durable nats.SubOpt InputRoomEventTopic string // JetStream topic for new input room events OutputRoomEventTopic string // JetStream topic for new output room events PerspectiveServerNames []gomatrixserverlib.ServerName @@ -61,6 +62,7 @@ func NewRoomserverAPI( InputRoomEventTopic: inputRoomEventTopic, OutputRoomEventTopic: outputRoomEventTopic, JetStream: consumer, + Durable: cfg.Matrix.JetStream.Durable("RoomserverInputConsumer"), ServerACLs: serverACLs, Queryer: &query.Queryer{ DB: roomserverDB, @@ -68,15 +70,6 @@ func NewRoomserverAPI( ServerName: cfg.Matrix.ServerName, ServerACLs: serverACLs, }, - Inputer: &input.Inputer{ - DB: roomserverDB, - InputRoomEventTopic: inputRoomEventTopic, - OutputRoomEventTopic: outputRoomEventTopic, - JetStream: consumer, - Durable: cfg.Matrix.JetStream.Durable("RoomserverInputConsumer"), - ServerName: cfg.Matrix.ServerName, - ACLs: serverACLs, - }, // perform-er structs get initialised when we have a federation sender to use } return a @@ -94,6 +87,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA InputRoomEventTopic: r.InputRoomEventTopic, OutputRoomEventTopic: r.OutputRoomEventTopic, JetStream: r.JetStream, + Durable: r.Durable, ServerName: r.Cfg.Matrix.ServerName, FSAPI: fsAPI, KeyRing: keyRing, diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index acdab04a8..692cabb53 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -114,7 +114,7 @@ func (r *Inputer) processRoomEvent( missingRes := &api.QueryMissingAuthPrevEventsResponse{} serverRes := &fedapi.QueryJoinedHostServerNamesInRoomResponse{} - if event.Type() != gomatrixserverlib.MRoomCreate { + if event.Type() != gomatrixserverlib.MRoomCreate || !event.StateKeyEquals("") { missingReq := &api.QueryMissingAuthPrevEventsRequest{ RoomID: event.RoomID(), AuthEventIDs: event.AuthEventIDs(), @@ -144,7 +144,7 @@ func (r *Inputer) processRoomEvent( isRejected := false authEvents := gomatrixserverlib.NewAuthEvents(nil) knownEvents := map[string]*types.Event{} - if err = r.checkForMissingAuthEvents(ctx, logger, headered, &authEvents, knownEvents, serverRes.ServerNames); err != nil { + if err = r.fetchAuthEvents(ctx, logger, headered, &authEvents, knownEvents, serverRes.ServerNames); err != nil { return fmt.Errorf("r.checkForMissingAuthEvents: %w", err) } @@ -176,6 +176,16 @@ func (r *Inputer) processRoomEvent( } } + // At this point we are checking whether we know all of the prev events, and + // if we know the state before the prev events. This is necessary before we + // try to do `calculateAndSetState` on the event later, otherwise it will fail + // with missing event NIDs. If there's anything missing then we'll go and fetch + // the prev events and state from the federation. Note that we only do this if + // we weren't already told what the state before the event should be — if the + // HasState option was set and a state set was provided (as is the case in a + // typical federated room join) then we won't bother trying to fetch prev events + // because we may not be allowed to see them and we have no choice but to trust + // the state event IDs provided to us in the join instead. missingPrev := !input.HasState && len(missingRes.MissingPrevEventIDs) > 0 if missingPrev && input.Kind == api.KindNew { // Don't do this for KindOld events, otherwise old events that we fetch @@ -302,7 +312,14 @@ func (r *Inputer) processRoomEvent( return nil } -func (r *Inputer) checkForMissingAuthEvents( +// fetchAuthEvents will check to see if any of the +// auth events specified by the given event are unknown. If they are +// then we will go off and request them from the federation and then +// store them in the database. By the time this function ends, either +// we've failed to retrieve the auth chain altogether (in which case +// an error is returned) or we've successfully retrieved them all and +// they are now in the database. +func (r *Inputer) fetchAuthEvents( ctx context.Context, logger *logrus.Entry, event *gomatrixserverlib.HeaderedEvent, @@ -317,90 +334,99 @@ func (r *Inputer) checkForMissingAuthEvents( unknown := map[string]struct{}{} - authEvents, err := r.DB.EventsFromIDs(ctx, authEventIDs) - if err != nil { - return fmt.Errorf("r.DB.EventsFromIDs: %w", err) - } - for _, event := range authEvents { - if event.Event != nil { - ev := event // don't take the address of the iterated value - known[event.EventID()] = &ev - if err = auth.AddEvent(event.Event); err != nil { - return fmt.Errorf("auth.AddEvent: %w", err) - } - } else { - unknown[event.EventID()] = struct{}{} + for _, authEventID := range authEventIDs { + authEvents, err := r.DB.EventsFromIDs(ctx, []string{authEventID}) + if err != nil { + return fmt.Errorf("r.DB.EventsFromIDs: %w", err) + } + if len(authEvents) == 0 || authEvents[0].Event == nil { + unknown[authEventID] = struct{}{} + continue + } + ev := authEvents[0] + known[authEventID] = &ev // don't take the pointer of the iterated event + if err = auth.AddEvent(ev.Event); err != nil { + return fmt.Errorf("auth.AddEvent: %w", err) } } - if len(unknown) > 0 { - var res gomatrixserverlib.RespEventAuth - var found bool - for _, serverName := range servers { - res, err = r.FSAPI.GetEventAuth(ctx, serverName, event.RoomID(), event.EventID()) - if err != nil { - logger.WithError(err).Warnf("Failed to get event auth from federation for %q: %s", event.EventID(), err) - continue - } - found = true - break + // If there are no missing auth events then there is nothing more + // to do — we've loaded everything that we need. + if len(unknown) == 0 { + return nil + } + + var err error + var res gomatrixserverlib.RespEventAuth + var found bool + for _, serverName := range servers { + // Request the entire auth chain for the event in question. This should + // contain all of the auth events — including ones that we already know — + // so we'll need to filter through those in the next section. + res, err = r.FSAPI.GetEventAuth(ctx, serverName, event.RoomID(), event.EventID()) + if err != nil { + logger.WithError(err).Warnf("Failed to get event auth from federation for %q: %s", event.EventID(), err) + continue } - if !found { - return fmt.Errorf("no servers provided event auth") + found = true + break + } + if !found { + return fmt.Errorf("no servers provided event auth for event ID %q, tried servers %v", event.EventID(), servers) + } + + for _, authEvent := range gomatrixserverlib.ReverseTopologicalOrdering( + res.AuthEvents, + gomatrixserverlib.TopologicalOrderByAuthEvents, + ) { + // If we already know about this event from the database then we don't + // need to store it again or do anything further with it, so just skip + // over it rather than wasting cycles. + if ev, ok := known[authEvent.EventID()]; ok && ev != nil { + continue } - for _, event := range gomatrixserverlib.ReverseTopologicalOrdering( - res.AuthEvents, - gomatrixserverlib.TopologicalOrderByAuthEvents, - ) { - // If we already know about this event then we don't need to store - // it or do anything further with it. - if ev, ok := known[event.EventID()]; ok && ev != nil { - continue - } + // Check the signatures of the event. + // TODO: It really makes sense for the federation API to be doing this, + // because then it can attempt another server if one serves up an event + // with an invalid signature. For now this will do. + if err := authEvent.VerifyEventSignatures(ctx, r.FSAPI.KeyRing()); err != nil { + return fmt.Errorf("event.VerifyEventSignatures: %w", err) + } - // Check the signatures of the event. - // TODO: It really makes sense for the federation API to be doing this, - // because then it can attempt another server if one serves up an event - // with an invalid signature. For now this will do. - if err := event.VerifyEventSignatures(ctx, r.FSAPI.KeyRing()); err != nil { - return fmt.Errorf("event.VerifyEventSignatures: %w", err) + // In order to store the new auth event, we need to know its auth chain + // as NIDs for the `auth_event_nids` column. Let's see if we can find those. + authEventNIDs := make([]types.EventNID, 0, len(authEvent.AuthEventIDs())) + for _, eventID := range authEvent.AuthEventIDs() { + knownEvent, ok := known[eventID] + if !ok { + return fmt.Errorf("missing auth event %s for %s", eventID, authEvent.EventID()) } + authEventNIDs = append(authEventNIDs, knownEvent.EventNID) + } - // Otherwise, we need to store, and that means we need to know the - // auth event NIDs. Let's see if we can find those. - authEventNIDs := make([]types.EventNID, 0, len(event.AuthEventIDs())) - for _, eventID := range event.AuthEventIDs() { - knownEvent, ok := known[eventID] - if !ok { - return fmt.Errorf("missing auth event %s for %s", eventID, event.EventID()) - } - authEventNIDs = append(authEventNIDs, knownEvent.EventNID) - } + // Let's take a note of the fact that we now know about this event. + if err := auth.AddEvent(authEvent); err != nil { + return fmt.Errorf("auth.AddEvent: %w", err) + } - // Let's take a note of the fact that we now know about this event. - if err := auth.AddEvent(event); err != nil { - return fmt.Errorf("auth.AddEvent: %w", err) - } + // Check if the auth event should be rejected. + isRejected := false + if err := gomatrixserverlib.Allowed(authEvent, auth); err != nil { + isRejected = true + logger.WithError(err).Warnf("Auth event %s rejected", authEvent.EventID()) + } - // Check if the auth event should be rejected. - isRejected := false - if err := gomatrixserverlib.Allowed(event, auth); err != nil { - isRejected = true - logger.WithError(err).Warnf("Auth event %s rejected", event.EventID()) - } + // Finally, store the event in the database. + eventNID, _, _, _, _, err := r.DB.StoreEvent(ctx, authEvent, authEventNIDs, isRejected) + if err != nil { + return fmt.Errorf("r.DB.StoreEvent: %w", err) + } - // Finally, store the event in the database. - eventNID, _, _, _, _, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected) - if err != nil { - return fmt.Errorf("r.DB.StoreEvent: %w", err) - } - - // Now we know about this event, too. - known[event.EventID()] = &types.Event{ - EventNID: eventNID, - Event: event, - } + // Now we know about this event, it was stored and the signatures were OK. + known[authEvent.EventID()] = &types.Event{ + EventNID: eventNID, + Event: authEvent, } } diff --git a/roomserver/internal/input/input_missing.go b/roomserver/internal/input/input_missing.go index 79a76af96..10f79675f 100644 --- a/roomserver/internal/input/input_missing.go +++ b/roomserver/internal/input/input_missing.go @@ -32,6 +32,8 @@ type missingStateReq struct { haveEventsMutex sync.Mutex } +// processEventWithMissingState is the entrypoint for a missingStateReq +// request, as called from processRoomEvent. func (t *missingStateReq) processEventWithMissingState( ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, ) error { @@ -60,7 +62,7 @@ func (t *missingStateReq) processEventWithMissingState( return fmt.Errorf("t.getMissingEvents: %w", err) } if len(newEvents) == 0 { - return nil + return fmt.Errorf("expected to find missing events but didn't") } backwardsExtremity := newEvents[0] @@ -403,8 +405,8 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve // security: how we handle failures depends on whether or not this event will become the new forward extremity for the room. // There's 2 scenarios to consider: - // - Case A: We got pushed an event and are now fetching missing prev_events. (isInboundTxn=true) - // - Case B: We are fetching missing prev_events already and now fetching some more (isInboundTxn=false) + // - Case A: We got pushed an event and are now fetching missing prev_events. (t.origin != our server name) + // - Case B: We are fetching missing prev_events already and now fetching some more (t.origin == our server name) // In Case B, we know for sure that the event we are currently processing will not become the new forward extremity for the room, // as it was called in response to an inbound txn which had it as a prev_event. // In Case A, the event is a forward extremity, and could eventually become the _only_ forward extremity in the room. This is bad