diff --git a/syncapi/routing/context.go b/syncapi/routing/context.go index 095a868c7..3f641e438 100644 --- a/syncapi/routing/context.go +++ b/syncapi/routing/context.go @@ -159,6 +159,7 @@ func Context( }).Debug("applied history visibility (context eventsBefore/eventsAfter)") // TODO: Get the actual state at the last event returned by SelectContextAfterEvent + stateFilter.LazyLoadMembers = false state, err := snapshot.CurrentState(ctx, roomID, &stateFilter, nil) if err != nil { logrus.WithError(err).Error("unable to fetch current room state") diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index a7a127e3a..04c2020a0 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -46,7 +46,7 @@ type DatabaseTransaction interface { RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error) MembershipCount(ctx context.Context, roomID, membership string, pos types.StreamPosition) (int, error) GetRoomSummary(ctx context.Context, roomID, userID string) (summary *types.Summary, err error) - RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) + RecentEvents(ctx context.Context, roomIDs []string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error) GetBackwardTopologyPos(ctx context.Context, events []*gomatrixserverlib.HeaderedEvent) (types.TopologyToken, error) PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go index 3caafa14b..0d607b7c0 100644 --- a/syncapi/storage/postgres/current_room_state_table.go +++ b/syncapi/storage/postgres/current_room_state_table.go @@ -275,6 +275,15 @@ func (s *currentRoomStateStatements) SelectCurrentState( ) ([]*gomatrixserverlib.HeaderedEvent, error) { stmt := sqlutil.TxStmt(txn, s.selectCurrentStateStmt) senders, notSenders := getSendersStateFilterFilter(stateFilter) + // We're going to query members later, so remove them from this request + if stateFilter.LazyLoadMembers && !stateFilter.IncludeRedundantMembers { + notTypes := &[]string{gomatrixserverlib.MRoomMember} + if stateFilter.NotTypes != nil { + *stateFilter.NotTypes = append(*stateFilter.NotTypes, gomatrixserverlib.MRoomMember) + } else { + stateFilter.NotTypes = notTypes + } + } rows, err := stmt.QueryContext(ctx, roomID, pq.StringArray(senders), pq.StringArray(notSenders), diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 0075fc8d3..6d7fb20f7 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -21,16 +21,14 @@ import ( "encoding/json" "sort" + "github.com/lib/pq" "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas" "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" - - "github.com/lib/pq" "github.com/matrix-org/gomatrixserverlib" - - "github.com/matrix-org/dendrite/internal/sqlutil" ) const outputRoomEventsSchema = ` @@ -109,14 +107,27 @@ const selectRecentEventsSQL = "" + " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" + " ORDER BY id DESC LIMIT $8" -const selectRecentEventsForSyncSQL = "" + - "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" + - " WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" + - " AND ( $4::text[] IS NULL OR sender = ANY($4) )" + - " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + - " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" + - " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" + - " ORDER BY id DESC LIMIT $8" +const selectRecentEventsForSyncSQL = ` +WITH room_ids AS ( + SELECT unnest($1::text[]) AS room_id +) +SELECT x.* +FROM room_ids, + LATERAL ( + SELECT room_id, event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility + FROM syncapi_output_room_events recent_events + WHERE + recent_events.room_id = room_ids.room_id + AND recent_events.exclude_from_sync = FALSE + AND id > $2 AND id <= $3 + AND ( $4::text[] IS NULL OR sender = ANY($4) ) + AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) ) + AND ( $6::text[] IS NULL OR type LIKE ANY($6) ) + AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) ) + ORDER BY recent_events.id DESC + LIMIT $8 + ) AS x +` const selectEarlyEventsSQL = "" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" + @@ -396,11 +407,7 @@ func (s *outputRoomEventsStatements) InsertEvent( // selectRecentEvents returns the most recent events in the given room, up to a maximum of 'limit'. // If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude // from sync. -func (s *outputRoomEventsStatements) SelectRecentEvents( - ctx context.Context, txn *sql.Tx, - roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, - chronologicalOrder bool, onlySyncEvents bool, -) ([]types.StreamEvent, bool, error) { +func (s *outputRoomEventsStatements) SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomIDs []string, ra types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error) { var stmt *sql.Stmt if onlySyncEvents { stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt) @@ -408,8 +415,9 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt) } senders, notSenders := getSendersRoomEventFilter(eventFilter) + rows, err := stmt.QueryContext( - ctx, roomID, r.Low(), r.High(), + ctx, pq.StringArray(roomIDs), ra.Low(), ra.High(), pq.StringArray(senders), pq.StringArray(notSenders), pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.Types)), @@ -417,34 +425,80 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( eventFilter.Limit+1, ) if err != nil { - return nil, false, err + return nil, err } defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed") - events, err := rowsToStreamEvents(rows) - if err != nil { - return nil, false, err - } - if chronologicalOrder { - // The events need to be returned from oldest to latest, which isn't - // necessary the way the SQL query returns them, so a sort is necessary to - // ensure the events are in the right order in the slice. - sort.SliceStable(events, func(i int, j int) bool { - return events[i].StreamPosition < events[j].StreamPosition - }) - } - // we queried for 1 more than the limit, so if we returned one more mark limited=true - limited := false - if len(events) > eventFilter.Limit { - limited = true - // re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last. - if chronologicalOrder { - events = events[1:] - } else { - events = events[:len(events)-1] + + result := make(map[string]types.RecentEvents) + + for rows.Next() { + var ( + roomID string + eventID string + streamPos types.StreamPosition + eventBytes []byte + excludeFromSync bool + sessionID *int64 + txnID *string + transactionID *api.TransactionID + historyVisibility gomatrixserverlib.HistoryVisibility + ) + if err := rows.Scan(&roomID, &eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID, &historyVisibility); err != nil { + return nil, err } + // TODO: Handle redacted events + var ev gomatrixserverlib.HeaderedEvent + if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { + return nil, err + } + + if sessionID != nil && txnID != nil { + transactionID = &api.TransactionID{ + SessionID: *sessionID, + TransactionID: *txnID, + } + } + + r := result[roomID] + + ev.Visibility = historyVisibility + r.Events = append(r.Events, types.StreamEvent{ + HeaderedEvent: &ev, + StreamPosition: streamPos, + TransactionID: transactionID, + ExcludeFromSync: excludeFromSync, + }) + + result[roomID] = r } - return events, limited, nil + if chronologicalOrder { + for roomID, evs := range result { + // The events need to be returned from oldest to latest, which isn't + // necessary the way the SQL query returns them, so a sort is necessary to + // ensure the events are in the right order in the slice. + sort.SliceStable(evs.Events, func(i int, j int) bool { + return evs.Events[i].StreamPosition < evs.Events[j].StreamPosition + }) + + if len(evs.Events) > eventFilter.Limit { + evs.Limited = true + evs.Events = evs.Events[1:] + } + + result[roomID] = evs + } + } else { + for roomID, evs := range result { + if len(evs.Events) > eventFilter.Limit { + evs.Limited = true + evs.Events = evs.Events[:len(evs.Events)-1] + } + + result[roomID] = evs + } + } + return result, rows.Err() } // selectEarlyEvents returns the earliest events in the given room, starting diff --git a/syncapi/storage/shared/storage_sync.go b/syncapi/storage/shared/storage_sync.go index 8385b95a5..931bc9e23 100644 --- a/syncapi/storage/shared/storage_sync.go +++ b/syncapi/storage/shared/storage_sync.go @@ -151,8 +151,8 @@ func (d *DatabaseTransaction) GetRoomSummary(ctx context.Context, roomID, userID return summary, nil } -func (d *DatabaseTransaction) RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) { - return d.OutputEvents.SelectRecentEvents(ctx, d.txn, roomID, r, eventFilter, chronologicalOrder, onlySyncEvents) +func (d *DatabaseTransaction) RecentEvents(ctx context.Context, roomIDs []string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error) { + return d.OutputEvents.SelectRecentEvents(ctx, d.txn, roomIDs, r, eventFilter, chronologicalOrder, onlySyncEvents) } func (d *DatabaseTransaction) PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) { @@ -370,19 +370,25 @@ func (d *DatabaseTransaction) GetStateDeltas( } // get all the state events ever (i.e. for all available rooms) between these two positions - stateNeededFiltered, eventMapFiltered, err := d.OutputEvents.SelectStateInRange(ctx, d.txn, r, stateFilter, allRoomIDs) - if err != nil { - if err == sql.ErrNoRows { - return nil, nil, nil + stateFiltered := state + // avoid hitting the database if the result would be the same as above + if !isStatefilterEmpty(stateFilter) { + var stateNeededFiltered map[string]map[string]bool + var eventMapFiltered map[string]types.StreamEvent + stateNeededFiltered, eventMapFiltered, err = d.OutputEvents.SelectStateInRange(ctx, d.txn, r, stateFilter, allRoomIDs) + if err != nil { + if err == sql.ErrNoRows { + return nil, nil, nil + } + return nil, nil, err } - return nil, nil, err - } - stateFiltered, err := d.fetchStateEvents(ctx, d.txn, stateNeededFiltered, eventMapFiltered) - if err != nil { - if err == sql.ErrNoRows { - return nil, nil, nil + stateFiltered, err = d.fetchStateEvents(ctx, d.txn, stateNeededFiltered, eventMapFiltered) + if err != nil { + if err == sql.ErrNoRows { + return nil, nil, nil + } + return nil, nil, err } - return nil, nil, err } // find out which rooms this user is peeking, if any. @@ -701,6 +707,28 @@ func (d *DatabaseTransaction) MaxStreamPositionForRelations(ctx context.Context) return types.StreamPosition(id), err } +func isStatefilterEmpty(filter *gomatrixserverlib.StateFilter) bool { + if filter == nil { + return true + } + switch { + case filter.NotTypes != nil && len(*filter.NotTypes) > 0: + return false + case filter.Types != nil && len(*filter.Types) > 0: + return false + case filter.Senders != nil && len(*filter.Senders) > 0: + return false + case filter.NotSenders != nil && len(*filter.NotSenders) > 0: + return false + case filter.NotRooms != nil && len(*filter.NotRooms) > 0: + return false + case filter.ContainsURL != nil: + return false + default: + return true + } +} + func (d *DatabaseTransaction) RelationsFor(ctx context.Context, roomID, eventID, relType, eventType string, from, to types.StreamPosition, backwards bool, limit int) ( events []types.StreamEvent, prevBatch, nextBatch string, err error, ) { diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index db708c083..dabba0402 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -366,11 +366,7 @@ func (s *outputRoomEventsStatements) InsertEvent( return streamPos, err } -func (s *outputRoomEventsStatements) SelectRecentEvents( - ctx context.Context, txn *sql.Tx, - roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, - chronologicalOrder bool, onlySyncEvents bool, -) ([]types.StreamEvent, bool, error) { +func (s *outputRoomEventsStatements) SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomIDs []string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error) { var query string if onlySyncEvents { query = selectRecentEventsForSyncSQL @@ -381,25 +377,25 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( stmt, params, err := prepareWithFilters( s.db, txn, query, []interface{}{ - roomID, r.Low(), r.High(), + roomIDs, r.Low(), r.High(), }, eventFilter.Senders, eventFilter.NotSenders, eventFilter.Types, eventFilter.NotTypes, nil, eventFilter.ContainsURL, eventFilter.Limit+1, FilterOrderDesc, ) if err != nil { - return nil, false, fmt.Errorf("s.prepareWithFilters: %w", err) + return nil, fmt.Errorf("s.prepareWithFilters: %w", err) } defer internal.CloseAndLogIfError(ctx, stmt, "selectRecentEvents: stmt.close() failed") rows, err := stmt.QueryContext(ctx, params...) if err != nil { - return nil, false, err + return nil, err } defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed") events, err := rowsToStreamEvents(rows) if err != nil { - return nil, false, err + return nil, err } if chronologicalOrder { // The events need to be returned from oldest to latest, which isn't @@ -410,9 +406,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( }) } // we queried for 1 more than the limit, so if we returned one more mark limited=true - limited := false if len(events) > eventFilter.Limit { - limited = true // re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last. if chronologicalOrder { events = events[1:] @@ -420,7 +414,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( events = events[:len(events)-1] } } - return events, limited, nil + return map[string]types.RecentEvents{}, nil } func (s *outputRoomEventsStatements) SelectEarlyEvents( diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index e65367d8b..55a5de64e 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -156,12 +156,12 @@ func TestRecentEventsPDU(t *testing.T) { tc := testCases[i] t.Run(tc.Name, func(st *testing.T) { var filter gomatrixserverlib.RoomEventFilter - var gotEvents []types.StreamEvent + var gotEvents map[string]types.RecentEvents var limited bool filter.Limit = tc.Limit WithSnapshot(t, db, func(snapshot storage.DatabaseTransaction) { var err error - gotEvents, limited, err = snapshot.RecentEvents(ctx, r.ID, types.Range{ + gotEvents, err = snapshot.RecentEvents(ctx, []string{r.ID}, types.Range{ From: tc.From, To: tc.To, }, &filter, !tc.ReverseOrder, true) @@ -169,15 +169,18 @@ func TestRecentEventsPDU(t *testing.T) { st.Fatalf("failed to do sync: %s", err) } }) + streamEvents := gotEvents[r.ID] + limited = streamEvents.Limited if limited != tc.WantLimited { st.Errorf("got limited=%v want %v", limited, tc.WantLimited) } - if len(gotEvents) != len(tc.WantEvents) { + if len(streamEvents.Events) != len(tc.WantEvents) { st.Errorf("got %d events, want %d", len(gotEvents), len(tc.WantEvents)) } - for j := range gotEvents { - if !reflect.DeepEqual(gotEvents[j].JSON(), tc.WantEvents[j].JSON()) { - st.Errorf("event %d got %s want %s", j, string(gotEvents[j].JSON()), string(tc.WantEvents[j].JSON())) + + for j := range streamEvents.Events { + if !reflect.DeepEqual(streamEvents.Events[j].JSON(), tc.WantEvents[j].JSON()) { + st.Errorf("event %d got %s want %s", j, string(streamEvents.Events[j].JSON()), string(tc.WantEvents[j].JSON())) } } }) diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 145e197cc..727d6bf2c 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -66,7 +66,7 @@ type Events interface { // SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high. // If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync. // Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`. - SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) + SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomIDs []string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error) // SelectEarlyEvents returns the earliest events in the given room. SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter) ([]types.StreamEvent, error) SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string, filter *gomatrixserverlib.RoomEventFilter, preserveOrder bool) ([]types.StreamEvent, error) diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 44013e37c..9b69a5ca9 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -82,19 +82,30 @@ func (p *PDUStreamProvider) CompleteSync( req.Log.WithError(err).Error("unable to update event filter with ignored users") } - // Invalidate the lazyLoadCache, otherwise we end up with missing displaynames/avatars - // TODO: This might be inefficient, when joined to many and/or large rooms. + s := time.Now() + recentEvents, err := snapshot.RecentEvents(ctx, joinedRoomIDs, r, &eventFilter, true, true) + if err != nil { + return from + } + logrus.WithFields(logrus.Fields{ + "duration": time.Since(s), + "rooms": len(joinedRoomIDs)}). + Debugf("got recent events for all rooms") + // Build up a /sync response. Add joined rooms. + s = time.Now() for _, roomID := range joinedRoomIDs { + events := recentEvents[roomID] + // Invalidate the lazyLoadCache, otherwise we end up with missing displaynames/avatars + // TODO: This might be inefficient, when joined to many and/or large rooms. joinedUsers := p.notifier.JoinedUsers(roomID) for _, sharedUser := range joinedUsers { p.lazyLoadCache.InvalidateLazyLoadedUser(req.Device, roomID, sharedUser) } - } - // Build up a /sync response. Add joined rooms. - for _, roomID := range joinedRoomIDs { + // get the join response for each room jr, jerr := p.getJoinResponseForCompleteSync( - ctx, snapshot, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, false, + ctx, snapshot, roomID, &stateFilter, req.WantFullState, req.Device, false, + events.Events, events.Limited, ) if jerr != nil { req.Log.WithError(jerr).Error("p.getJoinResponseForCompleteSync failed") @@ -106,6 +117,10 @@ func (p *PDUStreamProvider) CompleteSync( req.Response.Rooms.Join[roomID] = jr req.Rooms[roomID] = gomatrixserverlib.Join } + logrus.WithFields(logrus.Fields{ + "duration": time.Since(s), + "rooms": len(joinedRoomIDs)}). + Debugf("built join response for all rooms") // Add peeked rooms. peeks, err := snapshot.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r) @@ -113,11 +128,25 @@ func (p *PDUStreamProvider) CompleteSync( req.Log.WithError(err).Error("p.DB.PeeksInRange failed") return from } - for _, peek := range peeks { - if !peek.Deleted { + if len(peeks) > 0 { + peekRooms := make([]string, 0, len(peeks)) + for _, peek := range peeks { + if !peek.Deleted { + peekRooms = append(peekRooms, peek.RoomID) + } + } + + recentEvents, err = snapshot.RecentEvents(ctx, peekRooms, r, &eventFilter, true, true) + if err != nil { + return from + } + + for _, roomID := range peekRooms { var jr *types.JoinResponse + events := recentEvents[roomID] jr, err = p.getJoinResponseForCompleteSync( - ctx, snapshot, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, true, + ctx, snapshot, roomID, &stateFilter, req.WantFullState, req.Device, true, + events.Events, events.Limited, ) if err != nil { req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") @@ -126,7 +155,7 @@ func (p *PDUStreamProvider) CompleteSync( } continue } - req.Response.Rooms.Peek[peek.RoomID] = jr + req.Response.Rooms.Peek[roomID] = jr } } @@ -227,7 +256,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( stateFilter *gomatrixserverlib.StateFilter, req *types.SyncRequest, ) (types.StreamPosition, error) { - + var err error originalLimit := eventFilter.Limit // If we're going backwards, grep at least X events, this is mostly to satisfy Sytest if r.Backwards && originalLimit < recentEventBackwardsLimit { @@ -238,8 +267,8 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( } } - recentStreamEvents, limited, err := snapshot.RecentEvents( - ctx, delta.RoomID, r, + dbEvents, err := snapshot.RecentEvents( + ctx, []string{delta.RoomID}, r, eventFilter, true, true, ) if err != nil { @@ -248,6 +277,10 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( } return r.From, fmt.Errorf("p.DB.RecentEvents: %w", err) } + + recentStreamEvents := dbEvents[delta.RoomID].Events + limited := dbEvents[delta.RoomID].Limited + recentEvents := gomatrixserverlib.HeaderedReverseTopologicalOrdering( snapshot.StreamEventsToEvents(device, recentStreamEvents), gomatrixserverlib.TopologicalOrderByPrevEvents, @@ -428,25 +461,16 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( ctx context.Context, snapshot storage.DatabaseTransaction, roomID string, - r types.Range, stateFilter *gomatrixserverlib.StateFilter, - eventFilter *gomatrixserverlib.RoomEventFilter, wantFullState bool, device *userapi.Device, isPeek bool, + recentStreamEvents []types.StreamEvent, + limited bool, ) (jr *types.JoinResponse, err error) { jr = types.NewJoinResponse() // TODO: When filters are added, we may need to call this multiple times to get enough events. // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 - recentStreamEvents, limited, err := snapshot.RecentEvents( - ctx, roomID, r, eventFilter, true, true, - ) - if err != nil { - if err == sql.ErrNoRows { - return jr, nil - } - return - } // Work our way through the timeline events and pick out the event IDs // of any state events that appear in the timeline. We'll specifically diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 9fbadc06c..6495dd535 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -63,6 +63,11 @@ type StreamEvent struct { ExcludeFromSync bool } +type RecentEvents struct { + Limited bool + Events []StreamEvent +} + // Range represents a range between two stream positions. type Range struct { // From is the position the client has already received.