mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-07 15:03:09 -06:00
/sync: Handle missing state events and return events in the correct order
This commit is contained in:
parent
92a0325266
commit
6b86d68bed
|
|
@ -146,19 +146,15 @@ func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos ty
|
||||||
eventIDToEvent[ev.EventID()] = ev
|
eventIDToEvent[ev.EventID()] = ev
|
||||||
}
|
}
|
||||||
|
|
||||||
stateBetween, missingEvents := mapEventIDsToEvents(eventIDToEvent, stateNeeded)
|
return s.fetchStateEvents(txn, stateNeeded, eventIDToEvent)
|
||||||
|
|
||||||
if len(missingEvents) > 0 {
|
|
||||||
return nil, fmt.Errorf("error StateBetween: TODO missing events")
|
|
||||||
}
|
|
||||||
return stateBetween, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// convert the set of event IDs into a set of events. Mark any which are missing.
|
// fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database.
|
||||||
func mapEventIDsToEvents(eventIDToEvent map[string]gomatrixserverlib.Event, stateNeeded map[string]map[string]bool) (map[string][]gomatrixserverlib.Event, map[string][]string) {
|
// Returns a map of room ID to list of events.
|
||||||
|
func (s *outputRoomEventsStatements) fetchStateEvents(txn *sql.Tx, roomIDToEventIDSet map[string]map[string]bool, eventIDToEvent map[string]gomatrixserverlib.Event) (map[string][]gomatrixserverlib.Event, error) {
|
||||||
stateBetween := make(map[string][]gomatrixserverlib.Event)
|
stateBetween := make(map[string][]gomatrixserverlib.Event)
|
||||||
missingEvents := make(map[string][]string)
|
missingEvents := make(map[string][]string)
|
||||||
for roomID, ids := range stateNeeded {
|
for roomID, ids := range roomIDToEventIDSet {
|
||||||
events := stateBetween[roomID]
|
events := stateBetween[roomID]
|
||||||
for id, need := range ids {
|
for id, need := range ids {
|
||||||
if !need {
|
if !need {
|
||||||
|
|
@ -175,7 +171,25 @@ func mapEventIDsToEvents(eventIDToEvent map[string]gomatrixserverlib.Event, stat
|
||||||
}
|
}
|
||||||
stateBetween[roomID] = events
|
stateBetween[roomID] = events
|
||||||
}
|
}
|
||||||
return stateBetween, missingEvents
|
|
||||||
|
if len(missingEvents) > 0 {
|
||||||
|
// This happens when add_state_ids has an event ID which is not in the provided range.
|
||||||
|
// We need to explicitly fetch them.
|
||||||
|
allMissingEventIDs := []string{}
|
||||||
|
for _, missingEvIDs := range missingEvents {
|
||||||
|
allMissingEventIDs = append(allMissingEventIDs, missingEvIDs...)
|
||||||
|
}
|
||||||
|
evs, err := s.Events(txn, allMissingEventIDs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// we know we got them all otherwise an error would've been returned, so just loop the events
|
||||||
|
for _, ev := range evs {
|
||||||
|
roomID := ev.RoomID()
|
||||||
|
stateBetween[roomID] = append(stateBetween[roomID], ev)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return stateBetween, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// MaxID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied,
|
// MaxID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied,
|
||||||
|
|
@ -210,7 +224,9 @@ func (s *outputRoomEventsStatements) RecentEventsInRoom(txn *sql.Tx, roomID stri
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
return rowsToEvents(rows)
|
// reverse the order because [0] is the newest event due to the ORDER BY in SQL-land. The reverse order makes [0] the oldest event,
|
||||||
|
// which is correct for /sync responses.
|
||||||
|
return rowsToEvents(rows, rowOrderReverse)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing
|
// Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing
|
||||||
|
|
@ -221,7 +237,7 @@ func (s *outputRoomEventsStatements) Events(txn *sql.Tx, eventIDs []string) ([]g
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
result, err := rowsToEvents(rows)
|
result, err := rowsToEvents(rows, rowOrderForward)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -232,7 +248,14 @@ func (s *outputRoomEventsStatements) Events(txn *sql.Tx, eventIDs []string) ([]g
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) {
|
type rowOrdering uint8
|
||||||
|
|
||||||
|
const (
|
||||||
|
rowOrderForward = rowOrdering(iota)
|
||||||
|
rowOrderReverse
|
||||||
|
)
|
||||||
|
|
||||||
|
func rowsToEvents(rows *sql.Rows, order rowOrdering) ([]gomatrixserverlib.Event, error) {
|
||||||
var result []gomatrixserverlib.Event
|
var result []gomatrixserverlib.Event
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var eventBytes []byte
|
var eventBytes []byte
|
||||||
|
|
@ -244,7 +267,14 @@ func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
result = append(result, ev)
|
if order == rowOrderForward {
|
||||||
|
result = append(result, ev)
|
||||||
|
} else if order == rowOrderReverse {
|
||||||
|
result = append([]gomatrixserverlib.Event{ev}, result...)
|
||||||
|
} else {
|
||||||
|
return nil, fmt.Errorf("rowsToEvents: bad order %d", order)
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue