Handle transaction marshalling/unmarshalling within Dendrite

This commit is contained in:
Neil Alexander 2020-03-25 15:13:32 +00:00
parent 153c9924b0
commit c888ac2bd4
6 changed files with 46 additions and 9 deletions

View file

@ -15,6 +15,7 @@
package routing package routing
import ( import (
"encoding/json"
"net/http" "net/http"
"strconv" "strconv"
"time" "time"
@ -91,9 +92,14 @@ func Backfill(
} }
} }
var eventJSONs []json.RawMessage
for _, e := range evs {
eventJSONs = append(eventJSONs, e.JSON())
}
txn := gomatrixserverlib.Transaction{ txn := gomatrixserverlib.Transaction{
Origin: cfg.Matrix.ServerName, Origin: cfg.Matrix.ServerName,
PDUs: evs, PDUs: eventJSONs,
OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()), OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()),
} }

View file

@ -84,14 +84,38 @@ type txnReq struct {
} }
func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
var pdus []gomatrixserverlib.Event
for _, pdu := range t.PDUs {
var header struct {
roomID string `json:"room_id"`
}
if err := json.Unmarshal(pdu, &header); err != nil {
util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to extract room ID from event, skipping it.")
continue
}
verReq := api.QueryRoomVersionForRoomRequest{RoomID: header.roomID}
verRes := api.QueryRoomVersionForRoomResponse{}
if err := t.query.QueryRoomVersionForRoom(t.context, &verReq, &verRes); err != nil {
util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to query room version for event, skipping it.")
continue
}
event, err := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, verRes.RoomVersion)
if err != nil {
util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to parse event JSON, skipping it.")
continue
}
pdus = append(pdus, event)
}
// Check the event signatures // Check the event signatures
if err := gomatrixserverlib.VerifyAllEventSignatures(t.context, t.PDUs, t.keys); err != nil { if err := gomatrixserverlib.VerifyAllEventSignatures(t.context, pdus, t.keys); err != nil {
return nil, err return nil, err
} }
// Process the events. // Process the events.
results := map[string]gomatrixserverlib.PDUResult{} results := map[string]gomatrixserverlib.PDUResult{}
for _, e := range t.PDUs { for _, e := range pdus {
err := t.processEvent(e) err := t.processEvent(e)
if err != nil { if err != nil {
// If the error is due to the event itself being bad then we skip // If the error is due to the event itself being bad then we skip

View file

@ -16,6 +16,7 @@ package queue
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"sync" "sync"
"time" "time"
@ -103,7 +104,7 @@ func (oq *destinationQueue) next() *gomatrixserverlib.Transaction {
} }
t := gomatrixserverlib.Transaction{ t := gomatrixserverlib.Transaction{
PDUs: []gomatrixserverlib.Event{}, PDUs: []json.RawMessage{},
EDUs: []gomatrixserverlib.EDU{}, EDUs: []gomatrixserverlib.EDU{},
} }
now := gomatrixserverlib.AsTimestamp(time.Now()) now := gomatrixserverlib.AsTimestamp(time.Now())
@ -119,7 +120,7 @@ func (oq *destinationQueue) next() *gomatrixserverlib.Transaction {
oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID} oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID}
for _, pdu := range oq.pendingEvents { for _, pdu := range oq.pendingEvents {
t.PDUs = append(t.PDUs, *pdu) t.PDUs = append(t.PDUs, (*pdu).JSON())
} }
oq.pendingEvents = nil oq.pendingEvents = nil
oq.sentCounter += len(t.PDUs) oq.sentCounter += len(t.PDUs)

2
go.mod
View file

@ -9,7 +9,7 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f
github.com/matrix-org/go-sqlite3-js v0.0.0-20200304164012-aa524245b658 github.com/matrix-org/go-sqlite3-js v0.0.0-20200304164012-aa524245b658
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26
github.com/matrix-org/gomatrixserverlib v0.0.0-20200325133651-ab73f35cb35c github.com/matrix-org/gomatrixserverlib v0.0.0-20200325145347-4c5d0cbc87b4
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7
github.com/mattn/go-sqlite3 v2.0.2+incompatible github.com/mattn/go-sqlite3 v2.0.2+incompatible

2
go.sum
View file

@ -130,6 +130,8 @@ github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5 h1:km
github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200325133651-ab73f35cb35c h1:vYMzfIA+7U9CTiZhmGXbXnzSZlGzE/HbyiDJT20yJT8= github.com/matrix-org/gomatrixserverlib v0.0.0-20200325133651-ab73f35cb35c h1:vYMzfIA+7U9CTiZhmGXbXnzSZlGzE/HbyiDJT20yJT8=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200325133651-ab73f35cb35c/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= github.com/matrix-org/gomatrixserverlib v0.0.0-20200325133651-ab73f35cb35c/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200325145347-4c5d0cbc87b4 h1:fAR14tVvVNXS4MZf+DZSRA4G0+TkvN0Rb3isPG3+UxA=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200325145347-4c5d0cbc87b4/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A=
github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5 h1:W7l5CP4V7wPyPb4tYE11dbmeAOwtFQBTW0rf4OonOS8= github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5 h1:W7l5CP4V7wPyPb4tYE11dbmeAOwtFQBTW0rf4OonOS8=

View file

@ -386,16 +386,20 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv
} }
for _, p := range txn.PDUs { for _, p := range txn.PDUs {
pdus = append(pdus, p.Headered(verRes.RoomVersion)) event, e := gomatrixserverlib.NewEventFromUntrustedJSON(p, verRes.RoomVersion)
if e != nil {
continue
}
pdus = append(pdus, event.Headered(verRes.RoomVersion))
} }
util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("new_events", len(pdus)).Info("Storing new events from backfill") util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("new_events", len(pdus)).Info("Storing new events from backfill")
// Store the events in the database, while marking them as unfit to show // Store the events in the database, while marking them as unfit to show
// up in responses to sync requests. // up in responses to sync requests.
for _, pdu := range pdus { for i := range pdus {
if _, err = r.db.WriteEvent( if _, err = r.db.WriteEvent(
r.ctx, r.ctx,
&pdu, &pdus[i],
[]gomatrixserverlib.HeaderedEvent{}, []gomatrixserverlib.HeaderedEvent{},
[]string{}, []string{},
[]string{}, []string{},