diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 55899e61d..526480f49 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -48,7 +48,7 @@ func Send( } var txnEvents struct { - Events []json.RawMessage `json:"events"` + Events []json.RawMessage `json:"pdus"` } if err := json.Unmarshal(request.Content(), &txnEvents); err != nil { @@ -58,6 +58,7 @@ func Send( } } + t.PDUs = txnEvents.Events t.Origin = request.Origin() t.TransactionID = txnID t.Destination = cfg.Matrix.ServerName @@ -84,17 +85,16 @@ type txnReq struct { } func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { - var pdus []gomatrixserverlib.Event - + var pdus []gomatrixserverlib.HeaderedEvent for _, pdu := range t.PDUs { var header struct { - roomID string `json:"room_id"` + RoomID string `json:"room_id"` } if err := json.Unmarshal(pdu, &header); err != nil { util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to extract room ID from event, skipping it.") continue } - verReq := api.QueryRoomVersionForRoomRequest{RoomID: header.roomID} + verReq := api.QueryRoomVersionForRoomRequest{RoomID: header.RoomID} verRes := api.QueryRoomVersionForRoomResponse{} if err := t.query.QueryRoomVersionForRoom(t.context, &verReq, &verRes); err != nil { util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to query room version for event, skipping it.") @@ -105,18 +105,17 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to parse event JSON, skipping it.") continue } - pdus = append(pdus, event) - } - - // Check the event signatures - if err := gomatrixserverlib.VerifyAllEventSignatures(t.context, pdus, t.keys); err != nil { - return nil, err + if err := gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil { + util.GetLogger(t.context).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q, skipping it.", event.EventID()) + return nil, err + } + pdus = append(pdus, event.Headered(verRes.RoomVersion)) } // Process the events. results := map[string]gomatrixserverlib.PDUResult{} for _, e := range pdus { - err := t.processEvent(e) + err := t.processEvent(e.Event) if err != nil { // If the error is due to the event itself being bad then we skip // it and move onto the next event. We report an error so that the @@ -187,7 +186,7 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error { } if !stateResp.PrevEventsExist { - return t.processEventWithMissingState(e) + return t.processEventWithMissingState(e, stateResp.RoomVersion) } // Check that the event is allowed by the state at the event. @@ -223,7 +222,7 @@ func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserver return gomatrixserverlib.Allowed(e, &authUsingState) } -func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event) error { +func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) error { // We are missing the previous events for this events. // This means that there is a gap in our view of the history of the // room. There two ways that we can handle such a gap: @@ -240,7 +239,7 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event) error { // need to fallback to /state. // TODO: Attempt to fill in the gap using /get_missing_events // TODO: Attempt to fetch the state using /state_ids and /events - state, err := t.federation.LookupState(t.context, t.Origin, e.RoomID(), e.EventID()) + state, err := t.federation.LookupState(t.context, t.Origin, e.RoomID(), e.EventID(), roomVersion) if err != nil { return err } @@ -258,7 +257,7 @@ retryAllowedState: if s.EventID() != missing.AuthEventID { continue } - err = t.processEventWithMissingState(s) + err = t.processEventWithMissingState(s, roomVersion) // If there was no error retrieving the event from federation then // we assume that it succeeded, so retry the original state check if err == nil { @@ -270,12 +269,6 @@ retryAllowedState: return err } - verReq := api.QueryRoomVersionForRoomRequest{RoomID: e.RoomID()} - verRes := api.QueryRoomVersionForRoomResponse{} - if err := t.query.QueryRoomVersionForRoom(context.Background(), &verReq, &verRes); err != nil { - return err - } - // pass the event along with the state to the roomserver - return t.producer.SendEventWithState(t.context, state, e.Headered(verRes.RoomVersion)) + return t.producer.SendEventWithState(t.context, state, e.Headered(roomVersion)) }