Review comments @Kegsay

Neil Alexander 2021-01-08 16:04:02 +00:00
parent e5d7a00d82
commit 0291be9e7f
14 changed files with 46 additions and 46 deletions

View file

@ -24,7 +24,6 @@ import (
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
log "github.com/sirupsen/logrus"
)
@ -33,7 +32,7 @@ import (
type OutputClientDataConsumer struct {
clientAPIConsumer *internal.ContinualConsumer
db storage.Database
streams *streams.Streams
stream types.StreamProvider
notifier *notifier.Notifier
}
@ -43,7 +42,7 @@ func NewOutputClientDataConsumer(
kafkaConsumer sarama.Consumer,
store storage.Database,
notifier *notifier.Notifier,
streams *streams.Streams,
stream types.StreamProvider,
) *OutputClientDataConsumer {
consumer := internal.ContinualConsumer{
@ -56,7 +55,7 @@ func NewOutputClientDataConsumer(
clientAPIConsumer: &consumer,
db: store,
notifier: notifier,
streams: streams,
stream: stream,
}
consumer.ProcessMessage = s.onMessage
@ -96,7 +95,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
}).Panicf("could not save account data")
}
s.streams.AccountDataStreamProvider.Advance(streamPos)
s.stream.Advance(streamPos)
s.notifier.OnNewAccountData(string(msg.Key), types.StreamingToken{AccountDataPosition: streamPos})
return nil
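The consumers in this commit stop holding the whole streams.Streams struct and instead take only the single stream provider they advance. Based on the methods visible in this diff (Advance here, plus Setup and LatestPosition in the provider files further down), the types.StreamProvider interface is presumably along these lines — a sketch, not the exact Dendrite definition:

package types

import "context"

// StreamPosition is an integer offset into a stream, as used throughout the
// syncapi types package.
type StreamPosition int64

// StreamProvider is a sketch of the interface the consumers above now hold.
// Only Advance is visible in the consumer diffs; Setup and LatestPosition
// appear in the stream provider files later in this commit. Any further
// methods on the real interface are not shown here.
type StreamProvider interface {
	Setup()
	Advance(latest StreamPosition)
	LatestPosition(ctx context.Context) StreamPosition
}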

View file

@ -24,7 +24,6 @@ import (
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
log "github.com/sirupsen/logrus"
)
@ -33,7 +32,7 @@ import (
type OutputReceiptEventConsumer struct {
receiptConsumer *internal.ContinualConsumer
db storage.Database
streams *streams.Streams
stream types.StreamProvider
notifier *notifier.Notifier
}
@ -44,7 +43,7 @@ func NewOutputReceiptEventConsumer(
kafkaConsumer sarama.Consumer,
store storage.Database,
notifier *notifier.Notifier,
streams *streams.Streams,
stream types.StreamProvider,
) *OutputReceiptEventConsumer {
consumer := internal.ContinualConsumer{
@ -58,7 +57,7 @@ func NewOutputReceiptEventConsumer(
receiptConsumer: &consumer,
db: store,
notifier: notifier,
streams: streams,
stream: stream,
}
consumer.ProcessMessage = s.onMessage
@ -91,7 +90,7 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro
return err
}
s.streams.ReceiptStreamProvider.Advance(streamPos)
s.stream.Advance(streamPos)
s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos})
return nil

View file

@ -24,7 +24,6 @@ import (
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@ -36,7 +35,7 @@ type OutputSendToDeviceEventConsumer struct {
sendToDeviceConsumer *internal.ContinualConsumer
db storage.Database
serverName gomatrixserverlib.ServerName // our server name
streams *streams.Streams
stream types.StreamProvider
notifier *notifier.Notifier
}
@ -47,7 +46,7 @@ func NewOutputSendToDeviceEventConsumer(
kafkaConsumer sarama.Consumer,
store storage.Database,
notifier *notifier.Notifier,
streams *streams.Streams,
stream types.StreamProvider,
) *OutputSendToDeviceEventConsumer {
consumer := internal.ContinualConsumer{
@ -62,7 +61,7 @@ func NewOutputSendToDeviceEventConsumer(
db: store,
serverName: cfg.Matrix.ServerName,
notifier: notifier,
streams: streams,
stream: stream,
}
consumer.ProcessMessage = s.onMessage
@ -106,7 +105,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
return err
}
s.streams.SendToDeviceStreamProvider.Advance(streamPos)
s.stream.Advance(streamPos)
s.notifier.OnNewSendToDevice(
output.UserID,
[]string{output.DeviceID},

View file

@ -24,7 +24,6 @@ import (
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
log "github.com/sirupsen/logrus"
)
@ -33,7 +32,7 @@ import (
type OutputTypingEventConsumer struct {
typingConsumer *internal.ContinualConsumer
eduCache *cache.EDUCache
streams *streams.Streams
stream types.StreamProvider
notifier *notifier.Notifier
}
@ -45,7 +44,7 @@ func NewOutputTypingEventConsumer(
store storage.Database,
eduCache *cache.EDUCache,
notifier *notifier.Notifier,
streams *streams.Streams,
stream types.StreamProvider,
) *OutputTypingEventConsumer {
consumer := internal.ContinualConsumer{
@ -59,7 +58,7 @@ func NewOutputTypingEventConsumer(
typingConsumer: &consumer,
eduCache: eduCache,
notifier: notifier,
streams: streams,
stream: stream,
}
consumer.ProcessMessage = s.onMessage
@ -102,7 +101,7 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
)
}
s.streams.TypingStreamProvider.Advance(typingPos)
s.stream.Advance(typingPos)
s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos})
return nil

View file

@ -25,7 +25,6 @@ import (
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
@ -36,7 +35,7 @@ type OutputKeyChangeEventConsumer struct {
keyChangeConsumer *internal.ContinualConsumer
db storage.Database
notifier *notifier.Notifier
streams *streams.Streams
stream types.PartitionedStreamProvider
serverName gomatrixserverlib.ServerName // our server name
rsAPI roomserverAPI.RoomserverInternalAPI
keyAPI api.KeyInternalAPI
@ -54,7 +53,7 @@ func NewOutputKeyChangeEventConsumer(
rsAPI roomserverAPI.RoomserverInternalAPI,
store storage.Database,
notifier *notifier.Notifier,
streams *streams.Streams,
stream types.PartitionedStreamProvider,
) *OutputKeyChangeEventConsumer {
consumer := internal.ContinualConsumer{
@ -73,7 +72,7 @@ func NewOutputKeyChangeEventConsumer(
partitionToOffset: make(map[int32]int64),
partitionToOffsetMu: sync.Mutex{},
notifier: notifier,
streams: streams,
stream: stream,
}
consumer.ProcessMessage = s.onMessage
@ -123,7 +122,7 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
Partition: msg.Partition,
}
s.streams.DeviceListStreamProvider.Advance(posUpdate)
s.stream.Advance(posUpdate)
for userID := range queryRes.UserIDsToCount {
s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID)
}
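The key change consumer differs from the others: it advances a types.PartitionedStreamProvider, whose position carries a Kafka partition (the diff shows Partition: msg.Partition) rather than a single integer. A sketch of what that is assumed to look like — field and method names beyond what the diff shows are guesses:

package types

import "context"

// LogPosition is assumed to describe a position in a partitioned Kafka log;
// the diff above only shows the Partition field being populated.
type LogPosition struct {
	Partition int32
	Offset    int64
}

// PartitionedStreamProvider mirrors StreamProvider but works in terms of
// LogPosition instead of StreamPosition.
type PartitionedStreamProvider interface {
	Setup()
	Advance(latest LogPosition)
	LatestPosition(ctx context.Context) LogPosition
}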

View file

@ -301,7 +301,6 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
}
// Notify any active sync requests that the invite has been retired.
// Invites share the same stream counter as PDUs
s.streams.InviteStreamProvider.Advance(pduPos)
s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID)
@ -324,7 +323,7 @@ func (s *OutputRoomEventConsumer) onNewPeek(
// TODO: This only works because the peeks table is reusing the same
// index as PDUs, but we should fix this
s.streams.PDUStreamProvider.Advance(sp)
s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp})
s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp})
return nil
}
@ -345,7 +344,7 @@ func (s *OutputRoomEventConsumer) onRetirePeek(
// TODO: This only works because the peeks table is reusing the same
// index as PDUs, but we should fix this
s.streams.PDUStreamProvider.Advance(sp)
s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp})
s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp})
return nil
}

View file

@ -136,10 +136,12 @@ func (n *Notifier) OnNewAccountData(
func (n *Notifier) OnNewPeek(
roomID, userID, deviceID string,
posUpdate types.StreamingToken,
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
n.currPos.ApplyUpdates(posUpdate)
n.addPeekingDevice(roomID, userID, deviceID)
// we don't wake up devices here given the roomserver consumer will do this shortly afterwards
@ -148,10 +150,12 @@ func (n *Notifier) OnNewPeek(
func (n *Notifier) OnRetirePeek(
roomID, userID, deviceID string,
posUpdate types.StreamingToken,
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
n.currPos.ApplyUpdates(posUpdate)
n.removePeekingDevice(roomID, userID, deviceID)
// we don't wake up devices here given the roomserver consumer will do this shortly afterwards
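Both peek handlers now fold the supplied position into the notifier's current token via ApplyUpdates before updating the peeking devices. A minimal sketch of the assumed behaviour of that method on types.StreamingToken (trimmed to a few fields; not the exact Dendrite implementation): keep whichever position is newer for each stream.

package types

// StreamingToken is trimmed here to the fields this sketch needs; the real
// type also carries typing, send-to-device, invite and device list positions.
type StreamingToken struct {
	PDUPosition         StreamPosition
	ReceiptPosition     StreamPosition
	AccountDataPosition StreamPosition
}

// ApplyUpdates copies any newer position from other into t, so the notifier's
// current token always reflects the most recent position seen on each stream.
func (t *StreamingToken) ApplyUpdates(other StreamingToken) {
	if other.PDUPosition > t.PDUPosition {
		t.PDUPosition = other.PDUPosition
	}
	if other.ReceiptPosition > t.ReceiptPosition {
		t.ReceiptPosition = other.ReceiptPosition
	}
	if other.AccountDataPosition > t.AccountDataPosition {
		t.AccountDataPosition = other.AccountDataPosition
	}
	// ...and so on for the remaining stream positions.
}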

View file

@ -29,10 +29,10 @@ import (
type Database interface {
internal.PartitionStorer
MaxStreamTokenForPDUs(ctx context.Context) (types.StreamPosition, error)
MaxStreamTokenForReceipts(ctx context.Context) (types.StreamPosition, error)
MaxStreamTokenForInvites(ctx context.Context) (types.StreamPosition, error)
MaxStreamTokenForAccountData(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForAccountData(ctx context.Context) (types.StreamPosition, error)
CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)

View file

@ -61,7 +61,7 @@ func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) {
})
}
func (d *Database) MaxStreamTokenForPDUs(ctx context.Context) (types.StreamPosition, error) {
func (d *Database) MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error) {
id, err := d.OutputEvents.SelectMaxEventID(ctx, nil)
if err != nil {
return 0, fmt.Errorf("d.OutputEvents.SelectMaxEventID: %w", err)
@ -69,7 +69,7 @@ func (d *Database) MaxStreamTokenForPDUs(ctx context.Context) (types.StreamPosit
return types.StreamPosition(id), nil
}
func (d *Database) MaxStreamTokenForReceipts(ctx context.Context) (types.StreamPosition, error) {
func (d *Database) MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error) {
id, err := d.Receipts.SelectMaxReceiptID(ctx, nil)
if err != nil {
return 0, fmt.Errorf("d.Receipts.SelectMaxReceiptID: %w", err)
@ -77,7 +77,7 @@ func (d *Database) MaxStreamTokenForReceipts(ctx context.Context) (types.StreamP
return types.StreamPosition(id), nil
}
func (d *Database) MaxStreamTokenForInvites(ctx context.Context) (types.StreamPosition, error) {
func (d *Database) MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error) {
id, err := d.Invites.SelectMaxInviteID(ctx, nil)
if err != nil {
return 0, fmt.Errorf("d.Invites.SelectMaxInviteID: %w", err)
@ -85,7 +85,7 @@ func (d *Database) MaxStreamTokenForInvites(ctx context.Context) (types.StreamPo
return types.StreamPosition(id), nil
}
func (d *Database) MaxStreamTokenForAccountData(ctx context.Context) (types.StreamPosition, error) {
func (d *Database) MaxStreamPositionForAccountData(ctx context.Context) (types.StreamPosition, error) {
id, err := d.AccountData.SelectMaxAccountDataID(ctx, nil)
if err != nil {
return 0, fmt.Errorf("d.Invites.SelectMaxAccountDataID: %w", err)

View file

@ -19,7 +19,7 @@ func (p *AccountDataStreamProvider) Setup() {
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
id, err := p.DB.MaxStreamTokenForAccountData(context.Background())
id, err := p.DB.MaxStreamPositionForAccountData(context.Background())
if err != nil {
panic(err)
}
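The per-stream providers (account data above, invites, PDUs and receipts below) all follow the same shape: they embed a shared base that owns the latest position and its mutex, and their Setup seeds that position from the renamed MaxStreamPositionFor* database method. A sketch of the assumed base, pieced together from the p.latestMutex, p.StreamProvider.Setup() and Advance calls visible in this commit:

package streams

import (
	"context"
	"sync"

	"github.com/matrix-org/dendrite/syncapi/storage"
	"github.com/matrix-org/dendrite/syncapi/types"
)

// StreamProvider is the assumed embedded base: it guards the latest stream
// position with latestMutex and supplies the Advance/LatestPosition methods
// that the consumers and the notifier rely on.
type StreamProvider struct {
	DB          storage.Database
	latest      types.StreamPosition
	latestMutex sync.RWMutex
}

func (p *StreamProvider) Setup() {}

// Advance moves the latest position forward, never backwards.
func (p *StreamProvider) Advance(latest types.StreamPosition) {
	p.latestMutex.Lock()
	defer p.latestMutex.Unlock()
	if latest > p.latest {
		p.latest = latest
	}
}

func (p *StreamProvider) LatestPosition(ctx context.Context) types.StreamPosition {
	p.latestMutex.RLock()
	defer p.latestMutex.RUnlock()
	return p.latest
}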

View file

@ -16,7 +16,7 @@ func (p *InviteStreamProvider) Setup() {
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
id, err := p.DB.MaxStreamTokenForInvites(context.Background())
id, err := p.DB.MaxStreamPositionForInvites(context.Background())
if err != nil {
panic(err)
}

View file

@ -18,7 +18,7 @@ func (p *PDUStreamProvider) Setup() {
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
id, err := p.DB.MaxStreamTokenForPDUs(context.Background())
id, err := p.DB.MaxStreamPositionForPDUs(context.Background())
if err != nil {
panic(err)
}
@ -33,6 +33,8 @@ func (p *PDUStreamProvider) CompleteSync(
to := p.LatestPosition(ctx)
// Get the current sync position which we will base the sync response on.
// For complete syncs, we want to start at the most recent events and work
// backwards, so that we show the most recent events in the room.
r := types.Range{
From: to,
To: 0,
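The new comment explains why the complete-sync range runs from the latest position back to zero. As an illustration only (a hypothetical helper, not part of this commit), the low/high bounds of such a backwards range can be read off by comparing From and To:

package streams

import "github.com/matrix-org/dendrite/syncapi/types"

// rangeBounds is a hypothetical helper: when From is greater than To the
// range is a backwards walk over the most recent events, exactly the
// From: latest, To: 0 range built above for complete syncs.
func rangeBounds(r types.Range) (low, high types.StreamPosition) {
	if r.From > r.To {
		return r.To, r.From
	}
	return r.From, r.To
}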

View file

@ -16,7 +16,7 @@ type ReceiptStreamProvider struct {
func (p *ReceiptStreamProvider) Setup() {
p.StreamProvider.Setup()
id, err := p.DB.MaxStreamTokenForReceipts(context.Background())
id, err := p.DB.MaxStreamPositionForReceipts(context.Background())
if err != nil {
panic(err)
}

View file

@ -57,14 +57,14 @@ func AddPublicRoutes(
streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache)
notifier := notifier.NewNotifier(streams.Latest(context.Background()))
if err = notifier.Load(context.Background(), syncDB); err != nil {
logrus.WithError(err).Panicf("failed to load notifier")
logrus.WithError(err).Panicf("failed to load notifier ")
}
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier)
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
consumer, keyAPI, rsAPI, syncDB, notifier, streams,
consumer, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider,
)
if err = keyChangeConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start key change consumer")
@ -78,28 +78,28 @@ func AddPublicRoutes(
}
clientConsumer := consumers.NewOutputClientDataConsumer(
cfg, consumer, syncDB, notifier, streams,
cfg, consumer, syncDB, notifier, streams.AccountDataStreamProvider,
)
if err = clientConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start client data consumer")
}
typingConsumer := consumers.NewOutputTypingEventConsumer(
cfg, consumer, syncDB, eduCache, notifier, streams,
cfg, consumer, syncDB, eduCache, notifier, streams.TypingStreamProvider,
)
if err = typingConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start typing consumer")
}
sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer(
cfg, consumer, syncDB, notifier, streams,
cfg, consumer, syncDB, notifier, streams.SendToDeviceStreamProvider,
)
if err = sendToDeviceConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start send-to-device consumer")
}
receiptConsumer := consumers.NewOutputReceiptEventConsumer(
cfg, consumer, syncDB, notifier, streams,
cfg, consumer, syncDB, notifier, streams.ReceiptStreamProvider,
)
if err = receiptConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start receipts consumer")