diff --git a/currentstateserver/consumers/roomserver.go b/currentstateserver/consumers/roomserver.go index 9e2694b0c..81878c6dc 100644 --- a/currentstateserver/consumers/roomserver.go +++ b/currentstateserver/consumers/roomserver.go @@ -61,6 +61,8 @@ func (c *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return c.onNewRoomEvent(context.TODO(), *output.NewRoomEvent) case api.OutputTypeNewInviteEvent: case api.OutputTypeRetireInviteEvent: + case api.OutputTypeRedactedEvent: + return c.onRedactEvent(context.Background(), *output.RedactedEvent) default: log.WithField("type", output.Type).Debug( "roomserver output log: ignoring unknown output type", @@ -105,6 +107,12 @@ func (c *OutputRoomEventConsumer) onNewRoomEvent( return nil } +func (c *OutputRoomEventConsumer) onRedactEvent( + ctx context.Context, msg api.OutputRedactedEvent, +) error { + return c.db.RedactEvent(ctx, msg.RedactedEventID, msg.RedactedBecause) +} + // Start consuming from room servers func (c *OutputRoomEventConsumer) Start() error { return c.rsConsumer.Start() diff --git a/currentstateserver/storage/interface.go b/currentstateserver/storage/interface.go index 04636bafb..0e95cde87 100644 --- a/currentstateserver/storage/interface.go +++ b/currentstateserver/storage/interface.go @@ -35,4 +35,6 @@ type Database interface { // GetBulkStateContent returns all state events which match a given room ID and a given state key tuple. Both must be satisfied for a match. // If a tuple has the StateKey of '*' and allowWildcards=true then all state events with the EventType should be returned. GetBulkStateContent(ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool) ([]tables.StrippedEvent, error) + // Redact a state event + RedactEvent(ctx context.Context, redactedEventID string, redactedBecause gomatrixserverlib.HeaderedEvent) error } diff --git a/currentstateserver/storage/postgres/current_room_state_table.go b/currentstateserver/storage/postgres/current_room_state_table.go index bd2e075f0..79c9f9670 100644 --- a/currentstateserver/storage/postgres/current_room_state_table.go +++ b/currentstateserver/storage/postgres/current_room_state_table.go @@ -189,7 +189,6 @@ func (s *currentRoomStateStatements) SelectEventsWithEventIDs( if err := rows.Scan(&eventBytes); err != nil { return nil, err } - // TODO: Handle redacted events var ev gomatrixserverlib.HeaderedEvent if err := json.Unmarshal(eventBytes, &ev); err != nil { return nil, err diff --git a/currentstateserver/storage/shared/storage.go b/currentstateserver/storage/shared/storage.go index cd59ac129..362dafe96 100644 --- a/currentstateserver/storage/shared/storage.go +++ b/currentstateserver/storage/shared/storage.go @@ -17,10 +17,13 @@ package shared import ( "context" "database/sql" + "fmt" "github.com/matrix-org/dendrite/currentstateserver/storage/tables" + "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" ) type Database struct { @@ -36,6 +39,28 @@ func (d *Database) GetBulkStateContent(ctx context.Context, roomIDs []string, tu return d.CurrentRoomState.SelectBulkStateContent(ctx, roomIDs, tuples, allowWildcards) } +func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, redactedBecause gomatrixserverlib.HeaderedEvent) error { + events, err := d.CurrentRoomState.SelectEventsWithEventIDs(ctx, nil, []string{redactedEventID}) + if err != nil { + return err + } + if len(events) != 1 { + // this should never happen but is non-fatal + util.GetLogger(ctx).WithField("redacted_event_id", redactedEventID).WithField("redaction_event_id", redactedBecause.EventID()).Warnf( + "RedactEvent: missing redacted event", + ) + return nil + } + redactionEvent := redactedBecause.Unwrap() + eventBeingRedacted := events[0].Unwrap() + redactedEvent, err := eventutil.RedactEvent(&redactionEvent, &eventBeingRedacted) + if err != nil { + return fmt.Errorf("RedactEvent failed: %w", err) + } + // replace the state event with a redacted version of itself + return d.StoreStateEvents(ctx, []gomatrixserverlib.HeaderedEvent{redactedEvent.Headered(redactedBecause.RoomVersion)}, []string{redactedEventID}) +} + func (d *Database) StoreStateEvents(ctx context.Context, addStateEvents []gomatrixserverlib.HeaderedEvent, removeStateEventIDs []string) error { return sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error { diff --git a/currentstateserver/storage/sqlite3/current_room_state_table.go b/currentstateserver/storage/sqlite3/current_room_state_table.go index 95185d9a8..8fac4f352 100644 --- a/currentstateserver/storage/sqlite3/current_room_state_table.go +++ b/currentstateserver/storage/sqlite3/current_room_state_table.go @@ -162,7 +162,13 @@ func (s *currentRoomStateStatements) SelectEventsWithEventIDs( iEventIDs[k] = v } query := strings.Replace(selectEventsWithEventIDsSQL, "($1)", sqlutil.QueryVariadic(len(iEventIDs)), 1) - rows, err := txn.QueryContext(ctx, query, iEventIDs...) + var rows *sql.Rows + var err error + if txn != nil { + rows, err = txn.QueryContext(ctx, query, iEventIDs...) + } else { + rows, err = s.db.QueryContext(ctx, query, iEventIDs...) + } if err != nil { return nil, err } @@ -173,7 +179,6 @@ func (s *currentRoomStateStatements) SelectEventsWithEventIDs( if err := rows.Scan(&eventBytes); err != nil { return nil, err } - // TODO: Handle redacted events var ev gomatrixserverlib.HeaderedEvent if err := json.Unmarshal(eventBytes, &ev); err != nil { return nil, err diff --git a/currentstateserver/storage/tables/interface.go b/currentstateserver/storage/tables/interface.go index 8ba4e4eb9..12884b684 100644 --- a/currentstateserver/storage/tables/interface.go +++ b/currentstateserver/storage/tables/interface.go @@ -24,6 +24,8 @@ import ( type CurrentRoomState interface { SelectStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error) + // SelectEventsWithEventIDs returns the events for the given event IDs. If the event(s) are missing, they are not returned + // and no error is returned. SelectEventsWithEventIDs(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error) // UpsertRoomState stores the given event in the database, along with an extracted piece of content. // The piece of content will vary depending on the event type, and table implementations may use this information to optimise diff --git a/go.mod b/go.mod index 4fec1bc9a..00b9a9c6a 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 - github.com/matrix-org/gomatrixserverlib v0.0.0-20200630110352-4948932681fe + github.com/matrix-org/gomatrixserverlib v0.0.0-20200707103800-7470b03f069b github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/mattn/go-sqlite3 v2.0.2+incompatible diff --git a/go.sum b/go.sum index e89b93dfe..dcc96c827 100644 --- a/go.sum +++ b/go.sum @@ -421,8 +421,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 h1:Yb+Wlf github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bhrnp3Ky1qgx/fzCtCALOoGYylh2tpS9K4= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200630110352-4948932681fe h1:rCjG+azihYsO+EIdm//Zx5gQ7hzeJVraeSukLsW1Mic= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200630110352-4948932681fe/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200707103800-7470b03f069b h1:g1ueoPHI5tpafw/QysVfDw43FwRTPqz8sT+MZbK54yk= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200707103800-7470b03f069b/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f/go.mod h1:y0oDTjZDv5SM9a2rp3bl+CU+bvTRINQsdb7YlDql5Go= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo= diff --git a/internal/eventutil/events.go b/internal/eventutil/events.go index d56f5be8f..e3b8f3d31 100644 --- a/internal/eventutil/events.go +++ b/internal/eventutil/events.go @@ -149,3 +149,18 @@ func truncateAuthAndPrevEvents(auth, prev []gomatrixserverlib.EventReference) ( } return } + +// RedactEvent redacts the given event and sets the unsigned field appropriately. This should be used by +// downstream components to the roomserver when an OutputTypeRedactedEvent occurs. +func RedactEvent(redactionEvent, redactedEvent *gomatrixserverlib.Event) (*gomatrixserverlib.Event, error) { + // sanity check + if redactionEvent.Type() != gomatrixserverlib.MRoomRedaction { + return nil, fmt.Errorf("RedactEvent: redactionEvent isn't a redaction event, is '%s'", redactionEvent.Type()) + } + r := redactedEvent.Redact() + err := r.SetUnsignedField("redacted_because", redactionEvent) + if err != nil { + return nil, err + } + return &r, nil +} diff --git a/roomserver/api/output.go b/roomserver/api/output.go index 2bbd97af8..b25353ae4 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -28,6 +28,8 @@ const ( OutputTypeNewInviteEvent OutputType = "new_invite_event" // OutputTypeRetireInviteEvent indicates that the event is an OutputRetireInviteEvent OutputTypeRetireInviteEvent OutputType = "retire_invite_event" + // OutputTypeRedactedEvent indicates that the event is an OutputRedactedEvent + OutputTypeRedactedEvent OutputType = "redacted_event" ) // An OutputEvent is an entry in the roomserver output kafka log. @@ -41,6 +43,8 @@ type OutputEvent struct { NewInviteEvent *OutputNewInviteEvent `json:"new_invite_event,omitempty"` // The content of event with type OutputTypeRetireInviteEvent RetireInviteEvent *OutputRetireInviteEvent `json:"retire_invite_event,omitempty"` + // The content of event with type OutputTypeRedactedEvent + RedactedEvent *OutputRedactedEvent `json:"redacted_event,omitempty"` } // An OutputNewRoomEvent is written when the roomserver receives a new event. @@ -165,3 +169,13 @@ type OutputRetireInviteEvent struct { // "leave" or "ban". Membership string } + +// An OutputRedactedEvent is written whenever a redaction has been /validated/. +// Downstream components MUST redact the given event ID if they have stored the +// event JSON. It is guaranteed that this event ID has been seen before. +type OutputRedactedEvent struct { + // The event ID that was redacted + RedactedEventID string + // The value of `unsigned.redacted_because` - the redaction event itself + RedactedBecause gomatrixserverlib.HeaderedEvent +} diff --git a/roomserver/internal/input_events.go b/roomserver/internal/input_events.go index ae57f2e77..04538cf69 100644 --- a/roomserver/internal/input_events.go +++ b/roomserver/internal/input_events.go @@ -19,6 +19,7 @@ package internal import ( "context" + "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/types" @@ -31,6 +32,8 @@ import ( // TODO(#375): This should be rewritten to allow concurrent calls. The // difficulty is in ensuring that we correctly annotate events with the correct // state deltas when sending to kafka streams +// TODO: Break up function - we should probably do transaction ID checks before calling this. +// nolint:gocyclo func (r *RoomserverInternalAPI) processRoomEvent( ctx context.Context, input api.InputRoomEvent, @@ -60,10 +63,18 @@ func (r *RoomserverInternalAPI) processRoomEvent( } // Store the event. - roomNID, stateAtEvent, err := r.DB.StoreEvent(ctx, event, input.TransactionID, authEventNIDs) + roomNID, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, input.TransactionID, authEventNIDs) if err != nil { return } + // if storing this event results in it being redacted then do so. + if redactedEventID == event.EventID() { + r, rerr := eventutil.RedactEvent(redactionEvent, &event) + if rerr != nil { + return "", rerr + } + event = *r + } // For outliers we can stop after we've stored the event itself as it // doesn't have any associated state to store and we don't need to @@ -97,6 +108,25 @@ func (r *RoomserverInternalAPI) processRoomEvent( return } + // processing this event resulted in an event (which may not be the one we're processing) + // being redacted. We are guaranteed to have both sides (the redaction/redacted event), + // so notify downstream components to redact this event - they should have it if they've + // been tracking our output log. + if redactedEventID != "" { + err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{ + { + Type: api.OutputTypeRedactedEvent, + RedactedEvent: &api.OutputRedactedEvent{ + RedactedEventID: redactedEventID, + RedactedBecause: redactionEvent.Headered(headered.RoomVersion), + }, + }, + }) + if err != nil { + return + } + } + // Update the extremities of the event graph for the room return event.EventID(), nil } diff --git a/roomserver/internal/query.go b/roomserver/internal/query.go index 7fa3247a6..ca4af0b25 100644 --- a/roomserver/internal/query.go +++ b/roomserver/internal/query.go @@ -880,11 +880,24 @@ func persistEvents(ctx context.Context, db storage.Database, events []gomatrixse i++ } var stateAtEvent types.StateAtEvent - roomNID, stateAtEvent, err = db.StoreEvent(ctx, ev.Unwrap(), nil, authNids) + var redactedEventID string + var redactionEvent *gomatrixserverlib.Event + roomNID, stateAtEvent, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), nil, authNids) if err != nil { logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to persist event") continue } + // If storing this event results in it being redacted, then do so. + // It's also possible for this event to be a redaction which results in another event being + // redacted, which we don't care about since we aren't returning it in this backfill. + if redactedEventID == ev.EventID() { + ev = ev.Redact().Headered(ev.RoomVersion) + err = ev.SetUnsignedField("redacted_because", redactionEvent) + if err != nil { + logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to set unsigned field") + continue + } + } backfilledEventMap[ev.EventID()] = types.Event{ EventNID: stateAtEvent.StateEntry.EventNID, Event: ev.Unwrap(), diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go new file mode 100644 index 000000000..d553c5b79 --- /dev/null +++ b/roomserver/roomserver_test.go @@ -0,0 +1,173 @@ +package roomserver + +import ( + "context" + "encoding/json" + "fmt" + "os" + "reflect" + "testing" + + "github.com/Shopify/sarama" + "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/internal/setup" + "github.com/matrix-org/dendrite/internal/test" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" +) + +const ( + testOrigin = gomatrixserverlib.ServerName("kaer.morhen") + // we have to use an on-disk DB because we open multiple connections due to the *Updater structs. + // Using :memory: results in a brand new DB for each open connection, and sharing memory via + // ?cache=shared just allows read-only sharing, so writes to the database on other connections are lost. + roomserverDBFileURI = "file:roomserver_test.db" + roomserverDBFilePath = "./roomserver_test.db" +) + +var ( + ctx = context.Background() +) + +type dummyProducer struct { + topic string + producedMessages []*api.OutputEvent +} + +// SendMessage produces a given message, and returns only when it either has +// succeeded or failed to produce. It will return the partition and the offset +// of the produced message, or an error if the message failed to produce. +func (p *dummyProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) { + if msg.Topic != p.topic { + return 0, 0, nil + } + be := msg.Value.(sarama.ByteEncoder) + b := json.RawMessage(be) + fmt.Println("SENDING >>>>>>>> ", string(b)) + var out api.OutputEvent + err = json.Unmarshal(b, &out) + if err != nil { + return 0, 0, err + } + p.producedMessages = append(p.producedMessages, &out) + return 0, 0, nil +} + +// SendMessages produces a given set of messages, and returns only when all +// messages in the set have either succeeded or failed. Note that messages +// can succeed and fail individually; if some succeed and some fail, +// SendMessages will return an error. +func (p *dummyProducer) SendMessages(msgs []*sarama.ProducerMessage) error { + for _, m := range msgs { + p.SendMessage(m) + } + return nil +} + +// Close shuts down the producer and waits for any buffered messages to be +// flushed. You must call this function before a producer object passes out of +// scope, as it may otherwise leak memory. You must call this before calling +// Close on the underlying client. +func (p *dummyProducer) Close() error { + return nil +} + +func deleteDatabase() { + err := os.Remove(roomserverDBFilePath) + if err != nil { + fmt.Printf("failed to delete database %s: %s\n", roomserverDBFilePath, err) + } +} + +func mustLoadEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []json.RawMessage) []gomatrixserverlib.HeaderedEvent { + hs := make([]gomatrixserverlib.HeaderedEvent, len(events)) + for i := range events { + e, err := gomatrixserverlib.NewEventFromTrustedJSON(events[i], false, ver) + if err != nil { + t.Fatalf("cannot load test data: " + err.Error()) + } + h := e.Headered(ver) + hs[i] = h + } + return hs +} + +func mustSendEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []json.RawMessage) (api.RoomserverInternalAPI, *dummyProducer, []gomatrixserverlib.HeaderedEvent) { + cfg := &config.Dendrite{} + cfg.Database.RoomServer = roomserverDBFileURI + cfg.Kafka.Topics.OutputRoomEvent = "output_room_event" + cfg.Matrix.ServerName = testOrigin + cfg.Kafka.UseNaffka = true + dp := &dummyProducer{ + topic: string(cfg.Kafka.Topics.OutputRoomEvent), + } + cache, err := caching.NewInMemoryLRUCache(true) + if err != nil { + t.Fatalf("failed to make caches: %s", err) + } + base := &setup.BaseDendrite{ + KafkaProducer: dp, + Caches: cache, + Cfg: cfg, + } + + rsAPI := NewInternalAPI(base, &test.NopJSONVerifier{}, nil) + hevents := mustLoadEvents(t, ver, events) + _, err = api.SendEvents(ctx, rsAPI, hevents, testOrigin, nil) + if err != nil { + t.Errorf("failed to SendEvents: %s", err) + } + return rsAPI, dp, hevents +} + +func TestOutputRedactedEvent(t *testing.T) { + redactionEvents := []json.RawMessage{ + // create event + []byte(`{"auth_events":[],"content":{"creator":"@userid:kaer.morhen"},"depth":0,"event_id":"$N4us6vqqq3RjvpKd:kaer.morhen","hashes":{"sha256":"WTdrCn/YsiounXcJPsLP8xT0ZjHiO5Ov0NvXYmK2onE"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"9+5JcpaN5b5KlHYHGp6r+GoNDH98lbfzGYwjfxensa5C5D/bDACaYnMDLnhwsHOE5nxgI+jT/GV271pz6PMSBQ"}},"state_key":"","type":"m.room.create"}`), + // join event + []byte(`{"auth_events":[["$N4us6vqqq3RjvpKd:kaer.morhen",{"sha256":"SylirfgfXFhscZL7p10NmOa1nFFEckiwz0lAideQMIM"}]],"content":{"membership":"join"},"depth":1,"event_id":"$6sUiGPQ0a3tqYGKo:kaer.morhen","hashes":{"sha256":"eYVBC7RO+FlxRyW1aXYf/ad4Dzi7T93tArdGw3r4RwQ"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$N4us6vqqq3RjvpKd:kaer.morhen",{"sha256":"SylirfgfXFhscZL7p10NmOa1nFFEckiwz0lAideQMIM"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"tiDBTPFa53YMfHiupX3vSRE/ZcCiCjmGt7gDpIpDpwZapeays5Vqqcqb7KiywrDldpTkrrdJBAw2jXcq6ZyhDw"}},"state_key":"@userid:kaer.morhen","type":"m.room.member"}`), + // room name + []byte(`{"auth_events":[["$N4us6vqqq3RjvpKd:kaer.morhen",{"sha256":"SylirfgfXFhscZL7p10NmOa1nFFEckiwz0lAideQMIM"}],["$6sUiGPQ0a3tqYGKo:kaer.morhen",{"sha256":"IS4HSMqpqVUGh1Z3qgC99YcaizjCoO4yFhYYe8j53IE"}]],"content":{"name":"My Room Name"},"depth":2,"event_id":"$VC1zZ9YWwuUbSNHD:kaer.morhen","hashes":{"sha256":"bpqTkfLx6KHzWz7/wwpsXnXwJWEGW14aV63ffexzDFg"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$6sUiGPQ0a3tqYGKo:kaer.morhen",{"sha256":"IS4HSMqpqVUGh1Z3qgC99YcaizjCoO4yFhYYe8j53IE"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"mhJZ3X4bAKrF/T0mtPf1K2Tmls0h6xGY1IPDpJ/SScQBqDlu3HQR2BPa7emqj5bViyLTWVNh+ZCpzx/6STTrAg"}},"state_key":"","type":"m.room.name"}`), + // redact room name + []byte(`{"auth_events":[["$N4us6vqqq3RjvpKd:kaer.morhen",{"sha256":"SylirfgfXFhscZL7p10NmOa1nFFEckiwz0lAideQMIM"}],["$6sUiGPQ0a3tqYGKo:kaer.morhen",{"sha256":"IS4HSMqpqVUGh1Z3qgC99YcaizjCoO4yFhYYe8j53IE"}]],"content":{"reason":"Spamming"},"depth":3,"event_id":"$tJI0pE3b8u9UMYpT:kaer.morhen","hashes":{"sha256":"/3TStqa5SQqYaEtl7ajEvSRvu6d12MMKfICUzrBpd2Q"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$VC1zZ9YWwuUbSNHD:kaer.morhen",{"sha256":"+l8cNa7syvm0EF7CAmQRlYknLEMjivnI4FLhB/TUBEY"}]],"redacts":"$VC1zZ9YWwuUbSNHD:kaer.morhen","room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"QBOh+amf0vTJbm6+9VwAcR9uJviBIor2KON0Y7+EyQx5YbUZEzW1HPeJxarLIHBcxMzgOVzjuM+StzjbUgDzAg"}},"type":"m.room.redaction"}`), + // message + []byte(`{"auth_events":[["$N4us6vqqq3RjvpKd:kaer.morhen",{"sha256":"SylirfgfXFhscZL7p10NmOa1nFFEckiwz0lAideQMIM"}],["$6sUiGPQ0a3tqYGKo:kaer.morhen",{"sha256":"IS4HSMqpqVUGh1Z3qgC99YcaizjCoO4yFhYYe8j53IE"}]],"content":{"body":"Test Message"},"depth":4,"event_id":"$o8KHsgSIYbJrddnd:kaer.morhen","hashes":{"sha256":"IE/rGVlKOpiGWeIo887g1CK1drYqcWDZhL6THZHkJ1c"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$tJI0pE3b8u9UMYpT:kaer.morhen",{"sha256":"zvmwyXuDox7jpA16JRH6Fc1zbfQht2tpkBbMTUOi3Jw"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"/3z+pJjiJXWhwfqIEzmNksvBHCoXTktK/y0rRuWJXw6i1+ygRG/suDCKhFuuz6gPapRmEMPVILi2mJqHHXPKAg"}},"type":"m.room.message"}`), + // redact previous message + []byte(`{"auth_events":[["$N4us6vqqq3RjvpKd:kaer.morhen",{"sha256":"SylirfgfXFhscZL7p10NmOa1nFFEckiwz0lAideQMIM"}],["$6sUiGPQ0a3tqYGKo:kaer.morhen",{"sha256":"IS4HSMqpqVUGh1Z3qgC99YcaizjCoO4yFhYYe8j53IE"}]],"content":{"reason":"Spamming more"},"depth":5,"event_id":"$UpsE8belb2gJItJG:kaer.morhen","hashes":{"sha256":"zU8PWJOld/I7OtjdpltFSKC+DMNm2ZyEXAHcprsafD0"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$o8KHsgSIYbJrddnd:kaer.morhen",{"sha256":"UgjMuCFXH4warIjKuwlRq9zZ6dSJrZWCd+CkqtgLSHM"}]],"redacts":"$o8KHsgSIYbJrddnd:kaer.morhen","room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"zxFGr/7aGOzqOEN6zRNrBpFkkMnfGFPbCteYL33wC+PycBPIK+2WRa5qlAR2+lcLiK3HjIzwRYkKNsVFTqvRAw"}},"type":"m.room.redaction"}`), + } + var redactedOutputs []api.OutputEvent + deleteDatabase() + _, producer, hevents := mustSendEvents(t, gomatrixserverlib.RoomVersionV1, redactionEvents) + defer deleteDatabase() + for _, msg := range producer.producedMessages { + if msg.Type == api.OutputTypeRedactedEvent { + redactedOutputs = append(redactedOutputs, *msg) + } + } + wantRedactedOutputs := []api.OutputEvent{ + { + Type: api.OutputTypeRedactedEvent, + RedactedEvent: &api.OutputRedactedEvent{ + RedactedEventID: hevents[2].EventID(), + RedactedBecause: hevents[3], + }, + }, + { + Type: api.OutputTypeRedactedEvent, + RedactedEvent: &api.OutputRedactedEvent{ + RedactedEventID: hevents[4].EventID(), + RedactedBecause: hevents[5], + }, + }, + } + t.Logf("redactedOutputs: %+v", redactedOutputs) + if len(wantRedactedOutputs) != len(redactedOutputs) { + t.Fatalf("Got %d redacted events, want %d", len(redactedOutputs), len(wantRedactedOutputs)) + } + for i := 0; i < len(wantRedactedOutputs); i++ { + if !reflect.DeepEqual(*redactedOutputs[i].RedactedEvent, *wantRedactedOutputs[i].RedactedEvent) { + t.Errorf("OutputRedactionEvent %d: wrong event got:\n%+v want:\n%+v", i+1, redactedOutputs[i].RedactedEvent, wantRedactedOutputs[i].RedactedEvent) + } + } +} diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index 5c916f294..afe5bcb1f 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -63,8 +63,10 @@ type Database interface { SnapshotNIDFromEventID(ctx context.Context, eventID string) (types.StateSnapshotNID, error) // Look up a room version from the room NID. GetRoomVersionForRoomNID(ctx context.Context, roomNID types.RoomNID) (gomatrixserverlib.RoomVersion, error) - // Stores a matrix room event in the database - StoreEvent(ctx context.Context, event gomatrixserverlib.Event, txnAndSessionID *api.TransactionID, authEventNIDs []types.EventNID) (types.RoomNID, types.StateAtEvent, error) + // Stores a matrix room event in the database. Returns the room NID, the state snapshot and the redacted event ID if any, or an error. + StoreEvent( + ctx context.Context, event gomatrixserverlib.Event, txnAndSessionID *api.TransactionID, authEventNIDs []types.EventNID, + ) (types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) // Look up the state entries for a list of string event IDs // Returns an error if the there is an error talking to the database // Returns a types.MissingEventError if the event IDs aren't in the database. diff --git a/roomserver/storage/postgres/redactions_table.go b/roomserver/storage/postgres/redactions_table.go index fa0f8713e..289e1320f 100644 --- a/roomserver/storage/postgres/redactions_table.go +++ b/roomserver/storage/postgres/redactions_table.go @@ -41,11 +41,11 @@ const insertRedactionSQL = "" + "INSERT INTO roomserver_redactions (redaction_event_id, redacts_event_id, validated)" + " VALUES ($1, $2, $3)" -const selectRedactedEventSQL = "" + +const selectRedactionInfoByRedactionEventIDSQL = "" + "SELECT redaction_event_id, redacts_event_id, validated FROM roomserver_redactions" + " WHERE redaction_event_id = $1" -const selectRedactionEventSQL = "" + +const selectRedactionInfoByEventBeingRedactedSQL = "" + "SELECT redaction_event_id, redacts_event_id, validated FROM roomserver_redactions" + " WHERE redacts_event_id = $1" @@ -53,10 +53,10 @@ const markRedactionValidatedSQL = "" + " UPDATE roomserver_redactions SET validated = $2 WHERE redaction_event_id = $1" type redactionStatements struct { - insertRedactionStmt *sql.Stmt - selectRedactedEventStmt *sql.Stmt - selectRedactionEventStmt *sql.Stmt - markRedactionValidatedStmt *sql.Stmt + insertRedactionStmt *sql.Stmt + selectRedactionInfoByRedactionEventIDStmt *sql.Stmt + selectRedactionInfoByEventBeingRedactedStmt *sql.Stmt + markRedactionValidatedStmt *sql.Stmt } func NewPostgresRedactionsTable(db *sql.DB) (tables.Redactions, error) { @@ -68,8 +68,8 @@ func NewPostgresRedactionsTable(db *sql.DB) (tables.Redactions, error) { return s, shared.StatementList{ {&s.insertRedactionStmt, insertRedactionSQL}, - {&s.selectRedactedEventStmt, selectRedactedEventSQL}, - {&s.selectRedactionEventStmt, selectRedactionEventSQL}, + {&s.selectRedactionInfoByRedactionEventIDStmt, selectRedactionInfoByRedactionEventIDSQL}, + {&s.selectRedactionInfoByEventBeingRedactedStmt, selectRedactionInfoByEventBeingRedactedSQL}, {&s.markRedactionValidatedStmt, markRedactionValidatedSQL}, }.Prepare(db) } @@ -82,32 +82,32 @@ func (s *redactionStatements) InsertRedaction( return err } -func (s *redactionStatements) SelectRedactedEvent( +func (s *redactionStatements) SelectRedactionInfoByRedactionEventID( ctx context.Context, txn *sql.Tx, redactionEventID string, ) (info *tables.RedactionInfo, err error) { info = &tables.RedactionInfo{} - stmt := sqlutil.TxStmt(txn, s.selectRedactedEventStmt) + stmt := sqlutil.TxStmt(txn, s.selectRedactionInfoByRedactionEventIDStmt) err = stmt.QueryRowContext(ctx, redactionEventID).Scan( &info.RedactionEventID, &info.RedactsEventID, &info.Validated, ) if err == sql.ErrNoRows { - err = nil info = nil + err = nil } return } -func (s *redactionStatements) SelectRedactionEvent( - ctx context.Context, txn *sql.Tx, redactedEventID string, +func (s *redactionStatements) SelectRedactionInfoByEventBeingRedacted( + ctx context.Context, txn *sql.Tx, eventID string, ) (info *tables.RedactionInfo, err error) { info = &tables.RedactionInfo{} - stmt := sqlutil.TxStmt(txn, s.selectRedactionEventStmt) - err = stmt.QueryRowContext(ctx, redactedEventID).Scan( + stmt := sqlutil.TxStmt(txn, s.selectRedactionInfoByEventBeingRedactedStmt) + err = stmt.QueryRowContext(ctx, eventID).Scan( &info.RedactionEventID, &info.RedactsEventID, &info.Validated, ) if err == sql.ErrNoRows { - err = nil info = nil + err = nil } return } diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index 8c7854e80..a9cb57821 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "encoding/json" - "fmt" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/api" @@ -345,13 +344,15 @@ func (d *Database) GetLatestEventsForUpdate( func (d *Database) StoreEvent( ctx context.Context, event gomatrixserverlib.Event, txnAndSessionID *api.TransactionID, authEventNIDs []types.EventNID, -) (types.RoomNID, types.StateAtEvent, error) { +) (types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) { var ( roomNID types.RoomNID eventTypeNID types.EventTypeNID eventStateKeyNID types.EventStateKeyNID eventNID types.EventNID stateNID types.StateSnapshotNID + redactionEvent *gomatrixserverlib.Event + redactedEventID string err error ) @@ -419,11 +420,11 @@ func (d *Database) StoreEvent( if err = d.EventJSONTable.InsertEventJSON(ctx, txn, eventNID, event.JSON()); err != nil { return err } - - return d.handleRedactions(ctx, txn, eventNID, event) + redactionEvent, redactedEventID, err = d.handleRedactions(ctx, txn, eventNID, event) + return err }) if err != nil { - return 0, types.StateAtEvent{}, err + return 0, types.StateAtEvent{}, nil, "", err } return roomNID, types.StateAtEvent{ @@ -435,7 +436,7 @@ func (d *Database) StoreEvent( }, EventNID: eventNID, }, - }, nil + }, redactionEvent, redactedEventID, nil } func (d *Database) PublishRoom(ctx context.Context, roomID string, publish bool) error { @@ -531,20 +532,42 @@ func extractRoomVersionFromCreateEvent(event gomatrixserverlib.Event) ( // When an event is redacted, the redacted event JSON is modified to add an `unsigned.redacted_because` field. We use this field // when loading events to determine whether to apply redactions. This keeps the hot-path of reading events quick as we don't need // to cross-reference with other tables when loading. -func (d *Database) handleRedactions(ctx context.Context, txn *sql.Tx, eventNID types.EventNID, event gomatrixserverlib.Event) error { +// +// Returns the redaction event and the event ID of the redacted event if this call resulted in a redaction. +func (d *Database) handleRedactions( + ctx context.Context, txn *sql.Tx, eventNID types.EventNID, event gomatrixserverlib.Event, +) (*gomatrixserverlib.Event, string, error) { + var err error + isRedactionEvent := event.Type() == gomatrixserverlib.MRoomRedaction && event.StateKey() == nil + if isRedactionEvent { + // an event which redacts itself should be ignored + if event.EventID() == event.Redacts() { + return nil, "", nil + } + + err = d.RedactionsTable.InsertRedaction(ctx, txn, tables.RedactionInfo{ + Validated: false, + RedactionEventID: event.EventID(), + RedactsEventID: event.Redacts(), + }) + if err != nil { + return nil, "", err + } + } + redactionEvent, redactedEvent, validated, err := d.loadRedactionPair(ctx, txn, eventNID, event) if err != nil { - return err + return nil, "", err } if validated || redactedEvent == nil || redactionEvent == nil { // we've seen this redaction before or there is nothing to redact - return nil + return nil, "", nil } // mark the event as redacted err = redactedEvent.SetUnsignedField("redacted_because", redactionEvent) if err != nil { - return err + return nil, "", err } if redactionsArePermanent { redactedEvent.Event = redactedEvent.Redact() @@ -552,82 +575,51 @@ func (d *Database) handleRedactions(ctx context.Context, txn *sql.Tx, eventNID t // overwrite the eventJSON table err = d.EventJSONTable.InsertEventJSON(ctx, txn, redactedEvent.EventNID, redactedEvent.JSON()) if err != nil { - return err + return nil, "", err } - return d.RedactionsTable.MarkRedactionValidated(ctx, txn, redactionEvent.EventID(), true) + return &redactionEvent.Event, redactedEvent.EventID(), d.RedactionsTable.MarkRedactionValidated(ctx, txn, redactionEvent.EventID(), true) } // loadRedactionPair returns both the redaction event and the redacted event, else nil. -// nolint:gocyclo func (d *Database) loadRedactionPair( ctx context.Context, txn *sql.Tx, eventNID types.EventNID, event gomatrixserverlib.Event, ) (*types.Event, *types.Event, bool, error) { var redactionEvent, redactedEvent *types.Event var info *tables.RedactionInfo - var nids map[string]types.EventNID - var evs []types.Event var err error isRedactionEvent := event.Type() == gomatrixserverlib.MRoomRedaction && event.StateKey() == nil + + var eventBeingRedacted string if isRedactionEvent { + eventBeingRedacted = event.Redacts() redactionEvent = &types.Event{ EventNID: eventNID, Event: event, } - // find the redacted event if one exists - info, err = d.RedactionsTable.SelectRedactedEvent(ctx, txn, event.EventID()) - if err != nil { - return nil, nil, false, err - } - if info == nil { - // we don't have the redacted event yet - return nil, nil, false, nil - } - nids, err = d.EventNIDs(ctx, []string{info.RedactsEventID}) - if err != nil { - return nil, nil, false, err - } - if len(nids) == 0 { - return nil, nil, false, fmt.Errorf("redaction: missing event NID being redacted: %+v", info) - } - evs, err = d.Events(ctx, []types.EventNID{nids[info.RedactsEventID]}) - if err != nil { - return nil, nil, false, err - } - if len(evs) != 1 { - return nil, nil, false, fmt.Errorf("redaction: missing event being redacted: %+v", info) - } - redactedEvent = &evs[0] } else { + eventBeingRedacted = event.EventID() // maybe, we'll see if we have info redactedEvent = &types.Event{ EventNID: eventNID, Event: event, } - // find the redaction event if one exists - info, err = d.RedactionsTable.SelectRedactionEvent(ctx, txn, event.EventID()) - if err != nil { - return nil, nil, false, err - } - if info == nil { - // this event is not redacted - return nil, nil, false, nil - } - nids, err = d.EventNIDs(ctx, []string{info.RedactionEventID}) - if err != nil { - return nil, nil, false, err - } - if len(nids) == 0 { - return nil, nil, false, fmt.Errorf("redaction: missing redaction event NID: %+v", info) - } - evs, err = d.Events(ctx, []types.EventNID{nids[info.RedactionEventID]}) - if err != nil { - return nil, nil, false, err - } - if len(evs) != 1 { - return nil, nil, false, fmt.Errorf("redaction: missing redaction event: %+v", info) - } - redactionEvent = &evs[0] } + + info, err = d.RedactionsTable.SelectRedactionInfoByEventBeingRedacted(ctx, txn, eventBeingRedacted) + if err != nil { + return nil, nil, false, err + } + if info == nil { + // this event hasn't been redacted or we don't have the redaction for it yet + return nil, nil, false, nil + } + + if isRedactionEvent { + redactedEvent = d.loadEvent(ctx, info.RedactsEventID) + } else { + redactionEvent = d.loadEvent(ctx, info.RedactionEventID) + } + return redactionEvent, redactedEvent, info.Validated, nil } @@ -639,3 +631,22 @@ func (d *Database) applyRedactions(events []types.Event) { } } } + +// loadEvent loads a single event or returns nil on any problems/missing event +func (d *Database) loadEvent(ctx context.Context, eventID string) *types.Event { + nids, err := d.EventNIDs(ctx, []string{eventID}) + if err != nil { + return nil + } + if len(nids) == 0 { + return nil + } + evs, err := d.Events(ctx, []types.EventNID{nids[eventID]}) + if err != nil { + return nil + } + if len(evs) != 1 { + return nil + } + return &evs[0] +} diff --git a/roomserver/storage/sqlite3/redactions_table.go b/roomserver/storage/sqlite3/redactions_table.go index 9910892c4..1cddb9b4f 100644 --- a/roomserver/storage/sqlite3/redactions_table.go +++ b/roomserver/storage/sqlite3/redactions_table.go @@ -40,11 +40,11 @@ const insertRedactionSQL = "" + "INSERT INTO roomserver_redactions (redaction_event_id, redacts_event_id, validated)" + " VALUES ($1, $2, $3)" -const selectRedactedEventSQL = "" + +const selectRedactionInfoByRedactionEventIDSQL = "" + "SELECT redaction_event_id, redacts_event_id, validated FROM roomserver_redactions" + " WHERE redaction_event_id = $1" -const selectRedactionEventSQL = "" + +const selectRedactionInfoByEventBeingRedactedSQL = "" + "SELECT redaction_event_id, redacts_event_id, validated FROM roomserver_redactions" + " WHERE redacts_event_id = $1" @@ -52,10 +52,10 @@ const markRedactionValidatedSQL = "" + " UPDATE roomserver_redactions SET validated = $2 WHERE redaction_event_id = $1" type redactionStatements struct { - insertRedactionStmt *sql.Stmt - selectRedactedEventStmt *sql.Stmt - selectRedactionEventStmt *sql.Stmt - markRedactionValidatedStmt *sql.Stmt + insertRedactionStmt *sql.Stmt + selectRedactionInfoByRedactionEventIDStmt *sql.Stmt + selectRedactionInfoByEventBeingRedactedStmt *sql.Stmt + markRedactionValidatedStmt *sql.Stmt } func NewSqliteRedactionsTable(db *sql.DB) (tables.Redactions, error) { @@ -67,8 +67,8 @@ func NewSqliteRedactionsTable(db *sql.DB) (tables.Redactions, error) { return s, shared.StatementList{ {&s.insertRedactionStmt, insertRedactionSQL}, - {&s.selectRedactedEventStmt, selectRedactedEventSQL}, - {&s.selectRedactionEventStmt, selectRedactionEventSQL}, + {&s.selectRedactionInfoByRedactionEventIDStmt, selectRedactionInfoByRedactionEventIDSQL}, + {&s.selectRedactionInfoByEventBeingRedactedStmt, selectRedactionInfoByEventBeingRedactedSQL}, {&s.markRedactionValidatedStmt, markRedactionValidatedSQL}, }.Prepare(db) } @@ -81,11 +81,11 @@ func (s *redactionStatements) InsertRedaction( return err } -func (s *redactionStatements) SelectRedactedEvent( +func (s *redactionStatements) SelectRedactionInfoByRedactionEventID( ctx context.Context, txn *sql.Tx, redactionEventID string, ) (info *tables.RedactionInfo, err error) { info = &tables.RedactionInfo{} - stmt := sqlutil.TxStmt(txn, s.selectRedactedEventStmt) + stmt := sqlutil.TxStmt(txn, s.selectRedactionInfoByRedactionEventIDStmt) err = stmt.QueryRowContext(ctx, redactionEventID).Scan( &info.RedactionEventID, &info.RedactsEventID, &info.Validated, ) @@ -96,12 +96,12 @@ func (s *redactionStatements) SelectRedactedEvent( return } -func (s *redactionStatements) SelectRedactionEvent( - ctx context.Context, txn *sql.Tx, redactedEventID string, +func (s *redactionStatements) SelectRedactionInfoByEventBeingRedacted( + ctx context.Context, txn *sql.Tx, eventID string, ) (info *tables.RedactionInfo, err error) { info = &tables.RedactionInfo{} - stmt := sqlutil.TxStmt(txn, s.selectRedactionEventStmt) - err = stmt.QueryRowContext(ctx, redactedEventID).Scan( + stmt := sqlutil.TxStmt(txn, s.selectRedactionInfoByEventBeingRedactedStmt) + err = stmt.QueryRowContext(ctx, eventID).Scan( &info.RedactionEventID, &info.RedactsEventID, &info.Validated, ) if err == sql.ErrNoRows { diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go index c6eb6696b..78273b3cc 100644 --- a/roomserver/storage/tables/interface.go +++ b/roomserver/storage/tables/interface.go @@ -139,10 +139,10 @@ type RedactionInfo struct { type Redactions interface { InsertRedaction(ctx context.Context, txn *sql.Tx, info RedactionInfo) error - // SelectRedactedEvent returns the redaction info for the given redaction event ID, or nil if there is no match. - SelectRedactedEvent(ctx context.Context, txn *sql.Tx, redactionEventID string) (*RedactionInfo, error) - // SelectRedactionEvent returns the redaction info for the given redacted event ID, or nil if there is no match. - SelectRedactionEvent(ctx context.Context, txn *sql.Tx, redactedEventID string) (*RedactionInfo, error) + // SelectRedactionInfoByRedactionEventID returns the redaction info for the given redaction event ID, or nil if there is no match. + SelectRedactionInfoByRedactionEventID(ctx context.Context, txn *sql.Tx, redactionEventID string) (*RedactionInfo, error) + // SelectRedactionInfoByEventBeingRedacted returns the redaction info for the given redacted event ID, or nil if there is no match. + SelectRedactionInfoByEventBeingRedacted(ctx context.Context, txn *sql.Tx, eventID string) (*RedactionInfo, error) // Mark this redaction event as having been validated. This means we have both sides of the redaction and have // successfully redacted the event JSON. MarkRedactionValidated(ctx context.Context, txn *sql.Tx, redactionEventID string, validated bool) error