mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-17 03:43:11 -06:00
whoops commit federation API too
This commit is contained in:
parent
9e00f0e828
commit
b54d33a3e8
|
|
@ -48,7 +48,7 @@ func Send(
|
||||||
}
|
}
|
||||||
|
|
||||||
var txnEvents struct {
|
var txnEvents struct {
|
||||||
Events []json.RawMessage `json:"events"`
|
Events []json.RawMessage `json:"pdus"`
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := json.Unmarshal(request.Content(), &txnEvents); err != nil {
|
if err := json.Unmarshal(request.Content(), &txnEvents); err != nil {
|
||||||
|
|
@ -58,6 +58,7 @@ func Send(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
t.PDUs = txnEvents.Events
|
||||||
t.Origin = request.Origin()
|
t.Origin = request.Origin()
|
||||||
t.TransactionID = txnID
|
t.TransactionID = txnID
|
||||||
t.Destination = cfg.Matrix.ServerName
|
t.Destination = cfg.Matrix.ServerName
|
||||||
|
|
@ -84,17 +85,16 @@ type txnReq struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
|
func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
|
||||||
var pdus []gomatrixserverlib.Event
|
var pdus []gomatrixserverlib.HeaderedEvent
|
||||||
|
|
||||||
for _, pdu := range t.PDUs {
|
for _, pdu := range t.PDUs {
|
||||||
var header struct {
|
var header struct {
|
||||||
roomID string `json:"room_id"`
|
RoomID string `json:"room_id"`
|
||||||
}
|
}
|
||||||
if err := json.Unmarshal(pdu, &header); err != nil {
|
if err := json.Unmarshal(pdu, &header); err != nil {
|
||||||
util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to extract room ID from event, skipping it.")
|
util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to extract room ID from event, skipping it.")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
verReq := api.QueryRoomVersionForRoomRequest{RoomID: header.roomID}
|
verReq := api.QueryRoomVersionForRoomRequest{RoomID: header.RoomID}
|
||||||
verRes := api.QueryRoomVersionForRoomResponse{}
|
verRes := api.QueryRoomVersionForRoomResponse{}
|
||||||
if err := t.query.QueryRoomVersionForRoom(t.context, &verReq, &verRes); err != nil {
|
if err := t.query.QueryRoomVersionForRoom(t.context, &verReq, &verRes); err != nil {
|
||||||
util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to query room version for event, skipping it.")
|
util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to query room version for event, skipping it.")
|
||||||
|
|
@ -105,18 +105,17 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
|
||||||
util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to parse event JSON, skipping it.")
|
util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to parse event JSON, skipping it.")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
pdus = append(pdus, event)
|
if err := gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil {
|
||||||
}
|
util.GetLogger(t.context).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q, skipping it.", event.EventID())
|
||||||
|
|
||||||
// Check the event signatures
|
|
||||||
if err := gomatrixserverlib.VerifyAllEventSignatures(t.context, pdus, t.keys); err != nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
pdus = append(pdus, event.Headered(verRes.RoomVersion))
|
||||||
|
}
|
||||||
|
|
||||||
// Process the events.
|
// Process the events.
|
||||||
results := map[string]gomatrixserverlib.PDUResult{}
|
results := map[string]gomatrixserverlib.PDUResult{}
|
||||||
for _, e := range pdus {
|
for _, e := range pdus {
|
||||||
err := t.processEvent(e)
|
err := t.processEvent(e.Event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If the error is due to the event itself being bad then we skip
|
// If the error is due to the event itself being bad then we skip
|
||||||
// it and move onto the next event. We report an error so that the
|
// it and move onto the next event. We report an error so that the
|
||||||
|
|
@ -187,7 +186,7 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if !stateResp.PrevEventsExist {
|
if !stateResp.PrevEventsExist {
|
||||||
return t.processEventWithMissingState(e)
|
return t.processEventWithMissingState(e, stateResp.RoomVersion)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check that the event is allowed by the state at the event.
|
// Check that the event is allowed by the state at the event.
|
||||||
|
|
@ -223,7 +222,7 @@ func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserver
|
||||||
return gomatrixserverlib.Allowed(e, &authUsingState)
|
return gomatrixserverlib.Allowed(e, &authUsingState)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event) error {
|
func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) error {
|
||||||
// We are missing the previous events for this events.
|
// We are missing the previous events for this events.
|
||||||
// This means that there is a gap in our view of the history of the
|
// This means that there is a gap in our view of the history of the
|
||||||
// room. There two ways that we can handle such a gap:
|
// room. There two ways that we can handle such a gap:
|
||||||
|
|
@ -240,7 +239,7 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event) error {
|
||||||
// need to fallback to /state.
|
// need to fallback to /state.
|
||||||
// TODO: Attempt to fill in the gap using /get_missing_events
|
// TODO: Attempt to fill in the gap using /get_missing_events
|
||||||
// TODO: Attempt to fetch the state using /state_ids and /events
|
// TODO: Attempt to fetch the state using /state_ids and /events
|
||||||
state, err := t.federation.LookupState(t.context, t.Origin, e.RoomID(), e.EventID())
|
state, err := t.federation.LookupState(t.context, t.Origin, e.RoomID(), e.EventID(), roomVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -258,7 +257,7 @@ retryAllowedState:
|
||||||
if s.EventID() != missing.AuthEventID {
|
if s.EventID() != missing.AuthEventID {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
err = t.processEventWithMissingState(s)
|
err = t.processEventWithMissingState(s, roomVersion)
|
||||||
// If there was no error retrieving the event from federation then
|
// If there was no error retrieving the event from federation then
|
||||||
// we assume that it succeeded, so retry the original state check
|
// we assume that it succeeded, so retry the original state check
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
|
@ -270,12 +269,6 @@ retryAllowedState:
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
verReq := api.QueryRoomVersionForRoomRequest{RoomID: e.RoomID()}
|
|
||||||
verRes := api.QueryRoomVersionForRoomResponse{}
|
|
||||||
if err := t.query.QueryRoomVersionForRoom(context.Background(), &verReq, &verRes); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// pass the event along with the state to the roomserver
|
// pass the event along with the state to the roomserver
|
||||||
return t.producer.SendEventWithState(t.context, state, e.Headered(verRes.RoomVersion))
|
return t.producer.SendEventWithState(t.context, state, e.Headered(roomVersion))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue