diff --git a/.github/workflows/dendrite.yml b/.github/workflows/dendrite.yml index 674848f87..3dccd713f 100644 --- a/.github/workflows/dendrite.yml +++ b/.github/workflows/dendrite.yml @@ -7,6 +7,7 @@ on: pull_request: release: types: [published] + workflow_dispatch: concurrency: group: ${{ github.workflow }}-${{ github.ref }} diff --git a/CHANGES.md b/CHANGES.md index 36a1a6311..aaf5836ba 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,18 @@ # Changelog +## Dendrite 0.9.4 (2022-08-19) + +### Fixes + +* A bug in the roomserver around handling rejected outliers has been fixed +* Backfilled events will now use the correct history visibility where possible +* The device list updater backoff has been fixed, which should reduce the number of outbound HTTP requests and `Failed to query device keys for some users` log entries for dead servers +* The `/sync` endpoint will no longer incorrectly return room entries for retired invites which could cause some rooms to show up in the client "Historical" section +* The `/createRoom` endpoint will now correctly populate `is_direct` in invite membership events, which may help clients to classify direct messages correctly +* The `create-account` tool will now log an error if the shared secret is not set in the Dendrite config +* A couple of minor bugs have been fixed in the membership lazy-loading +* Queued EDUs in the federation API are now cached properly + ## Dendrite 0.9.3 (2022-08-15) ### Important diff --git a/clientapi/routing/createroom.go b/clientapi/routing/createroom.go index 874908639..3e837c864 100644 --- a/clientapi/routing/createroom.go +++ b/clientapi/routing/createroom.go @@ -49,6 +49,7 @@ type createRoomRequest struct { GuestCanJoin bool `json:"guest_can_join"` RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"` PowerLevelContentOverride json.RawMessage `json:"power_level_content_override"` + IsDirect bool `json:"is_direct"` } const ( @@ -499,9 +500,17 @@ func createRoom( // Build some stripped state for the invite. var globalStrippedState []gomatrixserverlib.InviteV2StrippedState for _, event := range builtEvents { + // Chosen events from the spec: + // https://spec.matrix.org/v1.3/client-server-api/#stripped-state switch event.Type() { + case gomatrixserverlib.MRoomCreate: + fallthrough case gomatrixserverlib.MRoomName: fallthrough + case gomatrixserverlib.MRoomAvatar: + fallthrough + case gomatrixserverlib.MRoomTopic: + fallthrough case gomatrixserverlib.MRoomCanonicalAlias: fallthrough case gomatrixserverlib.MRoomEncryption: @@ -522,7 +531,7 @@ func createRoom( // Build the invite event. inviteEvent, err := buildMembershipEvent( ctx, invitee, "", profileAPI, device, gomatrixserverlib.Invite, - roomID, true, cfg, evTime, rsAPI, asAPI, + roomID, r.IsDirect, cfg, evTime, rsAPI, asAPI, ) if err != nil { util.GetLogger(ctx).WithError(err).Error("buildMembershipEvent failed") diff --git a/cmd/create-account/main.go b/cmd/create-account/main.go index fbd14b8d7..bd053f2f7 100644 --- a/cmd/create-account/main.go +++ b/cmd/create-account/main.go @@ -85,6 +85,10 @@ func main() { logrus.Fatalf("The reset-password flag has been replaced by the POST /_dendrite/admin/resetPassword/{localpart} admin API.") } + if cfg.ClientAPI.RegistrationSharedSecret == "" { + logrus.Fatalln("Shared secret registration is not enabled, enable it by setting a shared secret in the config: 'client_api.registration_shared_secret'") + } + if *username == "" { flag.Usage() os.Exit(1) diff --git a/federationapi/storage/shared/storage_edus.go b/federationapi/storage/shared/storage_edus.go index b62e5d9c5..ce9632ed3 100644 --- a/federationapi/storage/shared/storage_edus.go +++ b/federationapi/storage/shared/storage_edus.go @@ -110,6 +110,7 @@ func (d *Database) GetPendingEDUs( return fmt.Errorf("json.Unmarshal: %w", err) } edus[&Receipt{nid}] = &event + d.Cache.StoreFederationQueuedEDU(nid, &event) } return nil @@ -177,20 +178,18 @@ func (d *Database) GetPendingEDUServerNames( return d.FederationQueueEDUs.SelectQueueEDUServerNames(ctx, nil) } -// DeleteExpiredEDUs deletes expired EDUs +// DeleteExpiredEDUs deletes expired EDUs and evicts them from the cache. func (d *Database) DeleteExpiredEDUs(ctx context.Context) error { - return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + var jsonNIDs []int64 + err := d.Writer.Do(d.DB, nil, func(txn *sql.Tx) (err error) { expiredBefore := gomatrixserverlib.AsTimestamp(time.Now()) - jsonNIDs, err := d.FederationQueueEDUs.SelectExpiredEDUs(ctx, txn, expiredBefore) + jsonNIDs, err = d.FederationQueueEDUs.SelectExpiredEDUs(ctx, txn, expiredBefore) if err != nil { return err } if len(jsonNIDs) == 0 { return nil } - for i := range jsonNIDs { - d.Cache.EvictFederationQueuedEDU(jsonNIDs[i]) - } if err = d.FederationQueueJSON.DeleteQueueJSON(ctx, txn, jsonNIDs); err != nil { return err @@ -198,4 +197,14 @@ func (d *Database) DeleteExpiredEDUs(ctx context.Context) error { return d.FederationQueueEDUs.DeleteExpiredEDUs(ctx, txn, expiredBefore) }) + + if err != nil { + return err + } + + for i := range jsonNIDs { + d.Cache.EvictFederationQueuedEDU(jsonNIDs[i]) + } + + return nil } diff --git a/federationapi/storage/storage_test.go b/federationapi/storage/storage_test.go index 7eba2cbee..3b0268e55 100644 --- a/federationapi/storage/storage_test.go +++ b/federationapi/storage/storage_test.go @@ -31,7 +31,7 @@ func mustCreateFederationDatabase(t *testing.T, dbType test.DBType) (storage.Dat func TestExpireEDUs(t *testing.T) { var expireEDUTypes = map[string]time.Duration{ - gomatrixserverlib.MReceipt: time.Millisecond, + gomatrixserverlib.MReceipt: 0, } ctx := context.Background() diff --git a/internal/version.go b/internal/version.go index 561e6c06f..384f091a0 100644 --- a/internal/version.go +++ b/internal/version.go @@ -17,7 +17,7 @@ var build string const ( VersionMajor = 0 VersionMinor = 9 - VersionPatch = 3 + VersionPatch = 4 VersionTag = "" // example: "rc1" ) diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go index 80efbec51..304b67b23 100644 --- a/keyserver/internal/device_list_update.go +++ b/keyserver/internal/device_list_update.go @@ -335,8 +335,9 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) { retriesMu := &sync.Mutex{} // restarter goroutine which will inject failed servers into ch when it is time go func() { + var serversToRetry []gomatrixserverlib.ServerName for { - var serversToRetry []gomatrixserverlib.ServerName + serversToRetry = serversToRetry[:0] // reuse memory time.Sleep(time.Second) retriesMu.Lock() now := time.Now() @@ -355,11 +356,17 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) { } }() for serverName := range ch { + retriesMu.Lock() + _, exists := retries[serverName] + retriesMu.Unlock() + if exists { + // Don't retry a server that we're already waiting for. + continue + } waitTime, shouldRetry := u.processServer(serverName) if shouldRetry { retriesMu.Lock() - _, exists := retries[serverName] - if !exists { + if _, exists = retries[serverName]; !exists { retries[serverName] = time.Now().Add(waitTime) } retriesMu.Unlock() diff --git a/roomserver/api/perform.go b/roomserver/api/perform.go index d9ea9dd1c..20931f807 100644 --- a/roomserver/api/perform.go +++ b/roomserver/api/perform.go @@ -5,9 +5,10 @@ import ( "fmt" "net/http" - "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + + "github.com/matrix-org/dendrite/clientapi/jsonerror" ) type PerformErrorCode int @@ -161,7 +162,8 @@ func (r *PerformBackfillRequest) PrevEventIDs() []string { // PerformBackfillResponse is a response to PerformBackfill. type PerformBackfillResponse struct { // Missing events, arbritrary order. - Events []*gomatrixserverlib.HeaderedEvent `json:"events"` + Events []*gomatrixserverlib.HeaderedEvent `json:"events"` + HistoryVisibility gomatrixserverlib.HistoryVisibility `json:"history_visibility"` } type PerformPublishRequest struct { diff --git a/roomserver/internal/helpers/auth.go b/roomserver/internal/helpers/auth.go index 648c50cf6..935a045df 100644 --- a/roomserver/internal/helpers/auth.go +++ b/roomserver/internal/helpers/auth.go @@ -39,7 +39,7 @@ func CheckForSoftFail( var authStateEntries []types.StateEntry var err error if rewritesState { - authStateEntries, err = db.StateEntriesForEventIDs(ctx, stateEventIDs) + authStateEntries, err = db.StateEntriesForEventIDs(ctx, stateEventIDs, true) if err != nil { return true, fmt.Errorf("StateEntriesForEventIDs failed: %w", err) } @@ -97,7 +97,7 @@ func CheckAuthEvents( authEventIDs []string, ) ([]types.EventNID, error) { // Grab the numeric IDs for the supplied auth state events from the database. - authStateEntries, err := db.StateEntriesForEventIDs(ctx, authEventIDs) + authStateEntries, err := db.StateEntriesForEventIDs(ctx, authEventIDs, true) if err != nil { return nil, fmt.Errorf("db.StateEntriesForEventIDs: %w", err) } diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 81541260c..0ece9d145 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -17,8 +17,8 @@ package input import ( - "bytes" "context" + "database/sql" "fmt" "time" @@ -107,28 +107,6 @@ func (r *Inputer) processRoomEvent( }) } - // if we have already got this event then do not process it again, if the input kind is an outlier. - // Outliers contain no extra information which may warrant a re-processing. - if input.Kind == api.KindOutlier { - evs, err2 := r.DB.EventsFromIDs(ctx, []string{event.EventID()}) - if err2 == nil && len(evs) == 1 { - // check hash matches if we're on early room versions where the event ID was a random string - idFormat, err2 := headered.RoomVersion.EventIDFormat() - if err2 == nil { - switch idFormat { - case gomatrixserverlib.EventIDFormatV1: - if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) { - logger.Debugf("Already processed event; ignoring") - return nil - } - default: - logger.Debugf("Already processed event; ignoring") - return nil - } - } - } - } - // Don't waste time processing the event if the room doesn't exist. // A room entry locally will only be created in response to a create // event. @@ -141,6 +119,29 @@ func (r *Inputer) processRoomEvent( return fmt.Errorf("room %s does not exist for event %s", event.RoomID(), event.EventID()) } + // If we already know about this outlier and it hasn't been rejected + // then we won't attempt to reprocess it. If it was rejected or has now + // arrived as a different kind of event, then we can attempt to reprocess, + // in case we have learned something new or need to weave the event into + // the DAG now. + if input.Kind == api.KindOutlier && roomInfo != nil { + wasRejected, werr := r.DB.IsEventRejected(ctx, roomInfo.RoomNID, event.EventID()) + switch { + case werr == sql.ErrNoRows: + // We haven't seen this event before so continue. + case werr != nil: + // Something has gone wrong trying to find out if we rejected + // this event already. + logger.WithError(werr).Errorf("Failed to check if event %q is already seen", event.EventID()) + return werr + case !wasRejected: + // We've seen this event before and it wasn't rejected so we + // should ignore it. + logger.Debugf("Already processed event %q, ignoring", event.EventID()) + return nil + } + } + var missingAuth, missingPrev bool serverRes := &fedapi.QueryJoinedHostServerNamesInRoomResponse{} if !isCreateEvent { @@ -300,7 +301,7 @@ func (r *Inputer) processRoomEvent( // bother doing this if the event was already rejected as it just ends up // burning CPU time. historyVisibility := gomatrixserverlib.HistoryVisibilityShared // Default to shared. - if rejectionErr == nil && !isRejected && !softfail { + if input.Kind != api.KindOutlier && rejectionErr == nil && !isRejected && !softfail { var err error historyVisibility, rejectionErr, err = r.processStateBefore(ctx, input, missingPrev) if err != nil { @@ -355,6 +356,8 @@ func (r *Inputer) processRoomEvent( // We stop here if the event is rejected: We've stored it but won't update forward extremities or notify anyone about it. if isRejected || softfail { logger.WithError(rejectionErr).WithFields(logrus.Fields{ + "room_id": event.RoomID(), + "event_id": event.EventID(), "soft_fail": softfail, "missing_prev": missingPrev, }).Warn("Stored rejected event") @@ -660,7 +663,7 @@ func (r *Inputer) calculateAndSetState( // We've been told what the state at the event is so we don't need to calculate it. // Check that those state events are in the database and store the state. var entries []types.StateEntry - if entries, err = r.DB.StateEntriesForEventIDs(ctx, input.StateEventIDs); err != nil { + if entries, err = r.DB.StateEntriesForEventIDs(ctx, input.StateEventIDs, true); err != nil { return fmt.Errorf("updater.StateEntriesForEventIDs: %w", err) } entries = types.DeduplicateStateEntries(entries) diff --git a/roomserver/internal/perform/perform_backfill.go b/roomserver/internal/perform/perform_backfill.go index 298ba04f6..de76b6412 100644 --- a/roomserver/internal/perform/perform_backfill.go +++ b/roomserver/internal/perform/perform_backfill.go @@ -140,11 +140,11 @@ func (r *Backfiller) backfillViaFederation(ctx context.Context, req *api.Perform continue } var entries []types.StateEntry - if entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs); err != nil { + if entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs, true); err != nil { // attempt to fetch the missing events r.fetchAndStoreMissingEvents(ctx, info.RoomVersion, requester, stateIDs) // try again - entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs) + entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs, true) if err != nil { logrus.WithError(err).WithField("event_id", ev.EventID()).Error("backfillViaFederation: failed to get state entries for event") return err @@ -164,6 +164,7 @@ func (r *Backfiller) backfillViaFederation(ctx context.Context, req *api.Perform // TODO: update backwards extremities, as that should be moved from syncapi to roomserver at some point. res.Events = events + res.HistoryVisibility = requester.historyVisiblity return nil } @@ -248,6 +249,7 @@ type backfillRequester struct { servers []gomatrixserverlib.ServerName eventIDToBeforeStateIDs map[string][]string eventIDMap map[string]*gomatrixserverlib.Event + historyVisiblity gomatrixserverlib.HistoryVisibility } func newBackfillRequester( @@ -266,6 +268,7 @@ func newBackfillRequester( eventIDMap: make(map[string]*gomatrixserverlib.Event), bwExtrems: bwExtrems, preferServer: preferServer, + historyVisiblity: gomatrixserverlib.HistoryVisibilityShared, } } @@ -447,7 +450,8 @@ FindSuccessor: } // possibly return all joined servers depending on history visiblity - memberEventsFromVis, err := joinEventsFromHistoryVisibility(ctx, b.db, roomID, stateEntries, b.thisServer) + memberEventsFromVis, visibility, err := joinEventsFromHistoryVisibility(ctx, b.db, roomID, stateEntries, b.thisServer) + b.historyVisiblity = visibility if err != nil { logrus.WithError(err).Error("ServersAtEvent: failed calculate servers from history visibility rules") return nil @@ -528,7 +532,7 @@ func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion, // pull all events and then filter by that table. func joinEventsFromHistoryVisibility( ctx context.Context, db storage.Database, roomID string, stateEntries []types.StateEntry, - thisServer gomatrixserverlib.ServerName) ([]types.Event, error) { + thisServer gomatrixserverlib.ServerName) ([]types.Event, gomatrixserverlib.HistoryVisibility, error) { var eventNIDs []types.EventNID for _, entry := range stateEntries { @@ -542,7 +546,9 @@ func joinEventsFromHistoryVisibility( // Get all of the events in this state stateEvents, err := db.Events(ctx, eventNIDs) if err != nil { - return nil, err + // even though the default should be shared, restricting the visibility to joined + // feels more secure here. + return nil, gomatrixserverlib.HistoryVisibilityJoined, err } events := make([]*gomatrixserverlib.Event, len(stateEvents)) for i := range stateEvents { @@ -551,20 +557,22 @@ func joinEventsFromHistoryVisibility( // Can we see events in the room? canSeeEvents := auth.IsServerAllowed(thisServer, true, events) + visibility := gomatrixserverlib.HistoryVisibility(auth.HistoryVisibilityForRoom(events)) if !canSeeEvents { - logrus.Infof("ServersAtEvent history not visible to us: %s", auth.HistoryVisibilityForRoom(events)) - return nil, nil + logrus.Infof("ServersAtEvent history not visible to us: %s", visibility) + return nil, visibility, nil } // get joined members info, err := db.RoomInfo(ctx, roomID) if err != nil { - return nil, err + return nil, visibility, nil } joinEventNIDs, err := db.GetMembershipEventNIDsForRoom(ctx, info.RoomNID, true, false) if err != nil { - return nil, err + return nil, visibility, err } - return db.Events(ctx, joinEventNIDs) + evs, err := db.Events(ctx, joinEventNIDs) + return evs, visibility, err } func persistEvents(ctx context.Context, db storage.Database, events []*gomatrixserverlib.HeaderedEvent) (types.RoomNID, map[string]types.Event) { diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go index 6802d3f99..f3a09dcce 100644 --- a/roomserver/internal/query/query.go +++ b/roomserver/internal/query/query.go @@ -73,13 +73,10 @@ func (r *Queryer) QueryStateAfterEvents( prevStates, err := r.DB.StateAtEventIDs(ctx, request.PrevEventIDs) if err != nil { - switch err.(type) { - case types.MissingEventError: - util.GetLogger(ctx).Errorf("QueryStateAfterEvents: MissingEventError: %s", err) + if _, ok := err.(types.MissingEventError); ok { return nil - default: - return err } + return err } response.PrevEventsExist = true @@ -96,6 +93,12 @@ func (r *Queryer) QueryStateAfterEvents( ) } if err != nil { + if _, ok := err.(types.MissingEventError); ok { + return nil + } + if _, ok := err.(types.MissingStateError); ok { + return nil + } return err } diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index 8a7bd5bf8..144b8dd6f 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -79,7 +79,7 @@ type Database interface { // Look up the state entries for a list of string event IDs // Returns an error if the there is an error talking to the database // Returns a types.MissingEventError if the event IDs aren't in the database. - StateEntriesForEventIDs(ctx context.Context, eventIDs []string) ([]types.StateEntry, error) + StateEntriesForEventIDs(ctx context.Context, eventIDs []string, excludeRejected bool) ([]types.StateEntry, error) // Look up the string event state keys for a list of numeric event state keys // Returns an error if there was a problem talking to the database. EventStateKeys(ctx context.Context, eventStateKeyNIDs []types.EventStateKeyNID) (map[types.EventStateKeyNID]string, error) @@ -94,6 +94,8 @@ type Database interface { // Opens and returns a room updater, which locks the room and opens a transaction. // The GetRoomUpdater must have Commit or Rollback called on it if this doesn't return an error. // If this returns an error then no further action is required. + // IsEventRejected returns true if the event is known and rejected. + IsEventRejected(ctx context.Context, roomNID types.RoomNID, eventID string) (rejected bool, err error) GetRoomUpdater(ctx context.Context, roomInfo *types.RoomInfo) (*shared.RoomUpdater, error) // Look up event references for the latest events in the room and the current state snapshot. // Returns the latest events, the current state and the maximum depth of the latest events plus 1. diff --git a/roomserver/storage/postgres/events_table.go b/roomserver/storage/postgres/events_table.go index a4d05756d..a310c3963 100644 --- a/roomserver/storage/postgres/events_table.go +++ b/roomserver/storage/postgres/events_table.go @@ -88,6 +88,14 @@ const bulkSelectStateEventByIDSQL = "" + " WHERE event_id = ANY($1)" + " ORDER BY event_type_nid, event_state_key_nid ASC" +// Bulk lookup of events by string ID that aren't excluded. +// Sort by the numeric IDs for event type and state key. +// This means we can use binary search to lookup entries by type and state key. +const bulkSelectStateEventByIDExcludingRejectedSQL = "" + + "SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" + + " WHERE event_id = ANY($1) AND is_rejected = FALSE" + + " ORDER BY event_type_nid, event_state_key_nid ASC" + // Bulk look up of events by event NID, optionally filtering by the event type // or event state key NIDs if provided. (The CARDINALITY check will return true // if the provided arrays are empty, ergo no filtering). @@ -136,23 +144,28 @@ const selectMaxEventDepthSQL = "" + const selectRoomNIDsForEventNIDsSQL = "" + "SELECT event_nid, room_nid FROM roomserver_events WHERE event_nid = ANY($1)" +const selectEventRejectedSQL = "" + + "SELECT is_rejected FROM roomserver_events WHERE room_nid = $1 AND event_id = $2" + type eventStatements struct { - insertEventStmt *sql.Stmt - selectEventStmt *sql.Stmt - bulkSelectStateEventByIDStmt *sql.Stmt - bulkSelectStateEventByNIDStmt *sql.Stmt - bulkSelectStateAtEventByIDStmt *sql.Stmt - updateEventStateStmt *sql.Stmt - selectEventSentToOutputStmt *sql.Stmt - updateEventSentToOutputStmt *sql.Stmt - selectEventIDStmt *sql.Stmt - bulkSelectStateAtEventAndReferenceStmt *sql.Stmt - bulkSelectEventReferenceStmt *sql.Stmt - bulkSelectEventIDStmt *sql.Stmt - bulkSelectEventNIDStmt *sql.Stmt - bulkSelectUnsentEventNIDStmt *sql.Stmt - selectMaxEventDepthStmt *sql.Stmt - selectRoomNIDsForEventNIDsStmt *sql.Stmt + insertEventStmt *sql.Stmt + selectEventStmt *sql.Stmt + bulkSelectStateEventByIDStmt *sql.Stmt + bulkSelectStateEventByIDExcludingRejectedStmt *sql.Stmt + bulkSelectStateEventByNIDStmt *sql.Stmt + bulkSelectStateAtEventByIDStmt *sql.Stmt + updateEventStateStmt *sql.Stmt + selectEventSentToOutputStmt *sql.Stmt + updateEventSentToOutputStmt *sql.Stmt + selectEventIDStmt *sql.Stmt + bulkSelectStateAtEventAndReferenceStmt *sql.Stmt + bulkSelectEventReferenceStmt *sql.Stmt + bulkSelectEventIDStmt *sql.Stmt + bulkSelectEventNIDStmt *sql.Stmt + bulkSelectUnsentEventNIDStmt *sql.Stmt + selectMaxEventDepthStmt *sql.Stmt + selectRoomNIDsForEventNIDsStmt *sql.Stmt + selectEventRejectedStmt *sql.Stmt } func CreateEventsTable(db *sql.DB) error { @@ -167,6 +180,7 @@ func PrepareEventsTable(db *sql.DB) (tables.Events, error) { {&s.insertEventStmt, insertEventSQL}, {&s.selectEventStmt, selectEventSQL}, {&s.bulkSelectStateEventByIDStmt, bulkSelectStateEventByIDSQL}, + {&s.bulkSelectStateEventByIDExcludingRejectedStmt, bulkSelectStateEventByIDExcludingRejectedSQL}, {&s.bulkSelectStateEventByNIDStmt, bulkSelectStateEventByNIDSQL}, {&s.bulkSelectStateAtEventByIDStmt, bulkSelectStateAtEventByIDSQL}, {&s.updateEventStateStmt, updateEventStateSQL}, @@ -180,6 +194,7 @@ func PrepareEventsTable(db *sql.DB) (tables.Events, error) { {&s.bulkSelectUnsentEventNIDStmt, bulkSelectUnsentEventNIDSQL}, {&s.selectMaxEventDepthStmt, selectMaxEventDepthSQL}, {&s.selectRoomNIDsForEventNIDsStmt, selectRoomNIDsForEventNIDsSQL}, + {&s.selectEventRejectedStmt, selectEventRejectedSQL}, }.Prepare(db) } @@ -216,11 +231,18 @@ func (s *eventStatements) SelectEvent( } // bulkSelectStateEventByID lookups a list of state events by event ID. -// If any of the requested events are missing from the database it returns a types.MissingEventError +// If not excluding rejected events, and any of the requested events are missing from +// the database it returns a types.MissingEventError. If excluding rejected events, +// the events will be silently omitted without error. func (s *eventStatements) BulkSelectStateEventByID( - ctx context.Context, txn *sql.Tx, eventIDs []string, + ctx context.Context, txn *sql.Tx, eventIDs []string, excludeRejected bool, ) ([]types.StateEntry, error) { - stmt := sqlutil.TxStmt(txn, s.bulkSelectStateEventByIDStmt) + var stmt *sql.Stmt + if excludeRejected { + stmt = sqlutil.TxStmt(txn, s.bulkSelectStateEventByIDExcludingRejectedStmt) + } else { + stmt = sqlutil.TxStmt(txn, s.bulkSelectStateEventByIDStmt) + } rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs)) if err != nil { return nil, err @@ -230,10 +252,10 @@ func (s *eventStatements) BulkSelectStateEventByID( // because of the unique constraint on event IDs. // So we can allocate an array of the correct size now. // We might get fewer results than IDs so we adjust the length of the slice before returning it. - results := make([]types.StateEntry, len(eventIDs)) + results := make([]types.StateEntry, 0, len(eventIDs)) i := 0 for ; rows.Next(); i++ { - result := &results[i] + var result types.StateEntry if err = rows.Scan( &result.EventTypeNID, &result.EventStateKeyNID, @@ -241,11 +263,12 @@ func (s *eventStatements) BulkSelectStateEventByID( ); err != nil { return nil, err } + results = append(results, result) } if err = rows.Err(); err != nil { return nil, err } - if i != len(eventIDs) { + if !excludeRejected && i != len(eventIDs) { // If there are fewer rows returned than IDs then we were asked to lookup event IDs we don't have. // We don't know which ones were missing because we don't return the string IDs in the query. // However it should be possible debug this by replaying queries or entries from the input kafka logs. @@ -540,3 +563,11 @@ func eventNIDsAsArray(eventNIDs []types.EventNID) pq.Int64Array { } return nids } + +func (s *eventStatements) SelectEventRejected( + ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventID string, +) (rejected bool, err error) { + stmt := sqlutil.TxStmt(txn, s.selectEventRejectedStmt) + err = stmt.QueryRowContext(ctx, roomNID, eventID).Scan(&rejected) + return +} diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index cc49f4485..e602aa3b1 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -113,9 +113,9 @@ func (d *Database) eventStateKeyNIDs( } func (d *Database) StateEntriesForEventIDs( - ctx context.Context, eventIDs []string, + ctx context.Context, eventIDs []string, excludeRejected bool, ) ([]types.StateEntry, error) { - return d.EventsTable.BulkSelectStateEventByID(ctx, nil, eventIDs) + return d.EventsTable.BulkSelectStateEventByID(ctx, nil, eventIDs, excludeRejected) } func (d *Database) StateEntriesForTuples( @@ -567,6 +567,10 @@ func (d *Database) GetRoomUpdater( return updater, err } +func (d *Database) IsEventRejected(ctx context.Context, roomNID types.RoomNID, eventID string) (bool, error) { + return d.EventsTable.SelectEventRejected(ctx, nil, roomNID, eventID) +} + func (d *Database) StoreEvent( ctx context.Context, event *gomatrixserverlib.Event, authEventNIDs []types.EventNID, isRejected bool, diff --git a/roomserver/storage/sqlite3/events_table.go b/roomserver/storage/sqlite3/events_table.go index 1dda34c36..943f256eb 100644 --- a/roomserver/storage/sqlite3/events_table.go +++ b/roomserver/storage/sqlite3/events_table.go @@ -65,6 +65,14 @@ const bulkSelectStateEventByIDSQL = "" + " WHERE event_id IN ($1)" + " ORDER BY event_type_nid, event_state_key_nid ASC" +// Bulk lookup of events by string ID that aren't rejected. +// Sort by the numeric IDs for event type and state key. +// This means we can use binary search to lookup entries by type and state key. +const bulkSelectStateEventByIDExcludingRejectedSQL = "" + + "SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" + + " WHERE event_id IN ($1) AND is_rejected = 0" + + " ORDER BY event_type_nid, event_state_key_nid ASC" + const bulkSelectStateEventByNIDSQL = "" + "SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" + " WHERE event_nid IN ($1)" @@ -109,19 +117,24 @@ const selectMaxEventDepthSQL = "" + const selectRoomNIDsForEventNIDsSQL = "" + "SELECT event_nid, room_nid FROM roomserver_events WHERE event_nid IN ($1)" +const selectEventRejectedSQL = "" + + "SELECT is_rejected FROM roomserver_events WHERE room_nid = $1 AND event_id = $2" + type eventStatements struct { - db *sql.DB - insertEventStmt *sql.Stmt - selectEventStmt *sql.Stmt - bulkSelectStateEventByIDStmt *sql.Stmt - bulkSelectStateAtEventByIDStmt *sql.Stmt - updateEventStateStmt *sql.Stmt - selectEventSentToOutputStmt *sql.Stmt - updateEventSentToOutputStmt *sql.Stmt - selectEventIDStmt *sql.Stmt - bulkSelectStateAtEventAndReferenceStmt *sql.Stmt - bulkSelectEventReferenceStmt *sql.Stmt - bulkSelectEventIDStmt *sql.Stmt + db *sql.DB + insertEventStmt *sql.Stmt + selectEventStmt *sql.Stmt + bulkSelectStateEventByIDStmt *sql.Stmt + bulkSelectStateEventByIDExcludingRejectedStmt *sql.Stmt + bulkSelectStateAtEventByIDStmt *sql.Stmt + updateEventStateStmt *sql.Stmt + selectEventSentToOutputStmt *sql.Stmt + updateEventSentToOutputStmt *sql.Stmt + selectEventIDStmt *sql.Stmt + bulkSelectStateAtEventAndReferenceStmt *sql.Stmt + bulkSelectEventReferenceStmt *sql.Stmt + bulkSelectEventIDStmt *sql.Stmt + selectEventRejectedStmt *sql.Stmt //bulkSelectEventNIDStmt *sql.Stmt //bulkSelectUnsentEventNIDStmt *sql.Stmt //selectRoomNIDsForEventNIDsStmt *sql.Stmt @@ -141,6 +154,7 @@ func PrepareEventsTable(db *sql.DB) (tables.Events, error) { {&s.insertEventStmt, insertEventSQL}, {&s.selectEventStmt, selectEventSQL}, {&s.bulkSelectStateEventByIDStmt, bulkSelectStateEventByIDSQL}, + {&s.bulkSelectStateEventByIDExcludingRejectedStmt, bulkSelectStateEventByIDExcludingRejectedSQL}, {&s.bulkSelectStateAtEventByIDStmt, bulkSelectStateAtEventByIDSQL}, {&s.updateEventStateStmt, updateEventStateSQL}, {&s.updateEventSentToOutputStmt, updateEventSentToOutputSQL}, @@ -152,6 +166,7 @@ func PrepareEventsTable(db *sql.DB) (tables.Events, error) { //{&s.bulkSelectEventNIDStmt, bulkSelectEventNIDSQL}, //{&s.bulkSelectUnsentEventNIDStmt, bulkSelectUnsentEventNIDSQL}, //{&s.selectRoomNIDForEventNIDStmt, selectRoomNIDForEventNIDSQL}, + {&s.selectEventRejectedStmt, selectEventRejectedSQL}, }.Prepare(db) } @@ -189,16 +204,24 @@ func (s *eventStatements) SelectEvent( } // bulkSelectStateEventByID lookups a list of state events by event ID. -// If any of the requested events are missing from the database it returns a types.MissingEventError +// If not excluding rejected events, and any of the requested events are missing from +// the database it returns a types.MissingEventError. If excluding rejected events, +// the events will be silently omitted without error. func (s *eventStatements) BulkSelectStateEventByID( - ctx context.Context, txn *sql.Tx, eventIDs []string, + ctx context.Context, txn *sql.Tx, eventIDs []string, excludeRejected bool, ) ([]types.StateEntry, error) { /////////////// + var sql string + if excludeRejected { + sql = bulkSelectStateEventByIDExcludingRejectedSQL + } else { + sql = bulkSelectStateEventByIDSQL + } iEventIDs := make([]interface{}, len(eventIDs)) for k, v := range eventIDs { iEventIDs[k] = v } - selectOrig := strings.Replace(bulkSelectStateEventByIDSQL, "($1)", sqlutil.QueryVariadic(len(iEventIDs)), 1) + selectOrig := strings.Replace(sql, "($1)", sqlutil.QueryVariadic(len(iEventIDs)), 1) selectPrep, err := s.db.Prepare(selectOrig) if err != nil { return nil, err @@ -216,10 +239,10 @@ func (s *eventStatements) BulkSelectStateEventByID( // because of the unique constraint on event IDs. // So we can allocate an array of the correct size now. // We might get fewer results than IDs so we adjust the length of the slice before returning it. - results := make([]types.StateEntry, len(eventIDs)) + results := make([]types.StateEntry, 0, len(eventIDs)) i := 0 for ; rows.Next(); i++ { - result := &results[i] + var result types.StateEntry if err = rows.Scan( &result.EventTypeNID, &result.EventStateKeyNID, @@ -227,8 +250,9 @@ func (s *eventStatements) BulkSelectStateEventByID( ); err != nil { return nil, err } + results = append(results, result) } - if i != len(eventIDs) { + if !excludeRejected && i != len(eventIDs) { // If there are fewer rows returned than IDs then we were asked to lookup event IDs we don't have. // We don't know which ones were missing because we don't return the string IDs in the query. // However it should be possible debug this by replaying queries or entries from the input kafka logs. @@ -614,3 +638,11 @@ func eventNIDsAsArray(eventNIDs []types.EventNID) string { b, _ := json.Marshal(eventNIDs) return string(b) } + +func (s *eventStatements) SelectEventRejected( + ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventID string, +) (rejected bool, err error) { + stmt := sqlutil.TxStmt(txn, s.selectEventRejectedStmt) + err = stmt.QueryRowContext(ctx, roomNID, eventID).Scan(&rejected) + return +} diff --git a/roomserver/storage/tables/events_table_test.go b/roomserver/storage/tables/events_table_test.go index 74502a31f..107af4784 100644 --- a/roomserver/storage/tables/events_table_test.go +++ b/roomserver/storage/tables/events_table_test.go @@ -102,7 +102,7 @@ func Test_EventsTable(t *testing.T) { }) } - stateEvents, err := tab.BulkSelectStateEventByID(ctx, nil, eventIDs) + stateEvents, err := tab.BulkSelectStateEventByID(ctx, nil, eventIDs, false) assert.NoError(t, err) assert.Equal(t, len(stateEvents), len(eventIDs)) nids := make([]types.EventNID, 0, len(stateEvents)) diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go index 0bc389b80..68d30f994 100644 --- a/roomserver/storage/tables/interface.go +++ b/roomserver/storage/tables/interface.go @@ -46,7 +46,7 @@ type Events interface { SelectEvent(ctx context.Context, txn *sql.Tx, eventID string) (types.EventNID, types.StateSnapshotNID, error) // bulkSelectStateEventByID lookups a list of state events by event ID. // If any of the requested events are missing from the database it returns a types.MissingEventError - BulkSelectStateEventByID(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StateEntry, error) + BulkSelectStateEventByID(ctx context.Context, txn *sql.Tx, eventIDs []string, excludeRejected bool) ([]types.StateEntry, error) BulkSelectStateEventByNID(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntry, error) // BulkSelectStateAtEventByID lookups the state at a list of events by event ID. // If any of the requested events are missing from the database it returns a types.MissingEventError. @@ -66,6 +66,7 @@ type Events interface { BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error) SelectMaxEventDepth(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (int64, error) SelectRoomNIDsForEventNIDs(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (roomNIDs map[types.EventNID]types.RoomNID, err error) + SelectEventRejected(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventID string) (rejected bool, err error) } type Rooms interface { diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 9db3d8e17..03614302c 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -350,8 +350,10 @@ func (r *messagesReq) retrieveEvents() ( startTime := time.Now() filteredEvents, err := internal.ApplyHistoryVisibilityFilter(r.ctx, r.db, r.rsAPI, events, nil, r.device.UserID, "messages") logrus.WithFields(logrus.Fields{ - "duration": time.Since(startTime), - "room_id": r.roomID, + "duration": time.Since(startTime), + "room_id": r.roomID, + "events_before": len(events), + "events_after": len(filteredEvents), }).Debug("applied history visibility (messages)") return gomatrixserverlib.HeaderedToClientEvents(filteredEvents, gomatrixserverlib.FormatAll), start, end, err } @@ -513,6 +515,9 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][] // Store the events in the database, while marking them as unfit to show // up in responses to sync requests. + if res.HistoryVisibility == "" { + res.HistoryVisibility = gomatrixserverlib.HistoryVisibilityShared + } for i := range res.Events { _, err = r.db.WriteEvent( context.Background(), @@ -521,7 +526,7 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][] []string{}, []string{}, nil, true, - gomatrixserverlib.HistoryVisibilityShared, + res.HistoryVisibility, ) if err != nil { return nil, err @@ -534,6 +539,9 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][] // last `limit` events events = events[len(events)-limit:] } + for _, ev := range events { + ev.Visibility = res.HistoryVisibility + } return events, nil } diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 8f633640e..9c758cac1 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -279,8 +279,8 @@ func (s *outputRoomEventsStatements) SelectStateInRange( log.WithFields(log.Fields{ "since": r.From, "current": r.To, - "adds": addIDs, - "dels": delIDs, + "adds": len(addIDs), + "dels": len(delIDs), }).Warn("StateBetween: ignoring deleted state") } diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 91fd35b5b..cd9a46d4c 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -234,8 +234,8 @@ func (s *outputRoomEventsStatements) SelectStateInRange( log.WithFields(log.Fields{ "since": r.From, "current": r.To, - "adds": addIDsJSON, - "dels": delIDsJSON, + "adds": len(addIDsJSON), + "dels": len(delIDsJSON), }).Warn("StateBetween: ignoring deleted state") } diff --git a/syncapi/streams/stream_invite.go b/syncapi/streams/stream_invite.go index ddac9be2c..925da32f2 100644 --- a/syncapi/streams/stream_invite.go +++ b/syncapi/streams/stream_invite.go @@ -7,8 +7,9 @@ import ( "strconv" "time" - "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" + + "github.com/matrix-org/dendrite/syncapi/types" ) type InviteStreamProvider struct { @@ -62,6 +63,11 @@ func (p *InviteStreamProvider) IncrementalSync( req.Response.Rooms.Invite[roomID] = *ir } + // When doing an initial sync, we don't want to add retired invites, as this + // can add rooms we were invited to, but already left. + if from == 0 { + return to + } for roomID := range retiredInvites { if _, ok := req.Response.Rooms.Join[roomID]; !ok { lr := types.NewLeaveResponse() diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 136cbea5a..0e9dda577 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -560,14 +560,13 @@ func (p *PDUStreamProvider) lazyLoadMembers( // If this is a gapped incremental sync, we still want this membership isGappedIncremental := limited && incremental // We want this users membership event, keep it in the list - _, ok := timelineUsers[event.Sender()] - wantMembership := ok || isGappedIncremental - if wantMembership { + stateKey := *event.StateKey() + if _, ok := timelineUsers[stateKey]; ok || isGappedIncremental { newStateEvents = append(newStateEvents, event) if !includeRedundant { - p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, event.Sender(), event.EventID()) + p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, stateKey, event.EventID()) } - delete(timelineUsers, event.Sender()) + delete(timelineUsers, stateKey) } } else { newStateEvents = append(newStateEvents, event) @@ -578,17 +577,16 @@ func (p *PDUStreamProvider) lazyLoadMembers( wantUsers = append(wantUsers, userID) } // Query missing membership events - memberships, err := p.DB.GetStateEventsForRoom(ctx, roomID, &gomatrixserverlib.StateFilter{ - Limit: 100, - Senders: &wantUsers, - Types: &[]string{gomatrixserverlib.MRoomMember}, - }) + filter := gomatrixserverlib.DefaultStateFilter() + filter.Senders = &wantUsers + filter.Types = &[]string{gomatrixserverlib.MRoomMember} + memberships, err := p.DB.GetStateEventsForRoom(ctx, roomID, &filter) if err != nil { return stateEvents, err } // cache the membership events for _, membership := range memberships { - p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, membership.Sender(), membership.EventID()) + p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, *membership.StateKey(), membership.EventID()) } stateEvents = append(newStateEvents, memberships...) return stateEvents, nil diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go index dc073a16e..8b33c5e43 100644 --- a/syncapi/syncapi_test.go +++ b/syncapi/syncapi_test.go @@ -10,6 +10,10 @@ import ( "testing" "time" + "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" + "github.com/tidwall/gjson" + "github.com/matrix-org/dendrite/clientapi/producers" keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver" @@ -21,9 +25,6 @@ import ( "github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/test/testrig" userapi "github.com/matrix-org/dendrite/userapi/api" - "github.com/matrix-org/gomatrixserverlib" - "github.com/nats-io/nats.go" - "github.com/tidwall/gjson" ) type syncRoomserverAPI struct { @@ -153,8 +154,12 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) { wantJoinedRooms: []string{room.ID}, }, } - // TODO: find a better way - time.Sleep(500 * time.Millisecond) + + syncUntil(t, base, alice.AccessToken, false, func(syncBody string) bool { + // wait for the last sent eventID to come down sync + path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID()) + return gjson.Get(syncBody, path).Exists() + }) for _, tc := range testCases { w := httptest.NewRecorder() @@ -342,6 +347,13 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) { // create the users alice := test.NewUser(t) + aliceDev := userapi.Device{ + ID: "ALICEID", + UserID: alice.ID, + AccessToken: "ALICE_BEARER_TOKEN", + DisplayName: "ALICE", + } + bob := test.NewUser(t) bobDev := userapi.Device{ @@ -408,7 +420,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) { rsAPI := roomserver.NewInternalAPI(base) rsAPI.SetFederationAPI(nil, nil) - AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{bobDev}}, rsAPI, &syncKeyAPI{}) + AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, &syncKeyAPI{}) for _, tc := range testCases { testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType) @@ -417,11 +429,18 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) { room := test.NewRoom(t, alice, test.RoomHistoryVisibility(tc.historyVisibility)) // send the events/messages to NATS to create the rooms - beforeJoinEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("Before invite in a %s room", tc.historyVisibility)}) + beforeJoinBody := fmt.Sprintf("Before invite in a %s room", tc.historyVisibility) + beforeJoinEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": beforeJoinBody}) eventsToSend := append(room.Events(), beforeJoinEv) if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil { t.Fatalf("failed to send events: %v", err) } + syncUntil(t, base, aliceDev.AccessToken, false, + func(syncBody string) bool { + path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, beforeJoinBody) + return gjson.Get(syncBody, path).Exists() + }, + ) // There is only one event, we expect only to be able to see this, if the room is world_readable w := httptest.NewRecorder() @@ -447,13 +466,20 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) { inviteEv := room.CreateAndInsert(t, alice, "m.room.member", map[string]interface{}{"membership": "invite"}, test.WithStateKey(bob.ID)) afterInviteEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("After invite in a %s room", tc.historyVisibility)}) joinEv := room.CreateAndInsert(t, bob, "m.room.member", map[string]interface{}{"membership": "join"}, test.WithStateKey(bob.ID)) - msgEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("After join in a %s room", tc.historyVisibility)}) + afterJoinBody := fmt.Sprintf("After join in a %s room", tc.historyVisibility) + msgEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": afterJoinBody}) eventsToSend = append([]*gomatrixserverlib.HeaderedEvent{}, inviteEv, afterInviteEv, joinEv, msgEv) if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil { t.Fatalf("failed to send events: %v", err) } + syncUntil(t, base, aliceDev.AccessToken, false, + func(syncBody string) bool { + path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, afterJoinBody) + return gjson.Get(syncBody, path).Exists() + }, + ) // Verify the messages after/before invite are visible or not w = httptest.NewRecorder() @@ -508,8 +534,8 @@ func testSendToDevice(t *testing.T, dbType test.DBType) { AccountType: userapi.AccountTypeUser, } - base, close := testrig.CreateBaseDendrite(t, dbType) - defer close() + base, baseClose := testrig.CreateBaseDendrite(t, dbType) + defer baseClose() jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) @@ -604,7 +630,14 @@ func testSendToDevice(t *testing.T, dbType test.DBType) { t.Fatalf("unable to send to device message: %v", err) } } - time.Sleep((time.Millisecond * 15) * time.Duration(tc.sendMessagesCount)) // wait a bit, so the messages can be processed + + syncUntil(t, base, alice.AccessToken, + len(tc.want) == 0, + func(body string) bool { + return gjson.Get(body, fmt.Sprintf(`to_device.events.#(content.dummy=="message %d")`, msgCounter)).Exists() + }, + ) + // Execute a /sync request, recording the response w := httptest.NewRecorder() base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ @@ -627,6 +660,42 @@ func testSendToDevice(t *testing.T, dbType test.DBType) { } } +func syncUntil(t *testing.T, + base *base.BaseDendrite, accessToken string, + skip bool, + checkFunc func(syncBody string) bool, +) { + if checkFunc == nil { + t.Fatalf("No checkFunc defined") + } + if skip { + return + } + // loop on /sync until we receive the last send message or timeout after 5 seconds, since we don't know if the message made it + // to the syncAPI when hitting /sync + done := make(chan bool) + defer close(done) + go func() { + for { + w := httptest.NewRecorder() + base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ + "access_token": accessToken, + "timeout": "1000", + }))) + if checkFunc(w.Body.String()) { + done <- true + return + } + } + }() + + select { + case <-done: + case <-time.After(time.Second * 5): + t.Fatalf("Timed out waiting for messages") + } +} + func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input ...*gomatrixserverlib.HeaderedEvent) []*nats.Msg { result := make([]*nats.Msg, len(input)) for i, ev := range input { diff --git a/sytest-whitelist b/sytest-whitelist index 50db125bd..194f33799 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -144,7 +144,6 @@ Server correctly handles incoming m.device_list_update If remote user leaves room, changes device and rejoins we see update in sync If remote user leaves room, changes device and rejoins we see update in /keys/changes If remote user leaves room we no longer receive device updates -If a device list update goes missing, the server resyncs on the next one Server correctly resyncs when client query keys and there is no remote cache Server correctly resyncs when server leaves and rejoins a room Device list doesn't change if remote server is down @@ -632,7 +631,6 @@ Test that rejected pushers are removed. Trying to add push rule with no scope fails with 400 Trying to add push rule with invalid scope fails with 400 Forward extremities remain so even after the next events are populated as outliers -If a device list update goes missing, the server resyncs on the next one uploading self-signing key notifies over federation uploading signed devices gets propagated over federation Device list doesn't change if remote server is down