Revert "Try to de-race stream positions"

This reverts commit 1506d595de.
This commit is contained in:
Neil Alexander 2022-09-30 10:21:03 +01:00
parent 1506d595de
commit 3aa0071676
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
18 changed files with 56 additions and 121 deletions

View file

@ -45,8 +45,14 @@ func GetFilter(
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
snapshot, err := syncDB.NewDatabaseSnapshot(req.Context())
if err != nil {
return jsonerror.InternalServerError()
}
defer snapshot.Rollback() // nolint:errcheck
filter := gomatrixserverlib.DefaultFilter() filter := gomatrixserverlib.DefaultFilter()
if err := syncDB.GetFilter(req.Context(), &filter, localpart, filterID); err != nil { if err := snapshot.GetFilter(req.Context(), &filter, localpart, filterID); err != nil {
//TODO better error handling. This error message is *probably* right, //TODO better error handling. This error message is *probably* right,
// but if there are obscure db errors, this will also be returned, // but if there are obscure db errors, this will also be returned,
// even though it is not correct. // even though it is not correct.

View file

@ -90,6 +90,10 @@ type DatabaseSnapshot interface {
// SendToDeviceUpdatesForSync returns a list of send-to-device updates. It returns the // SendToDeviceUpdatesForSync returns a list of send-to-device updates. It returns the
// relevant events within the given ranges for the supplied user ID and device ID. // relevant events within the given ranges for the supplied user ID and device ID.
SendToDeviceUpdatesForSync(ctx context.Context, userID, deviceID string, from, to types.StreamPosition) (pos types.StreamPosition, events []types.SendToDeviceEvent, err error) SendToDeviceUpdatesForSync(ctx context.Context, userID, deviceID string, from, to types.StreamPosition) (pos types.StreamPosition, events []types.SendToDeviceEvent, err error)
// GetFilter looks up the filter associated with a given local user and filter ID
// and populates the target filter. Otherwise returns an error if no such filter exists
// or if there was an error talking to the database.
GetFilter(ctx context.Context, target *gomatrixserverlib.Filter, localpart string, filterID string) error
// GetRoomReceipts gets all receipts for a given roomID // GetRoomReceipts gets all receipts for a given roomID
GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]types.OutputReceiptEvent, error) GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]types.OutputReceiptEvent, error)
SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
@ -114,10 +118,6 @@ type Database interface {
NewDatabaseSnapshot(ctx context.Context) (*shared.DatabaseSnapshot, error) NewDatabaseSnapshot(ctx context.Context) (*shared.DatabaseSnapshot, error)
NewDatabaseWritable(ctx context.Context) (*shared.DatabaseSnapshot, error) NewDatabaseWritable(ctx context.Context) (*shared.DatabaseSnapshot, error)
// GetFilter looks up the filter associated with a given local user and filter ID
// and populates the target filter. Otherwise returns an error if no such filter exists
// or if there was an error talking to the database.
GetFilter(ctx context.Context, target *gomatrixserverlib.Filter, localpart string, filterID string) error
// Events lookups a list of event by their event ID. // Events lookups a list of event by their event ID.
// Returns a list of events matching the requested IDs found in the database. // Returns a list of events matching the requested IDs found in the database.
// If an event is not found in the database then it will be omitted from the list. // If an event is not found in the database then it will be omitted from the list.

View file

@ -253,6 +253,12 @@ func (d *DatabaseSnapshot) StreamToTopologicalPosition(
} }
} }
func (d *DatabaseSnapshot) GetFilter(
ctx context.Context, target *gomatrixserverlib.Filter, localpart string, filterID string,
) error {
return d.Filter.SelectFilter(ctx, d.txn, target, localpart, filterID)
}
// GetBackwardTopologyPos retrieves the backward topology position, i.e. the position of the // GetBackwardTopologyPos retrieves the backward topology position, i.e. the position of the
// oldest event in the room's topology. // oldest event in the room's topology.
func (d *DatabaseSnapshot) GetBackwardTopologyPos( func (d *DatabaseSnapshot) GetBackwardTopologyPos(

View file

@ -85,12 +85,6 @@ func (d *Database) NewDatabaseWritable(ctx context.Context) (*DatabaseSnapshot,
}, nil }, nil
} }
func (d *Database) GetFilter(
ctx context.Context, target *gomatrixserverlib.Filter, localpart string, filterID string,
) error {
return d.Filter.SelectFilter(ctx, nil, target, localpart, filterID)
}
func (d *Database) Events(ctx context.Context, eventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) { func (d *Database) Events(ctx context.Context, eventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) {
streamEvents, err := d.OutputEvents.SelectEvents(ctx, nil, eventIDs, nil, false) streamEvents, err := d.OutputEvents.SelectEvents(ctx, nil, eventIDs, nil, false)
if err != nil { if err != nil {

View file

@ -23,17 +23,11 @@ func (p *AccountDataStreamProvider) Setup(
p.latestMutex.Lock() p.latestMutex.Lock()
defer p.latestMutex.Unlock() defer p.latestMutex.Unlock()
p.latest = p.latestPosition(ctx, snapshot)
}
func (p *AccountDataStreamProvider) latestPosition(
ctx context.Context, snapshot storage.DatabaseSnapshot,
) types.StreamPosition {
id, err := snapshot.MaxStreamPositionForAccountData(context.Background()) id, err := snapshot.MaxStreamPositionForAccountData(context.Background())
if err != nil { if err != nil {
panic(err) panic(err)
} }
return id p.latest = id
} }
func (p *AccountDataStreamProvider) CompleteSync( func (p *AccountDataStreamProvider) CompleteSync(
@ -41,7 +35,7 @@ func (p *AccountDataStreamProvider) CompleteSync(
snapshot storage.DatabaseSnapshot, snapshot storage.DatabaseSnapshot,
req *types.SyncRequest, req *types.SyncRequest,
) types.StreamPosition { ) types.StreamPosition {
return p.IncrementalSync(ctx, snapshot, req, 0, p.latestPosition(ctx, snapshot)) return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx))
} }
func (p *AccountDataStreamProvider) IncrementalSync( func (p *AccountDataStreamProvider) IncrementalSync(

View file

@ -16,18 +16,12 @@ type DeviceListStreamProvider struct {
keyAPI keyapi.SyncKeyAPI keyAPI keyapi.SyncKeyAPI
} }
func (p *DeviceListStreamProvider) latestPosition(
ctx context.Context, snapshot storage.DatabaseSnapshot,
) types.StreamPosition {
return 0 // TODO: is this the right thing to do?
}
func (p *DeviceListStreamProvider) CompleteSync( func (p *DeviceListStreamProvider) CompleteSync(
ctx context.Context, ctx context.Context,
snapshot storage.DatabaseSnapshot, snapshot storage.DatabaseSnapshot,
req *types.SyncRequest, req *types.SyncRequest,
) types.StreamPosition { ) types.StreamPosition {
return p.latestPosition(ctx, snapshot) return p.LatestPosition(ctx)
} }
func (p *DeviceListStreamProvider) IncrementalSync( func (p *DeviceListStreamProvider) IncrementalSync(

View file

@ -25,17 +25,11 @@ func (p *InviteStreamProvider) Setup(
p.latestMutex.Lock() p.latestMutex.Lock()
defer p.latestMutex.Unlock() defer p.latestMutex.Unlock()
p.latest = p.latestPosition(ctx, snapshot) id, err := snapshot.MaxStreamPositionForInvites(context.Background())
}
func (p *InviteStreamProvider) latestPosition(
ctx context.Context, snapshot storage.DatabaseSnapshot,
) types.StreamPosition {
id, err := snapshot.MaxStreamPositionForAccountData(context.Background())
if err != nil { if err != nil {
panic(err) panic(err)
} }
return id p.latest = id
} }
func (p *InviteStreamProvider) CompleteSync( func (p *InviteStreamProvider) CompleteSync(
@ -43,7 +37,7 @@ func (p *InviteStreamProvider) CompleteSync(
snapshot storage.DatabaseSnapshot, snapshot storage.DatabaseSnapshot,
req *types.SyncRequest, req *types.SyncRequest,
) types.StreamPosition { ) types.StreamPosition {
return p.IncrementalSync(ctx, snapshot, req, 0, p.latestPosition(ctx, snapshot)) return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx))
} }
func (p *InviteStreamProvider) IncrementalSync( func (p *InviteStreamProvider) IncrementalSync(

View file

@ -16,20 +16,11 @@ func (p *NotificationDataStreamProvider) Setup(
) { ) {
p.DefaultStreamProvider.Setup(ctx, snapshot) p.DefaultStreamProvider.Setup(ctx, snapshot)
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
p.latest = p.latestPosition(ctx, snapshot)
}
func (p *NotificationDataStreamProvider) latestPosition(
ctx context.Context, snapshot storage.DatabaseSnapshot,
) types.StreamPosition {
id, err := snapshot.MaxStreamPositionForNotificationData(context.Background()) id, err := snapshot.MaxStreamPositionForNotificationData(context.Background())
if err != nil { if err != nil {
panic(err) panic(err)
} }
return id p.latest = id
} }
func (p *NotificationDataStreamProvider) CompleteSync( func (p *NotificationDataStreamProvider) CompleteSync(
@ -37,7 +28,7 @@ func (p *NotificationDataStreamProvider) CompleteSync(
snapshot storage.DatabaseSnapshot, snapshot storage.DatabaseSnapshot,
req *types.SyncRequest, req *types.SyncRequest,
) types.StreamPosition { ) types.StreamPosition {
return p.IncrementalSync(ctx, snapshot, req, 0, p.latestPosition(ctx, snapshot)) return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx))
} }
func (p *NotificationDataStreamProvider) IncrementalSync( func (p *NotificationDataStreamProvider) IncrementalSync(
@ -68,5 +59,5 @@ func (p *NotificationDataStreamProvider) IncrementalSync(
req.Response.Rooms.Join[roomID] = jr req.Response.Rooms.Join[roomID] = jr
} }
return p.latestPosition(ctx, snapshot) return p.LatestPosition(ctx)
} }

View file

@ -47,17 +47,11 @@ func (p *PDUStreamProvider) Setup(
p.latestMutex.Lock() p.latestMutex.Lock()
defer p.latestMutex.Unlock() defer p.latestMutex.Unlock()
p.latest = p.latestPosition(ctx, snapshot)
}
func (p *PDUStreamProvider) latestPosition(
ctx context.Context, snapshot storage.DatabaseSnapshot,
) types.StreamPosition {
id, err := snapshot.MaxStreamPositionForPDUs(context.Background()) id, err := snapshot.MaxStreamPositionForPDUs(context.Background())
if err != nil { if err != nil {
panic(err) panic(err)
} }
return id p.latest = id
} }
func (p *PDUStreamProvider) CompleteSync( func (p *PDUStreamProvider) CompleteSync(
@ -66,7 +60,7 @@ func (p *PDUStreamProvider) CompleteSync(
req *types.SyncRequest, req *types.SyncRequest,
) types.StreamPosition { ) types.StreamPosition {
from := types.StreamPosition(0) from := types.StreamPosition(0)
to := p.latestPosition(ctx, snapshot) to := p.LatestPosition(ctx)
// Get the current sync position which we will base the sync response on. // Get the current sync position which we will base the sync response on.
// For complete syncs, we want to start at the most recent events and work // For complete syncs, we want to start at the most recent events and work

View file

@ -39,20 +39,11 @@ func (p *PresenceStreamProvider) Setup(
) { ) {
p.DefaultStreamProvider.Setup(ctx, snapshot) p.DefaultStreamProvider.Setup(ctx, snapshot)
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
p.latest = p.latestPosition(ctx, snapshot)
}
func (p *PresenceStreamProvider) latestPosition(
ctx context.Context, snapshot storage.DatabaseSnapshot,
) types.StreamPosition {
id, err := snapshot.MaxStreamPositionForPresence(context.Background()) id, err := snapshot.MaxStreamPositionForPresence(context.Background())
if err != nil { if err != nil {
panic(err) panic(err)
} }
return id p.latest = id
} }
func (p *PresenceStreamProvider) CompleteSync( func (p *PresenceStreamProvider) CompleteSync(
@ -60,7 +51,7 @@ func (p *PresenceStreamProvider) CompleteSync(
snapshot storage.DatabaseSnapshot, snapshot storage.DatabaseSnapshot,
req *types.SyncRequest, req *types.SyncRequest,
) types.StreamPosition { ) types.StreamPosition {
return p.IncrementalSync(ctx, snapshot, req, 0, p.latestPosition(ctx, snapshot)) return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx))
} }
func (p *PresenceStreamProvider) IncrementalSync( func (p *PresenceStreamProvider) IncrementalSync(

View file

@ -18,20 +18,11 @@ func (p *ReceiptStreamProvider) Setup(
) { ) {
p.DefaultStreamProvider.Setup(ctx, snapshot) p.DefaultStreamProvider.Setup(ctx, snapshot)
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
p.latest = p.latestPosition(ctx, snapshot)
}
func (p *ReceiptStreamProvider) latestPosition(
ctx context.Context, snapshot storage.DatabaseSnapshot,
) types.StreamPosition {
id, err := snapshot.MaxStreamPositionForReceipts(context.Background()) id, err := snapshot.MaxStreamPositionForReceipts(context.Background())
if err != nil { if err != nil {
panic(err) panic(err)
} }
return id p.latest = id
} }
func (p *ReceiptStreamProvider) CompleteSync( func (p *ReceiptStreamProvider) CompleteSync(
@ -39,7 +30,7 @@ func (p *ReceiptStreamProvider) CompleteSync(
snapshot storage.DatabaseSnapshot, snapshot storage.DatabaseSnapshot,
req *types.SyncRequest, req *types.SyncRequest,
) types.StreamPosition { ) types.StreamPosition {
return p.IncrementalSync(ctx, snapshot, req, 0, p.latestPosition(ctx, snapshot)) return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx))
} }
func (p *ReceiptStreamProvider) IncrementalSync( func (p *ReceiptStreamProvider) IncrementalSync(

View file

@ -16,20 +16,11 @@ func (p *SendToDeviceStreamProvider) Setup(
) { ) {
p.DefaultStreamProvider.Setup(ctx, snapshot) p.DefaultStreamProvider.Setup(ctx, snapshot)
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
p.latest = p.latestPosition(ctx, snapshot)
}
func (p *SendToDeviceStreamProvider) latestPosition(
ctx context.Context, snapshot storage.DatabaseSnapshot,
) types.StreamPosition {
id, err := snapshot.MaxStreamPositionForSendToDeviceMessages(context.Background()) id, err := snapshot.MaxStreamPositionForSendToDeviceMessages(context.Background())
if err != nil { if err != nil {
panic(err) panic(err)
} }
return id p.latest = id
} }
func (p *SendToDeviceStreamProvider) CompleteSync( func (p *SendToDeviceStreamProvider) CompleteSync(
@ -37,7 +28,7 @@ func (p *SendToDeviceStreamProvider) CompleteSync(
snapshot storage.DatabaseSnapshot, snapshot storage.DatabaseSnapshot,
req *types.SyncRequest, req *types.SyncRequest,
) types.StreamPosition { ) types.StreamPosition {
return p.IncrementalSync(ctx, snapshot, req, 0, p.latestPosition(ctx, snapshot)) return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx))
} }
func (p *SendToDeviceStreamProvider) IncrementalSync( func (p *SendToDeviceStreamProvider) IncrementalSync(

View file

@ -15,18 +15,12 @@ type TypingStreamProvider struct {
EDUCache *caching.EDUCache EDUCache *caching.EDUCache
} }
func (p *TypingStreamProvider) latestPosition(
ctx context.Context, snapshot storage.DatabaseSnapshot,
) types.StreamPosition {
return types.StreamPosition(p.EDUCache.GetLatestSyncPosition())
}
func (p *TypingStreamProvider) CompleteSync( func (p *TypingStreamProvider) CompleteSync(
ctx context.Context, ctx context.Context,
snapshot storage.DatabaseSnapshot, snapshot storage.DatabaseSnapshot,
req *types.SyncRequest, req *types.SyncRequest,
) types.StreamPosition { ) types.StreamPosition {
return p.IncrementalSync(ctx, snapshot, req, 0, p.latestPosition(ctx, snapshot)) return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx))
} }
func (p *TypingStreamProvider) IncrementalSync( func (p *TypingStreamProvider) IncrementalSync(

View file

@ -24,8 +24,5 @@ type StreamProvider interface {
IncrementalSync(ctx context.Context, snapshot storage.DatabaseSnapshot, req *types.SyncRequest, from, to types.StreamPosition) types.StreamPosition IncrementalSync(ctx context.Context, snapshot storage.DatabaseSnapshot, req *types.SyncRequest, from, to types.StreamPosition) types.StreamPosition
// LatestPosition returns the latest stream position for this stream. // LatestPosition returns the latest stream position for this stream.
LatestPosition(ctx context.Context, snapshot storage.DatabaseSnapshot) types.StreamPosition LatestPosition(ctx context.Context) types.StreamPosition
// latestPosition gets the latest stream position from the database for this stream.
latestPosition(ctx context.Context, snapshot storage.DatabaseSnapshot) types.StreamPosition
} }

View file

@ -87,16 +87,16 @@ func NewSyncStreamProviders(
return streams return streams
} }
func (s *Streams) Latest(ctx context.Context, snapshot storage.DatabaseSnapshot) types.StreamingToken { func (s *Streams) Latest(ctx context.Context) types.StreamingToken {
return types.StreamingToken{ return types.StreamingToken{
PDUPosition: s.PDUStreamProvider.LatestPosition(ctx, snapshot), PDUPosition: s.PDUStreamProvider.LatestPosition(ctx),
TypingPosition: s.TypingStreamProvider.LatestPosition(ctx, snapshot), TypingPosition: s.TypingStreamProvider.LatestPosition(ctx),
ReceiptPosition: s.ReceiptStreamProvider.LatestPosition(ctx, snapshot), ReceiptPosition: s.ReceiptStreamProvider.LatestPosition(ctx),
InvitePosition: s.InviteStreamProvider.LatestPosition(ctx, snapshot), InvitePosition: s.InviteStreamProvider.LatestPosition(ctx),
SendToDevicePosition: s.SendToDeviceStreamProvider.LatestPosition(ctx, snapshot), SendToDevicePosition: s.SendToDeviceStreamProvider.LatestPosition(ctx),
AccountDataPosition: s.AccountDataStreamProvider.LatestPosition(ctx, snapshot), AccountDataPosition: s.AccountDataStreamProvider.LatestPosition(ctx),
NotificationDataPosition: s.NotificationDataStreamProvider.LatestPosition(ctx, snapshot), NotificationDataPosition: s.NotificationDataStreamProvider.LatestPosition(ctx),
DeviceListPosition: s.DeviceListStreamProvider.LatestPosition(ctx, snapshot), DeviceListPosition: s.DeviceListStreamProvider.LatestPosition(ctx),
PresencePosition: s.PresenceStreamProvider.LatestPosition(ctx, snapshot), PresencePosition: s.PresenceStreamProvider.LatestPosition(ctx),
} }
} }

View file

@ -31,7 +31,7 @@ func (p *DefaultStreamProvider) Advance(
} }
func (p *DefaultStreamProvider) LatestPosition( func (p *DefaultStreamProvider) LatestPosition(
ctx context.Context, snapshot storage.DatabaseSnapshot, ctx context.Context,
) types.StreamPosition { ) types.StreamPosition {
p.latestMutex.RLock() p.latestMutex.RLock()
defer p.latestMutex.RUnlock() defer p.latestMutex.RUnlock()

View file

@ -48,6 +48,13 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
} }
} }
snapshot, err := syncDB.NewDatabaseSnapshot(req.Context())
if err != nil {
logrus.WithError(err).Error("Failed to acquire database snapshot for sync request")
return nil, err
}
defer snapshot.Rollback() // nolint:errcheck
// Create a default filter and apply a stored filter on top of it (if specified) // Create a default filter and apply a stored filter on top of it (if specified)
filter := gomatrixserverlib.DefaultFilter() filter := gomatrixserverlib.DefaultFilter()
filterQuery := req.URL.Query().Get("filter") filterQuery := req.URL.Query().Get("filter")
@ -64,7 +71,7 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
util.GetLogger(req.Context()).WithError(err).Error("gomatrixserverlib.SplitID failed") util.GetLogger(req.Context()).WithError(err).Error("gomatrixserverlib.SplitID failed")
return nil, fmt.Errorf("gomatrixserverlib.SplitID: %w", err) return nil, fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
} }
if err := syncDB.GetFilter(req.Context(), &filter, localpart, filterQuery); err != nil && err != sql.ErrNoRows { if err := snapshot.GetFilter(req.Context(), &filter, localpart, filterQuery); err != nil && err != sql.ErrNoRows {
util.GetLogger(req.Context()).WithError(err).Error("syncDB.GetFilter failed") util.GetLogger(req.Context()).WithError(err).Error("syncDB.GetFilter failed")
return nil, fmt.Errorf("syncDB.GetFilter: %w", err) return nil, fmt.Errorf("syncDB.GetFilter: %w", err)
} }

View file

@ -56,16 +56,7 @@ func AddPublicRoutes(
eduCache := caching.NewTypingCache() eduCache := caching.NewTypingCache()
notifier := notifier.NewNotifier() notifier := notifier.NewNotifier()
streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, base.Caches, notifier) streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, base.Caches, notifier)
notifier.SetCurrentPosition(streams.Latest(context.Background()))
snapshot, err := syncDB.NewDatabaseSnapshot(base.ProcessContext.Context())
if err != nil {
logrus.WithError(err).Fatalf("Failed to acquire database snapshot for sync startup")
}
notifier.SetCurrentPosition(streams.Latest(context.Background(), snapshot))
if err = snapshot.Rollback(); err != nil {
logrus.WithError(err).Fatalf("Failed to roll back snapshot for sync startup")
}
if err = notifier.Load(context.Background(), syncDB); err != nil { if err = notifier.Load(context.Background(), syncDB); err != nil {
logrus.WithError(err).Panicf("failed to load notifier ") logrus.WithError(err).Panicf("failed to load notifier ")
} }