diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 7d44b57ec..3e146c12e 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -16,12 +16,14 @@ package routing import ( "context" + "errors" "fmt" "math" "net/http" "sort" "time" + "github.com/matrix-org/dendrite/syncapi/storage/shared" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib/spec" "github.com/matrix-org/util" @@ -330,6 +332,12 @@ func (r *messagesReq) retrieveEvents(ctx context.Context, rsAPI api.SyncRoomserv // Retrieve the events from the local database. streamEvents, err := r.snapshot.GetEventsInTopologicalRange(r.ctx, r.from, r.to, r.roomID, r.filter, r.backwardOrdering) if err != nil { + // While there are events in the topological range, the provided filter + // removed all of them. This means there are no events for this user + // anymore. Let them know. + if errors.Is(err, shared.ErrNoEventsForFilter) { + return []synctypes.ClientEvent{}, emptyToken, emptyToken, nil + } err = fmt.Errorf("GetEventsInRange: %w", err) return []synctypes.ClientEvent{}, emptyToken, emptyToken, err } diff --git a/syncapi/storage/shared/storage_sync.go b/syncapi/storage/shared/storage_sync.go index 8e79b71df..478e9a533 100644 --- a/syncapi/storage/shared/storage_sync.go +++ b/syncapi/storage/shared/storage_sync.go @@ -3,6 +3,7 @@ package shared import ( "context" "database/sql" + "errors" "fmt" "math" @@ -231,6 +232,8 @@ func (d *DatabaseTransaction) GetAccountDataInRange( return d.AccountData.SelectAccountDataInRange(ctx, d.txn, userID, r, accountDataFilterPart) } +var ErrNoEventsForFilter = errors.New("no events returned using the provided filter") + func (d *DatabaseTransaction) GetEventsInTopologicalRange( ctx context.Context, from, to *types.TopologyToken, @@ -264,6 +267,15 @@ func (d *DatabaseTransaction) GetEventsInTopologicalRange( // Retrieve the events' contents using their IDs. events, err = d.OutputEvents.SelectEvents(ctx, d.txn, eIDs, filter, true) + if err != nil { + return + } + // Check if we should be able to return events. + // If we received 0 events, this most likely means that the provided filter removed them. + if len(eIDs) > 0 && len(events) == 0 { + return nil, ErrNoEventsForFilter + } + return } diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index f57b0d618..7c1700147 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -14,6 +14,7 @@ import ( rstypes "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/storage/shared" "github.com/matrix-org/dendrite/syncapi/synctypes" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/test" @@ -223,6 +224,39 @@ func TestGetEventsInRangeWithTopologyToken(t *testing.T) { }) } +// The purpose of this test is to ensure that backfill does indeed go backwards, using a topology token +func TestGetEventsInRangeWithTopologyTokenNoEventsForFilter(t *testing.T) { + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + db, close := MustCreateDatabase(t, dbType) + defer close() + alice := test.NewUser(t) + r := test.NewRoom(t, alice) + for i := 0; i < 10; i++ { + r.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("hi %d", i)}) + } + events := r.Events() + _ = MustWriteEvents(t, db, events) + + WithSnapshot(t, db, func(snapshot storage.DatabaseTransaction) { + from := types.TopologyToken{Depth: math.MaxInt64, PDUPosition: math.MaxInt64} + t.Logf("max topo pos = %+v", from) + // head towards the beginning of time + to := types.TopologyToken{} + + // backpaginate 20 messages starting at the latest position. + notTypes := []string{spec.MRoomRedaction} + senders := []string{alice.ID} + filter := &synctypes.RoomEventFilter{Limit: 20, NotTypes: ¬Types, Senders: &senders} + paginatedEvents, err := snapshot.GetEventsInTopologicalRange(ctx, &from, &to, r.ID, filter, true) + assert.Equal(t, shared.ErrNoEventsForFilter, err) + assert.Nil(t, paginatedEvents) + for _, x := range paginatedEvents { + t.Logf("EventType: %s", x.Type()) + } + }) + }) +} + func TestStreamToTopologicalPosition(t *testing.T) { alice := test.NewUser(t) r := test.NewRoom(t, alice)