From c888ac2bd44407ae9f36a78832132b8f60c2c5af Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 25 Mar 2020 15:13:32 +0000 Subject: [PATCH] Handle transaction marshalling/unmarshalling within Dendrite --- federationapi/routing/backfill.go | 8 ++++++- federationapi/routing/send.go | 28 ++++++++++++++++++++-- federationsender/queue/destinationqueue.go | 5 ++-- go.mod | 2 +- go.sum | 2 ++ syncapi/routing/messages.go | 10 +++++--- 6 files changed, 46 insertions(+), 9 deletions(-) diff --git a/federationapi/routing/backfill.go b/federationapi/routing/backfill.go index a4bc3c67d..72ce0c669 100644 --- a/federationapi/routing/backfill.go +++ b/federationapi/routing/backfill.go @@ -15,6 +15,7 @@ package routing import ( + "encoding/json" "net/http" "strconv" "time" @@ -91,9 +92,14 @@ func Backfill( } } + var eventJSONs []json.RawMessage + for _, e := range evs { + eventJSONs = append(eventJSONs, e.JSON()) + } + txn := gomatrixserverlib.Transaction{ Origin: cfg.Matrix.ServerName, - PDUs: evs, + PDUs: eventJSONs, OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()), } diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 709ac0a3b..55899e61d 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -84,14 +84,38 @@ type txnReq struct { } func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { + var pdus []gomatrixserverlib.Event + + for _, pdu := range t.PDUs { + var header struct { + roomID string `json:"room_id"` + } + if err := json.Unmarshal(pdu, &header); err != nil { + util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to extract room ID from event, skipping it.") + continue + } + verReq := api.QueryRoomVersionForRoomRequest{RoomID: header.roomID} + verRes := api.QueryRoomVersionForRoomResponse{} + if err := t.query.QueryRoomVersionForRoom(t.context, &verReq, &verRes); err != nil { + util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to query room version for event, skipping it.") + continue + } + event, err := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, verRes.RoomVersion) + if err != nil { + util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to parse event JSON, skipping it.") + continue + } + pdus = append(pdus, event) + } + // Check the event signatures - if err := gomatrixserverlib.VerifyAllEventSignatures(t.context, t.PDUs, t.keys); err != nil { + if err := gomatrixserverlib.VerifyAllEventSignatures(t.context, pdus, t.keys); err != nil { return nil, err } // Process the events. results := map[string]gomatrixserverlib.PDUResult{} - for _, e := range t.PDUs { + for _, e := range pdus { err := t.processEvent(e) if err != nil { // If the error is due to the event itself being bad then we skip diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index f412cb19b..5397611cd 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -16,6 +16,7 @@ package queue import ( "context" + "encoding/json" "fmt" "sync" "time" @@ -103,7 +104,7 @@ func (oq *destinationQueue) next() *gomatrixserverlib.Transaction { } t := gomatrixserverlib.Transaction{ - PDUs: []gomatrixserverlib.Event{}, + PDUs: []json.RawMessage{}, EDUs: []gomatrixserverlib.EDU{}, } now := gomatrixserverlib.AsTimestamp(time.Now()) @@ -119,7 +120,7 @@ func (oq *destinationQueue) next() *gomatrixserverlib.Transaction { oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID} for _, pdu := range oq.pendingEvents { - t.PDUs = append(t.PDUs, *pdu) + t.PDUs = append(t.PDUs, (*pdu).JSON()) } oq.pendingEvents = nil oq.sentCounter += len(t.PDUs) diff --git a/go.mod b/go.mod index a44e7c8aa..e524fd15d 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f github.com/matrix-org/go-sqlite3-js v0.0.0-20200304164012-aa524245b658 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 - github.com/matrix-org/gomatrixserverlib v0.0.0-20200325133651-ab73f35cb35c + github.com/matrix-org/gomatrixserverlib v0.0.0-20200325145347-4c5d0cbc87b4 github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/mattn/go-sqlite3 v2.0.2+incompatible diff --git a/go.sum b/go.sum index e9ed63830..d76a6cfa3 100644 --- a/go.sum +++ b/go.sum @@ -130,6 +130,8 @@ github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5 h1:km github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= github.com/matrix-org/gomatrixserverlib v0.0.0-20200325133651-ab73f35cb35c h1:vYMzfIA+7U9CTiZhmGXbXnzSZlGzE/HbyiDJT20yJT8= github.com/matrix-org/gomatrixserverlib v0.0.0-20200325133651-ab73f35cb35c/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200325145347-4c5d0cbc87b4 h1:fAR14tVvVNXS4MZf+DZSRA4G0+TkvN0Rb3isPG3+UxA= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200325145347-4c5d0cbc87b4/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5 h1:W7l5CP4V7wPyPb4tYE11dbmeAOwtFQBTW0rf4OonOS8= diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 4e064f46e..dbe6d7dbf 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -386,16 +386,20 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv } for _, p := range txn.PDUs { - pdus = append(pdus, p.Headered(verRes.RoomVersion)) + event, e := gomatrixserverlib.NewEventFromUntrustedJSON(p, verRes.RoomVersion) + if e != nil { + continue + } + pdus = append(pdus, event.Headered(verRes.RoomVersion)) } util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("new_events", len(pdus)).Info("Storing new events from backfill") // Store the events in the database, while marking them as unfit to show // up in responses to sync requests. - for _, pdu := range pdus { + for i := range pdus { if _, err = r.db.WriteEvent( r.ctx, - &pdu, + &pdus[i], []gomatrixserverlib.HeaderedEvent{}, []string{}, []string{},