Separate SQLite stream IDs

This commit is contained in:
Neil Alexander 2021-01-19 15:06:48 +00:00
parent f13e546e49
commit f8aea35292
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
5 changed files with 36 additions and 12 deletions

View file

@ -82,7 +82,7 @@ func (s *accountDataStatements) InsertAccountData(
ctx context.Context, txn *sql.Tx,
userID, roomID, dataType string,
) (pos types.StreamPosition, err error) {
pos, err = s.streamIDStatements.nextStreamID(ctx, txn)
pos, err = s.streamIDStatements.nextAccountDataID(ctx, txn)
if err != nil {
return
}

View file

@ -93,7 +93,7 @@ func NewSqliteInvitesTable(db *sql.DB, streamID *streamIDStatements) (tables.Inv
func (s *inviteEventsStatements) InsertInviteEvent(
ctx context.Context, txn *sql.Tx, inviteEvent *gomatrixserverlib.HeaderedEvent,
) (streamPos types.StreamPosition, err error) {
streamPos, err = s.streamIDStatements.nextStreamID(ctx, txn)
streamPos, err = s.streamIDStatements.nextInviteID(ctx, txn)
if err != nil {
return
}
@ -119,7 +119,7 @@ func (s *inviteEventsStatements) InsertInviteEvent(
func (s *inviteEventsStatements) DeleteInviteEvent(
ctx context.Context, txn *sql.Tx, inviteEventID string,
) (types.StreamPosition, error) {
streamPos, err := s.streamIDStatements.nextStreamID(ctx, txn)
streamPos, err := s.streamIDStatements.nextInviteID(ctx, txn)
if err != nil {
return streamPos, err
}

View file

@ -198,16 +198,16 @@ func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event
// two positions, only the most recent state is returned.
func (s *outputRoomEventsStatements) SelectStateInRange(
ctx context.Context, txn *sql.Tx, r types.Range,
stateFilterPart *gomatrixserverlib.StateFilter,
stateFilter *gomatrixserverlib.StateFilter,
) (map[string]map[string]bool, map[string]types.StreamEvent, error) {
stmt, params, err := s.prepareWithFilters(
selectStateInRangeSQL,
[]interface{}{
r.Low(), r.High(),
},
stateFilterPart.Senders, stateFilterPart.NotSenders,
stateFilterPart.Types, stateFilterPart.NotTypes,
stateFilterPart.Limit, "ASC",
stateFilter.Senders, stateFilter.NotSenders,
stateFilter.Types, stateFilter.NotTypes,
stateFilter.Limit, "ASC",
)
if err != nil {
return nil, nil, fmt.Errorf("s.prepareWithFilters: %w", err)
@ -337,7 +337,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
return 0, err
}
streamPos, err := s.streamIDStatements.nextStreamID(ctx, txn)
streamPos, err := s.streamIDStatements.nextPDUID(ctx, txn)
if err != nil {
return 0, err
}

View file

@ -108,7 +108,7 @@ func NewSqlitePeeksTable(db *sql.DB, streamID *streamIDStatements) (tables.Peeks
func (s *peekStatements) InsertPeek(
ctx context.Context, txn *sql.Tx, roomID, userID, deviceID string,
) (streamPos types.StreamPosition, err error) {
streamPos, err = s.streamIDStatements.nextStreamID(ctx, txn)
streamPos, err = s.streamIDStatements.nextPDUID(ctx, txn)
if err != nil {
return
}
@ -120,7 +120,7 @@ func (s *peekStatements) InsertPeek(
func (s *peekStatements) DeletePeek(
ctx context.Context, txn *sql.Tx, roomID, userID, deviceID string,
) (streamPos types.StreamPosition, err error) {
streamPos, err = s.streamIDStatements.nextStreamID(ctx, txn)
streamPos, err = s.streamIDStatements.nextPDUID(ctx, txn)
if err != nil {
return
}
@ -131,7 +131,7 @@ func (s *peekStatements) DeletePeek(
func (s *peekStatements) DeletePeeks(
ctx context.Context, txn *sql.Tx, roomID, userID string,
) (types.StreamPosition, error) {
streamPos, err := s.streamIDStatements.nextStreamID(ctx, txn)
streamPos, err := s.streamIDStatements.nextPDUID(ctx, txn)
if err != nil {
return 0, err
}

View file

@ -20,6 +20,10 @@ INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("global", 0)
ON CONFLICT DO NOTHING;
INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("receipt", 0)
ON CONFLICT DO NOTHING;
INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("accountdata", 0)
ON CONFLICT DO NOTHING;
INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("invite", 0)
ON CONFLICT DO NOTHING;
`
const increaseStreamIDStmt = "" +
@ -49,7 +53,7 @@ func (s *streamIDStatements) prepare(db *sql.DB) (err error) {
return
}
func (s *streamIDStatements) nextStreamID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
func (s *streamIDStatements) nextPDUID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
increaseStmt := sqlutil.TxStmt(txn, s.increaseStreamIDStmt)
selectStmt := sqlutil.TxStmt(txn, s.selectStreamIDStmt)
if _, err = increaseStmt.ExecContext(ctx, "global"); err != nil {
@ -68,3 +72,23 @@ func (s *streamIDStatements) nextReceiptID(ctx context.Context, txn *sql.Tx) (po
err = selectStmt.QueryRowContext(ctx, "receipt").Scan(&pos)
return
}
func (s *streamIDStatements) nextInviteID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
increaseStmt := sqlutil.TxStmt(txn, s.increaseStreamIDStmt)
selectStmt := sqlutil.TxStmt(txn, s.selectStreamIDStmt)
if _, err = increaseStmt.ExecContext(ctx, "invite"); err != nil {
return
}
err = selectStmt.QueryRowContext(ctx, "invite").Scan(&pos)
return
}
func (s *streamIDStatements) nextAccountDataID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
increaseStmt := sqlutil.TxStmt(txn, s.increaseStreamIDStmt)
selectStmt := sqlutil.TxStmt(txn, s.selectStreamIDStmt)
if _, err = increaseStmt.ExecContext(ctx, "accountdata"); err != nil {
return
}
err = selectStmt.QueryRowContext(ctx, "accountdata").Scan(&pos)
return
}