diff --git a/clientapi/producers/roomserver.go b/clientapi/producers/roomserver.go index 69961cda1..06af54404 100644 --- a/clientapi/producers/roomserver.go +++ b/clientapi/producers/roomserver.go @@ -63,13 +63,13 @@ func (c *RoomserverProducer) SendEventWithState( return err } - ires := make([]api.InputRoomEvent, len(outliers)+1) - for i, outlier := range outliers { - ires[i] = api.InputRoomEvent{ + var ires []api.InputRoomEvent + for _, outlier := range outliers { + ires = append(ires, api.InputRoomEvent{ Kind: api.KindOutlier, Event: outlier.Headered(event.RoomVersion), AuthEventIDs: outlier.AuthEventIDs(), - } + }) } stateEventIDs := make([]string, len(state.StateEvents)) @@ -77,13 +77,13 @@ func (c *RoomserverProducer) SendEventWithState( stateEventIDs[i] = state.StateEvents[i].EventID() } - ires[len(outliers)] = api.InputRoomEvent{ + ires = append(ires, api.InputRoomEvent{ Kind: api.KindNew, Event: event, AuthEventIDs: event.AuthEventIDs(), HasState: true, StateEventIDs: stateEventIDs, - } + }) _, err = c.SendInputRoomEvents(ctx, ires) return err diff --git a/clientapi/routing/sendevent.go b/clientapi/routing/sendevent.go index a031c5790..5b2cd8ad4 100644 --- a/clientapi/routing/sendevent.go +++ b/clientapi/routing/sendevent.go @@ -27,6 +27,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + "github.com/sirupsen/logrus" ) // http://matrix.org/docs/spec/client_server/r0.2.0.html#put-matrix-client-r0-rooms-roomid-send-eventtype-txnid @@ -82,7 +83,7 @@ func SendEvent( eventID, err := producer.SendEvents( req.Context(), []gomatrixserverlib.HeaderedEvent{ - (*e).Headered(verRes.RoomVersion), + e.Headered(verRes.RoomVersion), }, cfg.Matrix.ServerName, txnAndSessionID, @@ -91,7 +92,11 @@ func SendEvent( util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") return jsonerror.InternalServerError() } - util.GetLogger(req.Context()).WithField("event_id", eventID).Info("Sent event") + util.GetLogger(req.Context()).WithFields(logrus.Fields{ + "event_id": eventID, + "room_id": roomID, + "room_version": verRes.RoomVersion, + }).Info("Sent event to roomserver") res := util.JSONResponse{ Code: http.StatusOK, diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 526480f49..9146a54cb 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -63,6 +63,8 @@ func Send( t.TransactionID = txnID t.Destination = cfg.Matrix.ServerName + util.GetLogger(httpReq.Context()).Infof("Received transaction %q containing %d PDUs, %d EDUs", txnID, len(t.PDUs), len(t.EDUs)) + resp, err := t.processTransaction() if err != nil { util.GetLogger(httpReq.Context()).WithError(err).Error("t.processTransaction failed") @@ -91,22 +93,22 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { RoomID string `json:"room_id"` } if err := json.Unmarshal(pdu, &header); err != nil { - util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to extract room ID from event, skipping it.") - continue + util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to extract room ID from event") + return nil, err } verReq := api.QueryRoomVersionForRoomRequest{RoomID: header.RoomID} verRes := api.QueryRoomVersionForRoomResponse{} if err := t.query.QueryRoomVersionForRoom(t.context, &verReq, &verRes); err != nil { - util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to query room version for event, skipping it.") - continue + util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to query room version for room", verReq.RoomID) + return nil, err } event, err := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, verRes.RoomVersion) if err != nil { - util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to parse event JSON, skipping it.") - continue + util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to parse event JSON of event %q", event.EventID()) + return nil, err } if err := gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil { - util.GetLogger(t.context).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q, skipping it.", event.EventID()) + util.GetLogger(t.context).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID()) return nil, err } pdus = append(pdus, event.Headered(verRes.RoomVersion)) @@ -115,7 +117,7 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { // Process the events. results := map[string]gomatrixserverlib.PDUResult{} for _, e := range pdus { - err := t.processEvent(e.Event) + err := t.processEvent(e.Unwrap()) if err != nil { // If the error is due to the event itself being bad then we skip // it and move onto the next event. We report an error so that the @@ -150,7 +152,7 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { } // TODO: Process the EDUs. - + util.GetLogger(t.context).Infof("Processed %d PDUs from transaction %q", len(results), t.TransactionID) return &gomatrixserverlib.RespSend{PDUs: results}, nil } @@ -192,7 +194,7 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error { // Check that the event is allowed by the state at the event. var events []gomatrixserverlib.Event for _, headeredEvent := range stateResp.StateEvents { - events = append(events, headeredEvent.Event) + events = append(events, headeredEvent.Unwrap()) } if err := checkAllowedByState(e, events); err != nil { return err @@ -204,7 +206,9 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error { // pass the event to the roomserver _, err := t.producer.SendEvents( t.context, - stateResp.StateEvents, + []gomatrixserverlib.HeaderedEvent{ + e.Headered(stateResp.RoomVersion), + }, api.DoNotSendToOtherServers, nil, ) diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index a4ec5b4b8..8ac75b47f 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -22,6 +22,7 @@ import ( "time" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" log "github.com/sirupsen/logrus" ) @@ -81,6 +82,8 @@ func (oq *destinationQueue) backgroundSend() { // TODO: handle retries. // TODO: blacklist uncooperative servers. + util.GetLogger(context.TODO()).Infof("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs)) + _, err := oq.client.SendTransaction(context.TODO(), *t) if err != nil { log.WithFields(log.Fields{ diff --git a/go.mod b/go.mod index 46722a19f..abdc8f0ec 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f github.com/matrix-org/go-sqlite3-js v0.0.0-20200304164012-aa524245b658 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 - github.com/matrix-org/gomatrixserverlib v0.0.0-20200326122246-d2837c742cb1 + github.com/matrix-org/gomatrixserverlib v0.0.0-20200326161918-775444162aa6 github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/mattn/go-sqlite3 v2.0.2+incompatible diff --git a/go.sum b/go.sum index e5492fa45..d26a1da58 100644 --- a/go.sum +++ b/go.sum @@ -150,6 +150,18 @@ github.com/matrix-org/gomatrixserverlib v0.0.0-20200326103633-7eddb3e7c417 h1:U7 github.com/matrix-org/gomatrixserverlib v0.0.0-20200326103633-7eddb3e7c417/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= github.com/matrix-org/gomatrixserverlib v0.0.0-20200326122246-d2837c742cb1 h1:0DzAiIcStISdyiu49apzl58aDa9YqVblvdf0XSFsPK0= github.com/matrix-org/gomatrixserverlib v0.0.0-20200326122246-d2837c742cb1/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200326150303-065fdf243e25 h1:afJi+NNHtpGb7eYhFTWVO2V244QvgfZ9IawM6PIidFw= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200326150303-065fdf243e25/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200326153629-19c3af762744 h1:Gt34JTKK/NIpQsYe/eFCbZAIvSTU5sIxMDeCq98B8pY= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200326153629-19c3af762744/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200326154042-01fc8322855b h1:LZlsIzh5NrslFhT5qXWWBGLeN5L9RJLY+VVu+wvcXow= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200326154042-01fc8322855b/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200326155059-37f346c4bbb2 h1:gmxij7MTLHtIQVXxmFhWRQBfFsssWOHOSa7W5fFVJOM= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200326155059-37f346c4bbb2/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200326161655-d66cb3962b3c h1:Z1FJDOhVGVZfk/eGsLLvCPSIPv04LeIgE01GQ+ZoQ7Y= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200326161655-d66cb3962b3c/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200326161918-775444162aa6 h1:oebFuk+wRvMBgvwfdTTtMBRBi3r3F2uO8eYQiKw25oM= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200326161918-775444162aa6/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5 h1:W7l5CP4V7wPyPb4tYE11dbmeAOwtFQBTW0rf4OonOS8= diff --git a/roomserver/input/authevents.go b/roomserver/input/authevents.go index 74be2ed33..456a01c79 100644 --- a/roomserver/input/authevents.go +++ b/roomserver/input/authevents.go @@ -27,7 +27,7 @@ import ( func checkAuthEvents( ctx context.Context, db RoomEventDatabase, - event gomatrixserverlib.Event, + event gomatrixserverlib.HeaderedEvent, authEventIDs []string, ) ([]types.EventNID, error) { // Grab the numeric IDs for the supplied auth state events from the database. @@ -38,7 +38,7 @@ func checkAuthEvents( // TODO: check for duplicate state keys here. // Work out which of the state events we actually need. - stateNeeded := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{event}) + stateNeeded := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{event.Unwrap()}) // Load the actual auth events from the database. authEvents, err := loadAuthEvents(ctx, db, stateNeeded, authStateEntries) @@ -47,7 +47,7 @@ func checkAuthEvents( } // Check if the event is allowed. - if err = gomatrixserverlib.Allowed(event, &authEvents); err != nil { + if err = gomatrixserverlib.Allowed(event.Event, &authEvents); err != nil { return nil, err } diff --git a/roomserver/input/events.go b/roomserver/input/events.go index ab2bbe1cd..c75a3acd9 100644 --- a/roomserver/input/events.go +++ b/roomserver/input/events.go @@ -96,10 +96,11 @@ func processRoomEvent( input api.InputRoomEvent, ) (eventID string, err error) { // Parse and validate the event JSON - event := input.Event + headered := input.Event + event := headered.Unwrap() // Check that the event passes authentication checks and work out the numeric IDs for the auth events. - authEventNIDs, err := checkAuthEvents(ctx, db, event.Event, input.AuthEventIDs) + authEventNIDs, err := checkAuthEvents(ctx, db, headered, input.AuthEventIDs) if err != nil { return } @@ -107,7 +108,7 @@ func processRoomEvent( if input.TransactionID != nil { tdID := input.TransactionID eventID, err = db.GetTransactionEventID( - ctx, tdID.TransactionID, tdID.SessionID, input.Event.Sender(), + ctx, tdID.TransactionID, tdID.SessionID, event.Sender(), ) // On error OR event with the transaction already processed/processesing if err != nil || eventID != "" { @@ -116,7 +117,7 @@ func processRoomEvent( } // Store the event - roomNID, stateAtEvent, err := db.StoreEvent(ctx, event.Event, input.TransactionID, authEventNIDs) + roomNID, stateAtEvent, err := db.StoreEvent(ctx, event, input.TransactionID, authEventNIDs) if err != nil { return } @@ -131,7 +132,7 @@ func processRoomEvent( if stateAtEvent.BeforeStateSnapshotNID == 0 { // We haven't calculated a state for this event yet. // Lets calculate one. - err = calculateAndSetState(ctx, db, input, roomNID, &stateAtEvent, event.Event) + err = calculateAndSetState(ctx, db, input, roomNID, &stateAtEvent, event) if err != nil { return } @@ -144,7 +145,7 @@ func processRoomEvent( // Update the extremities of the event graph for the room return event.EventID(), updateLatestEvents( - ctx, db, ow, roomNID, stateAtEvent, event.Event, input.SendAsServer, input.TransactionID, + ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID, ) } @@ -235,7 +236,8 @@ func processInviteEvent( return nil } - outputUpdates, err := updateToInviteMembership(updater, &input.Event.Event, nil) + event := input.Event.Unwrap() + outputUpdates, err := updateToInviteMembership(updater, &event, nil) if err != nil { return err } diff --git a/roomserver/query/query.go b/roomserver/query/query.go index b2b8828b3..b7cdf1507 100644 --- a/roomserver/query/query.go +++ b/roomserver/query/query.go @@ -715,6 +715,7 @@ func (r *RoomserverQueryAPI) QueryStateAndAuthChain( if err != nil { return err } + response.RoomVersion = roomVersion stateEvents, err := r.loadStateAtEventIDs(ctx, request.PrevEventIDs) if err != nil {