From 5424b88f3061dddbeae53df8b67bd064e8cbc400 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 16 Aug 2022 11:55:06 +0100 Subject: [PATCH 1/6] Use `is_direct` flag from `/createRoom`, update stripped state (#2644) * Use `is_direct` flag from `/createRoom`, update stripped state * Add comment --- clientapi/routing/createroom.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/clientapi/routing/createroom.go b/clientapi/routing/createroom.go index 874908639..3e837c864 100644 --- a/clientapi/routing/createroom.go +++ b/clientapi/routing/createroom.go @@ -49,6 +49,7 @@ type createRoomRequest struct { GuestCanJoin bool `json:"guest_can_join"` RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"` PowerLevelContentOverride json.RawMessage `json:"power_level_content_override"` + IsDirect bool `json:"is_direct"` } const ( @@ -499,9 +500,17 @@ func createRoom( // Build some stripped state for the invite. var globalStrippedState []gomatrixserverlib.InviteV2StrippedState for _, event := range builtEvents { + // Chosen events from the spec: + // https://spec.matrix.org/v1.3/client-server-api/#stripped-state switch event.Type() { + case gomatrixserverlib.MRoomCreate: + fallthrough case gomatrixserverlib.MRoomName: fallthrough + case gomatrixserverlib.MRoomAvatar: + fallthrough + case gomatrixserverlib.MRoomTopic: + fallthrough case gomatrixserverlib.MRoomCanonicalAlias: fallthrough case gomatrixserverlib.MRoomEncryption: @@ -522,7 +531,7 @@ func createRoom( // Build the invite event. inviteEvent, err := buildMembershipEvent( ctx, invitee, "", profileAPI, device, gomatrixserverlib.Invite, - roomID, true, cfg, evTime, rsAPI, asAPI, + roomID, r.IsDirect, cfg, evTime, rsAPI, asAPI, ) if err != nil { util.GetLogger(ctx).WithError(err).Error("buildMembershipEvent failed") From 804653e55118a8155dfbfe5187e941a2f8ce8337 Mon Sep 17 00:00:00 2001 From: Till <2353100+S7evinK@users.noreply.github.com> Date: Tue, 16 Aug 2022 13:21:22 +0200 Subject: [PATCH 2/6] Verify a shared secret is set in `create-account` (#2645) --- cmd/create-account/main.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cmd/create-account/main.go b/cmd/create-account/main.go index fbd14b8d7..bd053f2f7 100644 --- a/cmd/create-account/main.go +++ b/cmd/create-account/main.go @@ -85,6 +85,10 @@ func main() { logrus.Fatalf("The reset-password flag has been replaced by the POST /_dendrite/admin/resetPassword/{localpart} admin API.") } + if cfg.ClientAPI.RegistrationSharedSecret == "" { + logrus.Fatalln("Shared secret registration is not enabled, enable it by setting a shared secret in the config: 'client_api.registration_shared_secret'") + } + if *username == "" { flag.Usage() os.Exit(1) From ec16c944eb646c6ef61f0b7783f2f3869b9bc10c Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 16 Aug 2022 14:42:00 +0100 Subject: [PATCH 3/6] Lazy-loading fixes (#2646) * Use existing current room state if we have it * Don't dedupe before applying the history vis filter * Revert "Don't dedupe before applying the history vis filter" This reverts commit d27c4a0874dabb77c2eda6b23eb7c00478bc9e90. * Revert "Use existing current room state if we have it" This reverts commit 5819b4a7ce511204c4fb48d3c4741612b136e2ea. * Tweaks --- syncapi/streams/stream_pdu.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 136cbea5a..0e9dda577 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -560,14 +560,13 @@ func (p *PDUStreamProvider) lazyLoadMembers( // If this is a gapped incremental sync, we still want this membership isGappedIncremental := limited && incremental // We want this users membership event, keep it in the list - _, ok := timelineUsers[event.Sender()] - wantMembership := ok || isGappedIncremental - if wantMembership { + stateKey := *event.StateKey() + if _, ok := timelineUsers[stateKey]; ok || isGappedIncremental { newStateEvents = append(newStateEvents, event) if !includeRedundant { - p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, event.Sender(), event.EventID()) + p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, stateKey, event.EventID()) } - delete(timelineUsers, event.Sender()) + delete(timelineUsers, stateKey) } } else { newStateEvents = append(newStateEvents, event) @@ -578,17 +577,16 @@ func (p *PDUStreamProvider) lazyLoadMembers( wantUsers = append(wantUsers, userID) } // Query missing membership events - memberships, err := p.DB.GetStateEventsForRoom(ctx, roomID, &gomatrixserverlib.StateFilter{ - Limit: 100, - Senders: &wantUsers, - Types: &[]string{gomatrixserverlib.MRoomMember}, - }) + filter := gomatrixserverlib.DefaultStateFilter() + filter.Senders = &wantUsers + filter.Types = &[]string{gomatrixserverlib.MRoomMember} + memberships, err := p.DB.GetStateEventsForRoom(ctx, roomID, &filter) if err != nil { return stateEvents, err } // cache the membership events for _, membership := range memberships { - p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, membership.Sender(), membership.EventID()) + p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, *membership.StateKey(), membership.EventID()) } stateEvents = append(newStateEvents, memberships...) return stateEvents, nil From ad4ac2c016d2916c51695fd11d3a7b52e38d0ca2 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 16 Aug 2022 14:42:35 +0100 Subject: [PATCH 4/6] Stop spamming the logs with `StateBetween: ignoring deleted state` event IDs --- syncapi/storage/postgres/output_room_events_table.go | 4 ++-- syncapi/storage/sqlite3/output_room_events_table.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 8f633640e..9c758cac1 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -279,8 +279,8 @@ func (s *outputRoomEventsStatements) SelectStateInRange( log.WithFields(log.Fields{ "since": r.From, "current": r.To, - "adds": addIDs, - "dels": delIDs, + "adds": len(addIDs), + "dels": len(delIDs), }).Warn("StateBetween: ignoring deleted state") } diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 91fd35b5b..cd9a46d4c 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -234,8 +234,8 @@ func (s *outputRoomEventsStatements) SelectStateInRange( log.WithFields(log.Fields{ "since": r.From, "current": r.To, - "adds": addIDsJSON, - "dels": delIDsJSON, + "adds": len(addIDsJSON), + "dels": len(delIDsJSON), }).Warn("StateBetween: ignoring deleted state") } From 8d9c8f11c5c5673d48b8e5308e1f565927bdd341 Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Thu, 18 Aug 2022 08:56:57 +0200 Subject: [PATCH 5/6] Add a delay after sending events to the roomserver --- syncapi/syncapi_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go index dc073a16e..089bdafaf 100644 --- a/syncapi/syncapi_test.go +++ b/syncapi/syncapi_test.go @@ -10,6 +10,10 @@ import ( "testing" "time" + "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" + "github.com/tidwall/gjson" + "github.com/matrix-org/dendrite/clientapi/producers" keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver" @@ -21,9 +25,6 @@ import ( "github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/test/testrig" userapi "github.com/matrix-org/dendrite/userapi/api" - "github.com/matrix-org/gomatrixserverlib" - "github.com/nats-io/nats.go" - "github.com/tidwall/gjson" ) type syncRoomserverAPI struct { @@ -422,6 +423,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) { if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil { t.Fatalf("failed to send events: %v", err) } + time.Sleep(100 * time.Millisecond) // TODO: find a better way // There is only one event, we expect only to be able to see this, if the room is world_readable w := httptest.NewRecorder() @@ -454,6 +456,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) { if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil { t.Fatalf("failed to send events: %v", err) } + time.Sleep(100 * time.Millisecond) // TODO: find a better way // Verify the messages after/before invite are visible or not w = httptest.NewRecorder() From 59bc0a6f4ed0324da0387118e1761b4551aaf103 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 18 Aug 2022 10:37:47 +0100 Subject: [PATCH 6/6] Reprocess rejected input events (#2647) * Reprocess outliers that were previously rejected * Might as well do all events this way * More useful errors * Fix queries * Tweak condition * Don't wrap errors * Report more useful error * Flatten error on `r.Queryer.QueryStateAfterEvents` * Some more debug logging * Flatten error in `QueryRestrictedJoinAllowed` * Revert "Flatten error in `QueryRestrictedJoinAllowed`" This reverts commit 1238b4184c30e0c31ffb0f364806fa1275aba483. * Tweak `QueryStateAfterEvents` * Handle MissingStateError too * Scope to room * Clean up * Fix the error * Only apply rejection check to outliers --- roomserver/internal/input/input_events.go | 47 +++++++++++---------- roomserver/internal/query/query.go | 13 +++--- roomserver/storage/interface.go | 2 + roomserver/storage/postgres/events_table.go | 13 ++++++ roomserver/storage/shared/storage.go | 4 ++ roomserver/storage/sqlite3/events_table.go | 13 ++++++ roomserver/storage/tables/interface.go | 1 + 7 files changed, 65 insertions(+), 28 deletions(-) diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 81541260c..53ccd5973 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -17,8 +17,8 @@ package input import ( - "bytes" "context" + "database/sql" "fmt" "time" @@ -107,28 +107,6 @@ func (r *Inputer) processRoomEvent( }) } - // if we have already got this event then do not process it again, if the input kind is an outlier. - // Outliers contain no extra information which may warrant a re-processing. - if input.Kind == api.KindOutlier { - evs, err2 := r.DB.EventsFromIDs(ctx, []string{event.EventID()}) - if err2 == nil && len(evs) == 1 { - // check hash matches if we're on early room versions where the event ID was a random string - idFormat, err2 := headered.RoomVersion.EventIDFormat() - if err2 == nil { - switch idFormat { - case gomatrixserverlib.EventIDFormatV1: - if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) { - logger.Debugf("Already processed event; ignoring") - return nil - } - default: - logger.Debugf("Already processed event; ignoring") - return nil - } - } - } - } - // Don't waste time processing the event if the room doesn't exist. // A room entry locally will only be created in response to a create // event. @@ -141,6 +119,29 @@ func (r *Inputer) processRoomEvent( return fmt.Errorf("room %s does not exist for event %s", event.RoomID(), event.EventID()) } + // If we already know about this outlier and it hasn't been rejected + // then we won't attempt to reprocess it. If it was rejected or has now + // arrived as a different kind of event, then we can attempt to reprocess, + // in case we have learned something new or need to weave the event into + // the DAG now. + if input.Kind == api.KindOutlier && roomInfo != nil { + wasRejected, werr := r.DB.IsEventRejected(ctx, roomInfo.RoomNID, event.EventID()) + switch { + case werr == sql.ErrNoRows: + // We haven't seen this event before so continue. + case werr != nil: + // Something has gone wrong trying to find out if we rejected + // this event already. + logger.WithError(werr).Errorf("Failed to check if event %q is already seen", event.EventID()) + return werr + case !wasRejected: + // We've seen this event before and it wasn't rejected so we + // should ignore it. + logger.Debugf("Already processed event %q, ignoring", event.EventID()) + return nil + } + } + var missingAuth, missingPrev bool serverRes := &fedapi.QueryJoinedHostServerNamesInRoomResponse{} if !isCreateEvent { diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go index f3061c222..f5d8c2d49 100644 --- a/roomserver/internal/query/query.go +++ b/roomserver/internal/query/query.go @@ -72,13 +72,10 @@ func (r *Queryer) QueryStateAfterEvents( prevStates, err := r.DB.StateAtEventIDs(ctx, request.PrevEventIDs) if err != nil { - switch err.(type) { - case types.MissingEventError: - util.GetLogger(ctx).Errorf("QueryStateAfterEvents: MissingEventError: %s", err) + if _, ok := err.(types.MissingEventError); ok { return nil - default: - return err } + return err } response.PrevEventsExist = true @@ -95,6 +92,12 @@ func (r *Queryer) QueryStateAfterEvents( ) } if err != nil { + if _, ok := err.(types.MissingEventError); ok { + return nil + } + if _, ok := err.(types.MissingStateError); ok { + return nil + } return err } diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index b12025c41..5c068873a 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -94,6 +94,8 @@ type Database interface { // Opens and returns a room updater, which locks the room and opens a transaction. // The GetRoomUpdater must have Commit or Rollback called on it if this doesn't return an error. // If this returns an error then no further action is required. + // IsEventRejected returns true if the event is known and rejected. + IsEventRejected(ctx context.Context, roomNID types.RoomNID, eventID string) (rejected bool, err error) GetRoomUpdater(ctx context.Context, roomInfo *types.RoomInfo) (*shared.RoomUpdater, error) // Look up event references for the latest events in the room and the current state snapshot. // Returns the latest events, the current state and the maximum depth of the latest events plus 1. diff --git a/roomserver/storage/postgres/events_table.go b/roomserver/storage/postgres/events_table.go index a4d05756d..c7748d2be 100644 --- a/roomserver/storage/postgres/events_table.go +++ b/roomserver/storage/postgres/events_table.go @@ -136,6 +136,9 @@ const selectMaxEventDepthSQL = "" + const selectRoomNIDsForEventNIDsSQL = "" + "SELECT event_nid, room_nid FROM roomserver_events WHERE event_nid = ANY($1)" +const selectEventRejectedSQL = "" + + "SELECT is_rejected FROM roomserver_events WHERE room_nid = $1 AND event_id = $2" + type eventStatements struct { insertEventStmt *sql.Stmt selectEventStmt *sql.Stmt @@ -153,6 +156,7 @@ type eventStatements struct { bulkSelectUnsentEventNIDStmt *sql.Stmt selectMaxEventDepthStmt *sql.Stmt selectRoomNIDsForEventNIDsStmt *sql.Stmt + selectEventRejectedStmt *sql.Stmt } func CreateEventsTable(db *sql.DB) error { @@ -180,6 +184,7 @@ func PrepareEventsTable(db *sql.DB) (tables.Events, error) { {&s.bulkSelectUnsentEventNIDStmt, bulkSelectUnsentEventNIDSQL}, {&s.selectMaxEventDepthStmt, selectMaxEventDepthSQL}, {&s.selectRoomNIDsForEventNIDsStmt, selectRoomNIDsForEventNIDsSQL}, + {&s.selectEventRejectedStmt, selectEventRejectedSQL}, }.Prepare(db) } @@ -540,3 +545,11 @@ func eventNIDsAsArray(eventNIDs []types.EventNID) pq.Int64Array { } return nids } + +func (s *eventStatements) SelectEventRejected( + ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventID string, +) (rejected bool, err error) { + stmt := sqlutil.TxStmt(txn, s.selectEventRejectedStmt) + err = stmt.QueryRowContext(ctx, roomNID, eventID).Scan(&rejected) + return +} diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index cbf9c8b20..4f92adf1f 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -567,6 +567,10 @@ func (d *Database) GetRoomUpdater( return updater, err } +func (d *Database) IsEventRejected(ctx context.Context, roomNID types.RoomNID, eventID string) (bool, error) { + return d.EventsTable.SelectEventRejected(ctx, nil, roomNID, eventID) +} + func (d *Database) StoreEvent( ctx context.Context, event *gomatrixserverlib.Event, authEventNIDs []types.EventNID, isRejected bool, diff --git a/roomserver/storage/sqlite3/events_table.go b/roomserver/storage/sqlite3/events_table.go index 1dda34c36..174e3a9a7 100644 --- a/roomserver/storage/sqlite3/events_table.go +++ b/roomserver/storage/sqlite3/events_table.go @@ -109,6 +109,9 @@ const selectMaxEventDepthSQL = "" + const selectRoomNIDsForEventNIDsSQL = "" + "SELECT event_nid, room_nid FROM roomserver_events WHERE event_nid IN ($1)" +const selectEventRejectedSQL = "" + + "SELECT is_rejected FROM roomserver_events WHERE room_nid = $1 AND event_id = $2" + type eventStatements struct { db *sql.DB insertEventStmt *sql.Stmt @@ -122,6 +125,7 @@ type eventStatements struct { bulkSelectStateAtEventAndReferenceStmt *sql.Stmt bulkSelectEventReferenceStmt *sql.Stmt bulkSelectEventIDStmt *sql.Stmt + selectEventRejectedStmt *sql.Stmt //bulkSelectEventNIDStmt *sql.Stmt //bulkSelectUnsentEventNIDStmt *sql.Stmt //selectRoomNIDsForEventNIDsStmt *sql.Stmt @@ -152,6 +156,7 @@ func PrepareEventsTable(db *sql.DB) (tables.Events, error) { //{&s.bulkSelectEventNIDStmt, bulkSelectEventNIDSQL}, //{&s.bulkSelectUnsentEventNIDStmt, bulkSelectUnsentEventNIDSQL}, //{&s.selectRoomNIDForEventNIDStmt, selectRoomNIDForEventNIDSQL}, + {&s.selectEventRejectedStmt, selectEventRejectedSQL}, }.Prepare(db) } @@ -614,3 +619,11 @@ func eventNIDsAsArray(eventNIDs []types.EventNID) string { b, _ := json.Marshal(eventNIDs) return string(b) } + +func (s *eventStatements) SelectEventRejected( + ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventID string, +) (rejected bool, err error) { + stmt := sqlutil.TxStmt(txn, s.selectEventRejectedStmt) + err = stmt.QueryRowContext(ctx, roomNID, eventID).Scan(&rejected) + return +} diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go index 0bc389b80..ed67c43d8 100644 --- a/roomserver/storage/tables/interface.go +++ b/roomserver/storage/tables/interface.go @@ -66,6 +66,7 @@ type Events interface { BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error) SelectMaxEventDepth(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (int64, error) SelectRoomNIDsForEventNIDs(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (roomNIDs map[types.EventNID]types.RoomNID, err error) + SelectEventRejected(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventID string) (rejected bool, err error) } type Rooms interface {