mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-09 15:13:12 -06:00
If we have events in the topological range, but a filter removes all of
them, make sure to return an empty "start" and "end" to avoid loops
This commit is contained in:
parent
41cf902d09
commit
0c70f19590
|
|
@ -16,12 +16,14 @@ package routing
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"net/http"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/shared"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||
"github.com/matrix-org/util"
|
||||
|
|
@ -330,6 +332,12 @@ func (r *messagesReq) retrieveEvents(ctx context.Context, rsAPI api.SyncRoomserv
|
|||
// Retrieve the events from the local database.
|
||||
streamEvents, err := r.snapshot.GetEventsInTopologicalRange(r.ctx, r.from, r.to, r.roomID, r.filter, r.backwardOrdering)
|
||||
if err != nil {
|
||||
// While there are events in the topological range, the provided filter
|
||||
// removed all of them. This means there are no events for this user
|
||||
// anymore. Let them know.
|
||||
if errors.Is(err, shared.ErrNoEventsForFilter) {
|
||||
return []synctypes.ClientEvent{}, emptyToken, emptyToken, nil
|
||||
}
|
||||
err = fmt.Errorf("GetEventsInRange: %w", err)
|
||||
return []synctypes.ClientEvent{}, emptyToken, emptyToken, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ package shared
|
|||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
|
|
@ -231,6 +232,8 @@ func (d *DatabaseTransaction) GetAccountDataInRange(
|
|||
return d.AccountData.SelectAccountDataInRange(ctx, d.txn, userID, r, accountDataFilterPart)
|
||||
}
|
||||
|
||||
var ErrNoEventsForFilter = errors.New("no events returned using the provided filter")
|
||||
|
||||
func (d *DatabaseTransaction) GetEventsInTopologicalRange(
|
||||
ctx context.Context,
|
||||
from, to *types.TopologyToken,
|
||||
|
|
@ -264,6 +267,15 @@ func (d *DatabaseTransaction) GetEventsInTopologicalRange(
|
|||
|
||||
// Retrieve the events' contents using their IDs.
|
||||
events, err = d.OutputEvents.SelectEvents(ctx, d.txn, eIDs, filter, true)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// Check if we should be able to return events.
|
||||
// If we received 0 events, this most likely means that the provided filter removed them.
|
||||
if len(eIDs) > 0 && len(events) == 0 {
|
||||
return nil, ErrNoEventsForFilter
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ import (
|
|||
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/shared"
|
||||
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/dendrite/test"
|
||||
|
|
@ -223,6 +224,39 @@ func TestGetEventsInRangeWithTopologyToken(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
// The purpose of this test is to ensure that backfill does indeed go backwards, using a topology token
|
||||
func TestGetEventsInRangeWithTopologyTokenNoEventsForFilter(t *testing.T) {
|
||||
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
||||
db, close := MustCreateDatabase(t, dbType)
|
||||
defer close()
|
||||
alice := test.NewUser(t)
|
||||
r := test.NewRoom(t, alice)
|
||||
for i := 0; i < 10; i++ {
|
||||
r.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("hi %d", i)})
|
||||
}
|
||||
events := r.Events()
|
||||
_ = MustWriteEvents(t, db, events)
|
||||
|
||||
WithSnapshot(t, db, func(snapshot storage.DatabaseTransaction) {
|
||||
from := types.TopologyToken{Depth: math.MaxInt64, PDUPosition: math.MaxInt64}
|
||||
t.Logf("max topo pos = %+v", from)
|
||||
// head towards the beginning of time
|
||||
to := types.TopologyToken{}
|
||||
|
||||
// backpaginate 20 messages starting at the latest position.
|
||||
notTypes := []string{spec.MRoomRedaction}
|
||||
senders := []string{alice.ID}
|
||||
filter := &synctypes.RoomEventFilter{Limit: 20, NotTypes: ¬Types, Senders: &senders}
|
||||
paginatedEvents, err := snapshot.GetEventsInTopologicalRange(ctx, &from, &to, r.ID, filter, true)
|
||||
assert.Equal(t, shared.ErrNoEventsForFilter, err)
|
||||
assert.Nil(t, paginatedEvents)
|
||||
for _, x := range paginatedEvents {
|
||||
t.Logf("EventType: %s", x.Type())
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestStreamToTopologicalPosition(t *testing.T) {
|
||||
alice := test.NewUser(t)
|
||||
r := test.NewRoom(t, alice)
|
||||
|
|
|
|||
Loading…
Reference in a new issue