Some review comment fixes

This commit is contained in:
Neil Alexander 2022-01-25 10:33:50 +00:00
parent 8a2c565385
commit 0bcf2970d1
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
3 changed files with 109 additions and 87 deletions

View file

@ -41,6 +41,7 @@ type RoomserverInternalAPI struct {
fsAPI fsAPI.FederationInternalAPI
asAPI asAPI.AppServiceQueryAPI
JetStream nats.JetStreamContext
Durable nats.SubOpt
InputRoomEventTopic string // JetStream topic for new input room events
OutputRoomEventTopic string // JetStream topic for new output room events
PerspectiveServerNames []gomatrixserverlib.ServerName
@ -61,6 +62,7 @@ func NewRoomserverAPI(
InputRoomEventTopic: inputRoomEventTopic,
OutputRoomEventTopic: outputRoomEventTopic,
JetStream: consumer,
Durable: cfg.Matrix.JetStream.Durable("RoomserverInputConsumer"),
ServerACLs: serverACLs,
Queryer: &query.Queryer{
DB: roomserverDB,
@ -68,15 +70,6 @@ func NewRoomserverAPI(
ServerName: cfg.Matrix.ServerName,
ServerACLs: serverACLs,
},
Inputer: &input.Inputer{
DB: roomserverDB,
InputRoomEventTopic: inputRoomEventTopic,
OutputRoomEventTopic: outputRoomEventTopic,
JetStream: consumer,
Durable: cfg.Matrix.JetStream.Durable("RoomserverInputConsumer"),
ServerName: cfg.Matrix.ServerName,
ACLs: serverACLs,
},
// perform-er structs get initialised when we have a federation sender to use
}
return a
@ -94,6 +87,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA
InputRoomEventTopic: r.InputRoomEventTopic,
OutputRoomEventTopic: r.OutputRoomEventTopic,
JetStream: r.JetStream,
Durable: r.Durable,
ServerName: r.Cfg.Matrix.ServerName,
FSAPI: fsAPI,
KeyRing: keyRing,

View file

@ -114,7 +114,7 @@ func (r *Inputer) processRoomEvent(
missingRes := &api.QueryMissingAuthPrevEventsResponse{}
serverRes := &fedapi.QueryJoinedHostServerNamesInRoomResponse{}
if event.Type() != gomatrixserverlib.MRoomCreate {
if event.Type() != gomatrixserverlib.MRoomCreate || !event.StateKeyEquals("") {
missingReq := &api.QueryMissingAuthPrevEventsRequest{
RoomID: event.RoomID(),
AuthEventIDs: event.AuthEventIDs(),
@ -144,7 +144,7 @@ func (r *Inputer) processRoomEvent(
isRejected := false
authEvents := gomatrixserverlib.NewAuthEvents(nil)
knownEvents := map[string]*types.Event{}
if err = r.checkForMissingAuthEvents(ctx, logger, headered, &authEvents, knownEvents, serverRes.ServerNames); err != nil {
if err = r.fetchAuthEvents(ctx, logger, headered, &authEvents, knownEvents, serverRes.ServerNames); err != nil {
return fmt.Errorf("r.checkForMissingAuthEvents: %w", err)
}
@ -176,6 +176,16 @@ func (r *Inputer) processRoomEvent(
}
}
// At this point we are checking whether we know all of the prev events, and
// if we know the state before the prev events. This is necessary before we
// try to do `calculateAndSetState` on the event later, otherwise it will fail
// with missing event NIDs. If there's anything missing then we'll go and fetch
// the prev events and state from the federation. Note that we only do this if
// we weren't already told what the state before the event should be — if the
// HasState option was set and a state set was provided (as is the case in a
// typical federated room join) then we won't bother trying to fetch prev events
// because we may not be allowed to see them and we have no choice but to trust
// the state event IDs provided to us in the join instead.
missingPrev := !input.HasState && len(missingRes.MissingPrevEventIDs) > 0
if missingPrev && input.Kind == api.KindNew {
// Don't do this for KindOld events, otherwise old events that we fetch
@ -302,7 +312,14 @@ func (r *Inputer) processRoomEvent(
return nil
}
func (r *Inputer) checkForMissingAuthEvents(
// fetchAuthEvents will check to see if any of the
// auth events specified by the given event are unknown. If they are
// then we will go off and request them from the federation and then
// store them in the database. By the time this function ends, either
// we've failed to retrieve the auth chain altogether (in which case
// an error is returned) or we've successfully retrieved them all and
// they are now in the database.
func (r *Inputer) fetchAuthEvents(
ctx context.Context,
logger *logrus.Entry,
event *gomatrixserverlib.HeaderedEvent,
@ -317,90 +334,99 @@ func (r *Inputer) checkForMissingAuthEvents(
unknown := map[string]struct{}{}
authEvents, err := r.DB.EventsFromIDs(ctx, authEventIDs)
if err != nil {
return fmt.Errorf("r.DB.EventsFromIDs: %w", err)
}
for _, event := range authEvents {
if event.Event != nil {
ev := event // don't take the address of the iterated value
known[event.EventID()] = &ev
if err = auth.AddEvent(event.Event); err != nil {
return fmt.Errorf("auth.AddEvent: %w", err)
}
} else {
unknown[event.EventID()] = struct{}{}
for _, authEventID := range authEventIDs {
authEvents, err := r.DB.EventsFromIDs(ctx, []string{authEventID})
if err != nil {
return fmt.Errorf("r.DB.EventsFromIDs: %w", err)
}
if len(authEvents) == 0 || authEvents[0].Event == nil {
unknown[authEventID] = struct{}{}
continue
}
ev := authEvents[0]
known[authEventID] = &ev // don't take the pointer of the iterated event
if err = auth.AddEvent(ev.Event); err != nil {
return fmt.Errorf("auth.AddEvent: %w", err)
}
}
if len(unknown) > 0 {
var res gomatrixserverlib.RespEventAuth
var found bool
for _, serverName := range servers {
res, err = r.FSAPI.GetEventAuth(ctx, serverName, event.RoomID(), event.EventID())
if err != nil {
logger.WithError(err).Warnf("Failed to get event auth from federation for %q: %s", event.EventID(), err)
continue
}
found = true
break
// If there are no missing auth events then there is nothing more
// to do — we've loaded everything that we need.
if len(unknown) == 0 {
return nil
}
var err error
var res gomatrixserverlib.RespEventAuth
var found bool
for _, serverName := range servers {
// Request the entire auth chain for the event in question. This should
// contain all of the auth events — including ones that we already know —
// so we'll need to filter through those in the next section.
res, err = r.FSAPI.GetEventAuth(ctx, serverName, event.RoomID(), event.EventID())
if err != nil {
logger.WithError(err).Warnf("Failed to get event auth from federation for %q: %s", event.EventID(), err)
continue
}
if !found {
return fmt.Errorf("no servers provided event auth")
found = true
break
}
if !found {
return fmt.Errorf("no servers provided event auth for event ID %q, tried servers %v", event.EventID(), servers)
}
for _, authEvent := range gomatrixserverlib.ReverseTopologicalOrdering(
res.AuthEvents,
gomatrixserverlib.TopologicalOrderByAuthEvents,
) {
// If we already know about this event from the database then we don't
// need to store it again or do anything further with it, so just skip
// over it rather than wasting cycles.
if ev, ok := known[authEvent.EventID()]; ok && ev != nil {
continue
}
for _, event := range gomatrixserverlib.ReverseTopologicalOrdering(
res.AuthEvents,
gomatrixserverlib.TopologicalOrderByAuthEvents,
) {
// If we already know about this event then we don't need to store
// it or do anything further with it.
if ev, ok := known[event.EventID()]; ok && ev != nil {
continue
}
// Check the signatures of the event.
// TODO: It really makes sense for the federation API to be doing this,
// because then it can attempt another server if one serves up an event
// with an invalid signature. For now this will do.
if err := authEvent.VerifyEventSignatures(ctx, r.FSAPI.KeyRing()); err != nil {
return fmt.Errorf("event.VerifyEventSignatures: %w", err)
}
// Check the signatures of the event.
// TODO: It really makes sense for the federation API to be doing this,
// because then it can attempt another server if one serves up an event
// with an invalid signature. For now this will do.
if err := event.VerifyEventSignatures(ctx, r.FSAPI.KeyRing()); err != nil {
return fmt.Errorf("event.VerifyEventSignatures: %w", err)
// In order to store the new auth event, we need to know its auth chain
// as NIDs for the `auth_event_nids` column. Let's see if we can find those.
authEventNIDs := make([]types.EventNID, 0, len(authEvent.AuthEventIDs()))
for _, eventID := range authEvent.AuthEventIDs() {
knownEvent, ok := known[eventID]
if !ok {
return fmt.Errorf("missing auth event %s for %s", eventID, authEvent.EventID())
}
authEventNIDs = append(authEventNIDs, knownEvent.EventNID)
}
// Otherwise, we need to store, and that means we need to know the
// auth event NIDs. Let's see if we can find those.
authEventNIDs := make([]types.EventNID, 0, len(event.AuthEventIDs()))
for _, eventID := range event.AuthEventIDs() {
knownEvent, ok := known[eventID]
if !ok {
return fmt.Errorf("missing auth event %s for %s", eventID, event.EventID())
}
authEventNIDs = append(authEventNIDs, knownEvent.EventNID)
}
// Let's take a note of the fact that we now know about this event.
if err := auth.AddEvent(authEvent); err != nil {
return fmt.Errorf("auth.AddEvent: %w", err)
}
// Let's take a note of the fact that we now know about this event.
if err := auth.AddEvent(event); err != nil {
return fmt.Errorf("auth.AddEvent: %w", err)
}
// Check if the auth event should be rejected.
isRejected := false
if err := gomatrixserverlib.Allowed(authEvent, auth); err != nil {
isRejected = true
logger.WithError(err).Warnf("Auth event %s rejected", authEvent.EventID())
}
// Check if the auth event should be rejected.
isRejected := false
if err := gomatrixserverlib.Allowed(event, auth); err != nil {
isRejected = true
logger.WithError(err).Warnf("Auth event %s rejected", event.EventID())
}
// Finally, store the event in the database.
eventNID, _, _, _, _, err := r.DB.StoreEvent(ctx, authEvent, authEventNIDs, isRejected)
if err != nil {
return fmt.Errorf("r.DB.StoreEvent: %w", err)
}
// Finally, store the event in the database.
eventNID, _, _, _, _, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected)
if err != nil {
return fmt.Errorf("r.DB.StoreEvent: %w", err)
}
// Now we know about this event, too.
known[event.EventID()] = &types.Event{
EventNID: eventNID,
Event: event,
}
// Now we know about this event, it was stored and the signatures were OK.
known[authEvent.EventID()] = &types.Event{
EventNID: eventNID,
Event: authEvent,
}
}

View file

@ -32,6 +32,8 @@ type missingStateReq struct {
haveEventsMutex sync.Mutex
}
// processEventWithMissingState is the entrypoint for a missingStateReq
// request, as called from processRoomEvent.
func (t *missingStateReq) processEventWithMissingState(
ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion,
) error {
@ -60,7 +62,7 @@ func (t *missingStateReq) processEventWithMissingState(
return fmt.Errorf("t.getMissingEvents: %w", err)
}
if len(newEvents) == 0 {
return nil
return fmt.Errorf("expected to find missing events but didn't")
}
backwardsExtremity := newEvents[0]
@ -403,8 +405,8 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve
// security: how we handle failures depends on whether or not this event will become the new forward extremity for the room.
// There's 2 scenarios to consider:
// - Case A: We got pushed an event and are now fetching missing prev_events. (isInboundTxn=true)
// - Case B: We are fetching missing prev_events already and now fetching some more (isInboundTxn=false)
// - Case A: We got pushed an event and are now fetching missing prev_events. (t.origin != our server name)
// - Case B: We are fetching missing prev_events already and now fetching some more (t.origin == our server name)
// In Case B, we know for sure that the event we are currently processing will not become the new forward extremity for the room,
// as it was called in response to an inbound txn which had it as a prev_event.
// In Case A, the event is a forward extremity, and could eventually become the _only_ forward extremity in the room. This is bad