Maybe fix federation

This commit is contained in:
Neil Alexander 2020-03-26 17:10:00 +00:00
parent ca63b969e9
commit f97343d409
9 changed files with 57 additions and 30 deletions

View file

@ -63,13 +63,13 @@ func (c *RoomserverProducer) SendEventWithState(
return err return err
} }
ires := make([]api.InputRoomEvent, len(outliers)+1) var ires []api.InputRoomEvent
for i, outlier := range outliers { for _, outlier := range outliers {
ires[i] = api.InputRoomEvent{ ires = append(ires, api.InputRoomEvent{
Kind: api.KindOutlier, Kind: api.KindOutlier,
Event: outlier.Headered(event.RoomVersion), Event: outlier.Headered(event.RoomVersion),
AuthEventIDs: outlier.AuthEventIDs(), AuthEventIDs: outlier.AuthEventIDs(),
} })
} }
stateEventIDs := make([]string, len(state.StateEvents)) stateEventIDs := make([]string, len(state.StateEvents))
@ -77,13 +77,13 @@ func (c *RoomserverProducer) SendEventWithState(
stateEventIDs[i] = state.StateEvents[i].EventID() stateEventIDs[i] = state.StateEvents[i].EventID()
} }
ires[len(outliers)] = api.InputRoomEvent{ ires = append(ires, api.InputRoomEvent{
Kind: api.KindNew, Kind: api.KindNew,
Event: event, Event: event,
AuthEventIDs: event.AuthEventIDs(), AuthEventIDs: event.AuthEventIDs(),
HasState: true, HasState: true,
StateEventIDs: stateEventIDs, StateEventIDs: stateEventIDs,
} })
_, err = c.SendInputRoomEvents(ctx, ires) _, err = c.SendInputRoomEvents(ctx, ires)
return err return err

View file

@ -27,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/sirupsen/logrus"
) )
// http://matrix.org/docs/spec/client_server/r0.2.0.html#put-matrix-client-r0-rooms-roomid-send-eventtype-txnid // http://matrix.org/docs/spec/client_server/r0.2.0.html#put-matrix-client-r0-rooms-roomid-send-eventtype-txnid
@ -82,7 +83,7 @@ func SendEvent(
eventID, err := producer.SendEvents( eventID, err := producer.SendEvents(
req.Context(), req.Context(),
[]gomatrixserverlib.HeaderedEvent{ []gomatrixserverlib.HeaderedEvent{
(*e).Headered(verRes.RoomVersion), e.Headered(verRes.RoomVersion),
}, },
cfg.Matrix.ServerName, cfg.Matrix.ServerName,
txnAndSessionID, txnAndSessionID,
@ -91,7 +92,11 @@ func SendEvent(
util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
util.GetLogger(req.Context()).WithField("event_id", eventID).Info("Sent event") util.GetLogger(req.Context()).WithFields(logrus.Fields{
"event_id": eventID,
"room_id": roomID,
"room_version": verRes.RoomVersion,
}).Info("Sent event to roomserver")
res := util.JSONResponse{ res := util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,

View file

@ -63,6 +63,8 @@ func Send(
t.TransactionID = txnID t.TransactionID = txnID
t.Destination = cfg.Matrix.ServerName t.Destination = cfg.Matrix.ServerName
util.GetLogger(httpReq.Context()).Infof("Received transaction %q containing %d PDUs, %d EDUs", txnID, len(t.PDUs), len(t.EDUs))
resp, err := t.processTransaction() resp, err := t.processTransaction()
if err != nil { if err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("t.processTransaction failed") util.GetLogger(httpReq.Context()).WithError(err).Error("t.processTransaction failed")
@ -91,22 +93,22 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
} }
if err := json.Unmarshal(pdu, &header); err != nil { if err := json.Unmarshal(pdu, &header); err != nil {
util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to extract room ID from event, skipping it.") util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to extract room ID from event")
continue return nil, err
} }
verReq := api.QueryRoomVersionForRoomRequest{RoomID: header.RoomID} verReq := api.QueryRoomVersionForRoomRequest{RoomID: header.RoomID}
verRes := api.QueryRoomVersionForRoomResponse{} verRes := api.QueryRoomVersionForRoomResponse{}
if err := t.query.QueryRoomVersionForRoom(t.context, &verReq, &verRes); err != nil { if err := t.query.QueryRoomVersionForRoom(t.context, &verReq, &verRes); err != nil {
util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to query room version for event, skipping it.") util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to query room version for room", verReq.RoomID)
continue return nil, err
} }
event, err := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, verRes.RoomVersion) event, err := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, verRes.RoomVersion)
if err != nil { if err != nil {
util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to parse event JSON, skipping it.") util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to parse event JSON of event %q", event.EventID())
continue return nil, err
} }
if err := gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil { if err := gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil {
util.GetLogger(t.context).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q, skipping it.", event.EventID()) util.GetLogger(t.context).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID())
return nil, err return nil, err
} }
pdus = append(pdus, event.Headered(verRes.RoomVersion)) pdus = append(pdus, event.Headered(verRes.RoomVersion))
@ -115,7 +117,7 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
// Process the events. // Process the events.
results := map[string]gomatrixserverlib.PDUResult{} results := map[string]gomatrixserverlib.PDUResult{}
for _, e := range pdus { for _, e := range pdus {
err := t.processEvent(e.Event) err := t.processEvent(e.Unwrap())
if err != nil { if err != nil {
// If the error is due to the event itself being bad then we skip // If the error is due to the event itself being bad then we skip
// it and move onto the next event. We report an error so that the // it and move onto the next event. We report an error so that the
@ -150,7 +152,7 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
} }
// TODO: Process the EDUs. // TODO: Process the EDUs.
util.GetLogger(t.context).Infof("Processed %d PDUs from transaction %q", len(results), t.TransactionID)
return &gomatrixserverlib.RespSend{PDUs: results}, nil return &gomatrixserverlib.RespSend{PDUs: results}, nil
} }
@ -192,7 +194,7 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error {
// Check that the event is allowed by the state at the event. // Check that the event is allowed by the state at the event.
var events []gomatrixserverlib.Event var events []gomatrixserverlib.Event
for _, headeredEvent := range stateResp.StateEvents { for _, headeredEvent := range stateResp.StateEvents {
events = append(events, headeredEvent.Event) events = append(events, headeredEvent.Unwrap())
} }
if err := checkAllowedByState(e, events); err != nil { if err := checkAllowedByState(e, events); err != nil {
return err return err
@ -204,7 +206,9 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error {
// pass the event to the roomserver // pass the event to the roomserver
_, err := t.producer.SendEvents( _, err := t.producer.SendEvents(
t.context, t.context,
stateResp.StateEvents, []gomatrixserverlib.HeaderedEvent{
e.Headered(stateResp.RoomVersion),
},
api.DoNotSendToOtherServers, api.DoNotSendToOtherServers,
nil, nil,
) )

View file

@ -22,6 +22,7 @@ import (
"time" "time"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -81,6 +82,8 @@ func (oq *destinationQueue) backgroundSend() {
// TODO: handle retries. // TODO: handle retries.
// TODO: blacklist uncooperative servers. // TODO: blacklist uncooperative servers.
util.GetLogger(context.TODO()).Infof("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))
_, err := oq.client.SendTransaction(context.TODO(), *t) _, err := oq.client.SendTransaction(context.TODO(), *t)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{

2
go.mod
View file

@ -9,7 +9,7 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f
github.com/matrix-org/go-sqlite3-js v0.0.0-20200304164012-aa524245b658 github.com/matrix-org/go-sqlite3-js v0.0.0-20200304164012-aa524245b658
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26
github.com/matrix-org/gomatrixserverlib v0.0.0-20200326122246-d2837c742cb1 github.com/matrix-org/gomatrixserverlib v0.0.0-20200326161918-775444162aa6
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7
github.com/mattn/go-sqlite3 v2.0.2+incompatible github.com/mattn/go-sqlite3 v2.0.2+incompatible

12
go.sum
View file

@ -150,6 +150,18 @@ github.com/matrix-org/gomatrixserverlib v0.0.0-20200326103633-7eddb3e7c417 h1:U7
github.com/matrix-org/gomatrixserverlib v0.0.0-20200326103633-7eddb3e7c417/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= github.com/matrix-org/gomatrixserverlib v0.0.0-20200326103633-7eddb3e7c417/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200326122246-d2837c742cb1 h1:0DzAiIcStISdyiu49apzl58aDa9YqVblvdf0XSFsPK0= github.com/matrix-org/gomatrixserverlib v0.0.0-20200326122246-d2837c742cb1 h1:0DzAiIcStISdyiu49apzl58aDa9YqVblvdf0XSFsPK0=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200326122246-d2837c742cb1/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= github.com/matrix-org/gomatrixserverlib v0.0.0-20200326122246-d2837c742cb1/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200326150303-065fdf243e25 h1:afJi+NNHtpGb7eYhFTWVO2V244QvgfZ9IawM6PIidFw=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200326150303-065fdf243e25/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200326153629-19c3af762744 h1:Gt34JTKK/NIpQsYe/eFCbZAIvSTU5sIxMDeCq98B8pY=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200326153629-19c3af762744/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200326154042-01fc8322855b h1:LZlsIzh5NrslFhT5qXWWBGLeN5L9RJLY+VVu+wvcXow=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200326154042-01fc8322855b/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200326155059-37f346c4bbb2 h1:gmxij7MTLHtIQVXxmFhWRQBfFsssWOHOSa7W5fFVJOM=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200326155059-37f346c4bbb2/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200326161655-d66cb3962b3c h1:Z1FJDOhVGVZfk/eGsLLvCPSIPv04LeIgE01GQ+ZoQ7Y=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200326161655-d66cb3962b3c/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200326161918-775444162aa6 h1:oebFuk+wRvMBgvwfdTTtMBRBi3r3F2uO8eYQiKw25oM=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200326161918-775444162aa6/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A=
github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5 h1:W7l5CP4V7wPyPb4tYE11dbmeAOwtFQBTW0rf4OonOS8= github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5 h1:W7l5CP4V7wPyPb4tYE11dbmeAOwtFQBTW0rf4OonOS8=

View file

@ -27,7 +27,7 @@ import (
func checkAuthEvents( func checkAuthEvents(
ctx context.Context, ctx context.Context,
db RoomEventDatabase, db RoomEventDatabase,
event gomatrixserverlib.Event, event gomatrixserverlib.HeaderedEvent,
authEventIDs []string, authEventIDs []string,
) ([]types.EventNID, error) { ) ([]types.EventNID, error) {
// Grab the numeric IDs for the supplied auth state events from the database. // Grab the numeric IDs for the supplied auth state events from the database.
@ -38,7 +38,7 @@ func checkAuthEvents(
// TODO: check for duplicate state keys here. // TODO: check for duplicate state keys here.
// Work out which of the state events we actually need. // Work out which of the state events we actually need.
stateNeeded := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{event}) stateNeeded := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{event.Unwrap()})
// Load the actual auth events from the database. // Load the actual auth events from the database.
authEvents, err := loadAuthEvents(ctx, db, stateNeeded, authStateEntries) authEvents, err := loadAuthEvents(ctx, db, stateNeeded, authStateEntries)
@ -47,7 +47,7 @@ func checkAuthEvents(
} }
// Check if the event is allowed. // Check if the event is allowed.
if err = gomatrixserverlib.Allowed(event, &authEvents); err != nil { if err = gomatrixserverlib.Allowed(event.Event, &authEvents); err != nil {
return nil, err return nil, err
} }

View file

@ -96,10 +96,11 @@ func processRoomEvent(
input api.InputRoomEvent, input api.InputRoomEvent,
) (eventID string, err error) { ) (eventID string, err error) {
// Parse and validate the event JSON // Parse and validate the event JSON
event := input.Event headered := input.Event
event := headered.Unwrap()
// Check that the event passes authentication checks and work out the numeric IDs for the auth events. // Check that the event passes authentication checks and work out the numeric IDs for the auth events.
authEventNIDs, err := checkAuthEvents(ctx, db, event.Event, input.AuthEventIDs) authEventNIDs, err := checkAuthEvents(ctx, db, headered, input.AuthEventIDs)
if err != nil { if err != nil {
return return
} }
@ -107,7 +108,7 @@ func processRoomEvent(
if input.TransactionID != nil { if input.TransactionID != nil {
tdID := input.TransactionID tdID := input.TransactionID
eventID, err = db.GetTransactionEventID( eventID, err = db.GetTransactionEventID(
ctx, tdID.TransactionID, tdID.SessionID, input.Event.Sender(), ctx, tdID.TransactionID, tdID.SessionID, event.Sender(),
) )
// On error OR event with the transaction already processed/processesing // On error OR event with the transaction already processed/processesing
if err != nil || eventID != "" { if err != nil || eventID != "" {
@ -116,7 +117,7 @@ func processRoomEvent(
} }
// Store the event // Store the event
roomNID, stateAtEvent, err := db.StoreEvent(ctx, event.Event, input.TransactionID, authEventNIDs) roomNID, stateAtEvent, err := db.StoreEvent(ctx, event, input.TransactionID, authEventNIDs)
if err != nil { if err != nil {
return return
} }
@ -131,7 +132,7 @@ func processRoomEvent(
if stateAtEvent.BeforeStateSnapshotNID == 0 { if stateAtEvent.BeforeStateSnapshotNID == 0 {
// We haven't calculated a state for this event yet. // We haven't calculated a state for this event yet.
// Lets calculate one. // Lets calculate one.
err = calculateAndSetState(ctx, db, input, roomNID, &stateAtEvent, event.Event) err = calculateAndSetState(ctx, db, input, roomNID, &stateAtEvent, event)
if err != nil { if err != nil {
return return
} }
@ -144,7 +145,7 @@ func processRoomEvent(
// Update the extremities of the event graph for the room // Update the extremities of the event graph for the room
return event.EventID(), updateLatestEvents( return event.EventID(), updateLatestEvents(
ctx, db, ow, roomNID, stateAtEvent, event.Event, input.SendAsServer, input.TransactionID, ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID,
) )
} }
@ -235,7 +236,8 @@ func processInviteEvent(
return nil return nil
} }
outputUpdates, err := updateToInviteMembership(updater, &input.Event.Event, nil) event := input.Event.Unwrap()
outputUpdates, err := updateToInviteMembership(updater, &event, nil)
if err != nil { if err != nil {
return err return err
} }

View file

@ -715,6 +715,7 @@ func (r *RoomserverQueryAPI) QueryStateAndAuthChain(
if err != nil { if err != nil {
return err return err
} }
response.RoomVersion = roomVersion
stateEvents, err := r.loadStateAtEventIDs(ctx, request.PrevEventIDs) stateEvents, err := r.loadStateAtEventIDs(ctx, request.PrevEventIDs)
if err != nil { if err != nil {