Try to de-race invites

This commit is contained in:
Neil Alexander 2022-09-30 11:12:23 +01:00
parent 878d1249d9
commit d138dc48b0
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
6 changed files with 36 additions and 19 deletions

View file

@ -46,7 +46,7 @@ type DatabaseTransaction interface {
RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
GetBackwardTopologyPos(ctx context.Context, events []types.StreamEvent) (types.TopologyToken, error)
PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error)
InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error)
InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error)
PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error)
RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []types.OutputReceiptEvent, error)
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.

View file

@ -55,7 +55,7 @@ const deleteInviteEventSQL = "" +
"UPDATE syncapi_invite_events SET deleted=TRUE, id=nextval('syncapi_stream_id') WHERE event_id = $1 AND deleted=FALSE RETURNING id"
const selectInviteEventsInRangeSQL = "" +
"SELECT room_id, headered_event_json, deleted FROM syncapi_invite_events" +
"SELECT id, room_id, headered_event_json, deleted FROM syncapi_invite_events" +
" WHERE target_user_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id DESC"
@ -121,23 +121,28 @@ func (s *inviteEventsStatements) DeleteInviteEvent(
// active invites for the target user ID in the supplied range.
func (s *inviteEventsStatements) SelectInviteEventsInRange(
ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range,
) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error) {
) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) {
var lastPos types.StreamPosition
stmt := sqlutil.TxStmt(txn, s.selectInviteEventsInRangeStmt)
rows, err := stmt.QueryContext(ctx, targetUserID, r.Low(), r.High())
if err != nil {
return nil, nil, err
return nil, nil, lastPos, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectInviteEventsInRange: rows.close() failed")
result := map[string]*gomatrixserverlib.HeaderedEvent{}
retired := map[string]*gomatrixserverlib.HeaderedEvent{}
for rows.Next() {
var (
id types.StreamPosition
roomID string
eventJSON []byte
deleted bool
)
if err = rows.Scan(&roomID, &eventJSON, &deleted); err != nil {
return nil, nil, err
if err = rows.Scan(&id, &roomID, &eventJSON, &deleted); err != nil {
return nil, nil, lastPos, err
}
if id > lastPos {
lastPos = id
}
// if we have seen this room before, it has a higher stream position and hence takes priority
@ -150,7 +155,7 @@ func (s *inviteEventsStatements) SelectInviteEventsInRange(
var event *gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(eventJSON, &event); err != nil {
return nil, nil, err
return nil, nil, lastPos, err
}
if deleted {
@ -159,7 +164,10 @@ func (s *inviteEventsStatements) SelectInviteEventsInRange(
result[roomID] = event
}
}
return result, retired, rows.Err()
if lastPos == 0 {
lastPos = r.To
}
return result, retired, lastPos, rows.Err()
}
func (s *inviteEventsStatements) SelectMaxInviteID(

View file

@ -102,7 +102,7 @@ func (d *DatabaseTransaction) PositionInTopology(ctx context.Context, eventID st
return d.Topology.SelectPositionInTopology(ctx, d.txn, eventID)
}
func (d *DatabaseTransaction) InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error) {
func (d *DatabaseTransaction) InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) {
return d.Invites.SelectInviteEventsInRange(ctx, d.txn, targetUserID, r)
}

View file

@ -50,7 +50,7 @@ const deleteInviteEventSQL = "" +
"UPDATE syncapi_invite_events SET deleted=true, id=$1 WHERE event_id = $2 AND deleted=false"
const selectInviteEventsInRangeSQL = "" +
"SELECT room_id, headered_event_json, deleted FROM syncapi_invite_events" +
"SELECT id, room_id, headered_event_json, deleted FROM syncapi_invite_events" +
" WHERE target_user_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id DESC"
@ -132,23 +132,28 @@ func (s *inviteEventsStatements) DeleteInviteEvent(
// active invites for the target user ID in the supplied range.
func (s *inviteEventsStatements) SelectInviteEventsInRange(
ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range,
) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error) {
) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) {
var lastPos types.StreamPosition
stmt := sqlutil.TxStmt(txn, s.selectInviteEventsInRangeStmt)
rows, err := stmt.QueryContext(ctx, targetUserID, r.Low(), r.High())
if err != nil {
return nil, nil, err
return nil, nil, lastPos, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectInviteEventsInRange: rows.close() failed")
result := map[string]*gomatrixserverlib.HeaderedEvent{}
retired := map[string]*gomatrixserverlib.HeaderedEvent{}
for rows.Next() {
var (
id types.StreamPosition
roomID string
eventJSON []byte
deleted bool
)
if err = rows.Scan(&roomID, &eventJSON, &deleted); err != nil {
return nil, nil, err
if err = rows.Scan(&id, &roomID, &eventJSON, &deleted); err != nil {
return nil, nil, lastPos, err
}
if id > lastPos {
lastPos = id
}
// if we have seen this room before, it has a higher stream position and hence takes priority
@ -161,15 +166,19 @@ func (s *inviteEventsStatements) SelectInviteEventsInRange(
var event *gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(eventJSON, &event); err != nil {
return nil, nil, err
return nil, nil, lastPos, err
}
if deleted {
retired[roomID] = event
} else {
result[roomID] = event
}
}
return result, retired, nil
if lastPos == 0 {
lastPos = r.To
}
return result, retired, lastPos, nil
}
func (s *inviteEventsStatements) SelectMaxInviteID(

View file

@ -37,7 +37,7 @@ type Invites interface {
DeleteInviteEvent(ctx context.Context, txn *sql.Tx, inviteEventID string) (types.StreamPosition, error)
// SelectInviteEventsInRange returns a map of room ID to invite events. If multiple invite/retired invites exist in the given range, return the latest value
// for the room.
SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (invites map[string]*gomatrixserverlib.HeaderedEvent, retired map[string]*gomatrixserverlib.HeaderedEvent, err error)
SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (invites map[string]*gomatrixserverlib.HeaderedEvent, retired map[string]*gomatrixserverlib.HeaderedEvent, maxID types.StreamPosition, err error)
SelectMaxInviteID(ctx context.Context, txn *sql.Tx) (id int64, err error)
}

View file

@ -51,7 +51,7 @@ func (p *InviteStreamProvider) IncrementalSync(
To: to,
}
invites, retiredInvites, err := snapshot.InviteEventsInRange(
invites, retiredInvites, maxID, err := snapshot.InviteEventsInRange(
ctx, req.Device.UserID, r,
)
if err != nil {
@ -91,5 +91,5 @@ func (p *InviteStreamProvider) IncrementalSync(
}
}
return to
return maxID
}