mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-26 08:13:09 -06:00
Delete notifier, other tweaks
This commit is contained in:
parent
721a40edb0
commit
954b36a74c
|
|
@ -23,8 +23,6 @@ import (
|
||||||
"github.com/matrix-org/dendrite/internal/eventutil"
|
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/sync"
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -32,14 +30,12 @@ import (
|
||||||
type OutputClientDataConsumer struct {
|
type OutputClientDataConsumer struct {
|
||||||
clientAPIConsumer *internal.ContinualConsumer
|
clientAPIConsumer *internal.ContinualConsumer
|
||||||
db storage.Database
|
db storage.Database
|
||||||
notifier *sync.Notifier
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
|
// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
|
||||||
func NewOutputClientDataConsumer(
|
func NewOutputClientDataConsumer(
|
||||||
cfg *config.SyncAPI,
|
cfg *config.SyncAPI,
|
||||||
kafkaConsumer sarama.Consumer,
|
kafkaConsumer sarama.Consumer,
|
||||||
n *sync.Notifier,
|
|
||||||
store storage.Database,
|
store storage.Database,
|
||||||
) *OutputClientDataConsumer {
|
) *OutputClientDataConsumer {
|
||||||
|
|
||||||
|
|
@ -52,7 +48,6 @@ func NewOutputClientDataConsumer(
|
||||||
s := &OutputClientDataConsumer{
|
s := &OutputClientDataConsumer{
|
||||||
clientAPIConsumer: &consumer,
|
clientAPIConsumer: &consumer,
|
||||||
db: store,
|
db: store,
|
||||||
notifier: n,
|
|
||||||
}
|
}
|
||||||
consumer.ProcessMessage = s.onMessage
|
consumer.ProcessMessage = s.onMessage
|
||||||
|
|
||||||
|
|
@ -92,7 +87,9 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
|
||||||
}).Panicf("could not save account data")
|
}).Panicf("could not save account data")
|
||||||
}
|
}
|
||||||
|
|
||||||
s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.StreamingToken{PDUPosition: pduPos})
|
_ = pduPos
|
||||||
|
|
||||||
|
//s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.StreamingToken{PDUPosition: pduPos})
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,6 @@ import (
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/sync"
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -31,7 +30,6 @@ import (
|
||||||
type OutputReceiptEventConsumer struct {
|
type OutputReceiptEventConsumer struct {
|
||||||
receiptConsumer *internal.ContinualConsumer
|
receiptConsumer *internal.ContinualConsumer
|
||||||
db storage.Database
|
db storage.Database
|
||||||
notifier *sync.Notifier
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer.
|
// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer.
|
||||||
|
|
@ -39,7 +37,6 @@ type OutputReceiptEventConsumer struct {
|
||||||
func NewOutputReceiptEventConsumer(
|
func NewOutputReceiptEventConsumer(
|
||||||
cfg *config.SyncAPI,
|
cfg *config.SyncAPI,
|
||||||
kafkaConsumer sarama.Consumer,
|
kafkaConsumer sarama.Consumer,
|
||||||
n *sync.Notifier,
|
|
||||||
store storage.Database,
|
store storage.Database,
|
||||||
) *OutputReceiptEventConsumer {
|
) *OutputReceiptEventConsumer {
|
||||||
|
|
||||||
|
|
@ -53,7 +50,6 @@ func NewOutputReceiptEventConsumer(
|
||||||
s := &OutputReceiptEventConsumer{
|
s := &OutputReceiptEventConsumer{
|
||||||
receiptConsumer: &consumer,
|
receiptConsumer: &consumer,
|
||||||
db: store,
|
db: store,
|
||||||
notifier: n,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
consumer.ProcessMessage = s.onMessage
|
consumer.ProcessMessage = s.onMessage
|
||||||
|
|
|
||||||
|
|
@ -23,8 +23,6 @@ import (
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/sync"
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
@ -35,7 +33,6 @@ type OutputSendToDeviceEventConsumer struct {
|
||||||
sendToDeviceConsumer *internal.ContinualConsumer
|
sendToDeviceConsumer *internal.ContinualConsumer
|
||||||
db storage.Database
|
db storage.Database
|
||||||
serverName gomatrixserverlib.ServerName // our server name
|
serverName gomatrixserverlib.ServerName // our server name
|
||||||
notifier *sync.Notifier
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOutputSendToDeviceEventConsumer creates a new OutputSendToDeviceEventConsumer.
|
// NewOutputSendToDeviceEventConsumer creates a new OutputSendToDeviceEventConsumer.
|
||||||
|
|
@ -43,7 +40,6 @@ type OutputSendToDeviceEventConsumer struct {
|
||||||
func NewOutputSendToDeviceEventConsumer(
|
func NewOutputSendToDeviceEventConsumer(
|
||||||
cfg *config.SyncAPI,
|
cfg *config.SyncAPI,
|
||||||
kafkaConsumer sarama.Consumer,
|
kafkaConsumer sarama.Consumer,
|
||||||
n *sync.Notifier,
|
|
||||||
store storage.Database,
|
store storage.Database,
|
||||||
) *OutputSendToDeviceEventConsumer {
|
) *OutputSendToDeviceEventConsumer {
|
||||||
|
|
||||||
|
|
@ -58,7 +54,6 @@ func NewOutputSendToDeviceEventConsumer(
|
||||||
sendToDeviceConsumer: &consumer,
|
sendToDeviceConsumer: &consumer,
|
||||||
db: store,
|
db: store,
|
||||||
serverName: cfg.Matrix.ServerName,
|
serverName: cfg.Matrix.ServerName,
|
||||||
notifier: n,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
consumer.ProcessMessage = s.onMessage
|
consumer.ProcessMessage = s.onMessage
|
||||||
|
|
@ -102,11 +97,13 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
s.notifier.OnNewSendToDevice(
|
//s.notifier.OnNewSendToDevice(
|
||||||
output.UserID,
|
// output.UserID,
|
||||||
[]string{output.DeviceID},
|
// []string{output.DeviceID},
|
||||||
types.StreamingToken{SendToDevicePosition: streamPos},
|
// types.StreamingToken{SendToDevicePosition: streamPos},
|
||||||
)
|
//)
|
||||||
|
|
||||||
|
_ = streamPos
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,6 @@ import (
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/sync"
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
@ -31,7 +30,6 @@ import (
|
||||||
type OutputTypingEventConsumer struct {
|
type OutputTypingEventConsumer struct {
|
||||||
typingConsumer *internal.ContinualConsumer
|
typingConsumer *internal.ContinualConsumer
|
||||||
db storage.Database
|
db storage.Database
|
||||||
notifier *sync.Notifier
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer.
|
// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer.
|
||||||
|
|
@ -39,7 +37,6 @@ type OutputTypingEventConsumer struct {
|
||||||
func NewOutputTypingEventConsumer(
|
func NewOutputTypingEventConsumer(
|
||||||
cfg *config.SyncAPI,
|
cfg *config.SyncAPI,
|
||||||
kafkaConsumer sarama.Consumer,
|
kafkaConsumer sarama.Consumer,
|
||||||
n *sync.Notifier,
|
|
||||||
store storage.Database,
|
store storage.Database,
|
||||||
) *OutputTypingEventConsumer {
|
) *OutputTypingEventConsumer {
|
||||||
|
|
||||||
|
|
@ -53,7 +50,6 @@ func NewOutputTypingEventConsumer(
|
||||||
s := &OutputTypingEventConsumer{
|
s := &OutputTypingEventConsumer{
|
||||||
typingConsumer: &consumer,
|
typingConsumer: &consumer,
|
||||||
db: store,
|
db: store,
|
||||||
notifier: n,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
consumer.ProcessMessage = s.onMessage
|
consumer.ProcessMessage = s.onMessage
|
||||||
|
|
@ -63,9 +59,11 @@ func NewOutputTypingEventConsumer(
|
||||||
|
|
||||||
// Start consuming from EDU api
|
// Start consuming from EDU api
|
||||||
func (s *OutputTypingEventConsumer) Start() error {
|
func (s *OutputTypingEventConsumer) Start() error {
|
||||||
|
/*
|
||||||
s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
|
s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
|
||||||
s.notifier.OnNewTyping(roomID, types.StreamingToken{TypingPosition: types.StreamPosition(latestSyncPosition)})
|
s.notifier.OnNewTyping(roomID, types.StreamingToken{TypingPosition: types.StreamPosition(latestSyncPosition)})
|
||||||
})
|
})
|
||||||
|
*/
|
||||||
|
|
||||||
return s.typingConsumer.Start()
|
return s.typingConsumer.Start()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,6 @@ import (
|
||||||
"github.com/matrix-org/dendrite/keyserver/api"
|
"github.com/matrix-org/dendrite/keyserver/api"
|
||||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
syncapi "github.com/matrix-org/dendrite/syncapi/sync"
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
@ -39,7 +38,6 @@ type OutputKeyChangeEventConsumer struct {
|
||||||
keyAPI api.KeyInternalAPI
|
keyAPI api.KeyInternalAPI
|
||||||
partitionToOffset map[int32]int64
|
partitionToOffset map[int32]int64
|
||||||
partitionToOffsetMu sync.Mutex
|
partitionToOffsetMu sync.Mutex
|
||||||
notifier *syncapi.Notifier
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer.
|
// NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer.
|
||||||
|
|
@ -48,7 +46,6 @@ func NewOutputKeyChangeEventConsumer(
|
||||||
serverName gomatrixserverlib.ServerName,
|
serverName gomatrixserverlib.ServerName,
|
||||||
topic string,
|
topic string,
|
||||||
kafkaConsumer sarama.Consumer,
|
kafkaConsumer sarama.Consumer,
|
||||||
n *syncapi.Notifier,
|
|
||||||
keyAPI api.KeyInternalAPI,
|
keyAPI api.KeyInternalAPI,
|
||||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||||
store storage.Database,
|
store storage.Database,
|
||||||
|
|
@ -69,7 +66,6 @@ func NewOutputKeyChangeEventConsumer(
|
||||||
rsAPI: rsAPI,
|
rsAPI: rsAPI,
|
||||||
partitionToOffset: make(map[int32]int64),
|
partitionToOffset: make(map[int32]int64),
|
||||||
partitionToOffsetMu: sync.Mutex{},
|
partitionToOffsetMu: sync.Mutex{},
|
||||||
notifier: n,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
consumer.ProcessMessage = s.onMessage
|
consumer.ProcessMessage = s.onMessage
|
||||||
|
|
@ -120,8 +116,11 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
|
||||||
Partition: msg.Partition,
|
Partition: msg.Partition,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for userID := range queryRes.UserIDsToCount {
|
|
||||||
s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID)
|
_ = posUpdate
|
||||||
}
|
|
||||||
|
//for userID := range queryRes.UserIDsToCount {
|
||||||
|
// s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID)
|
||||||
|
//}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,6 @@ import (
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/sync"
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
@ -36,14 +35,12 @@ type OutputRoomEventConsumer struct {
|
||||||
rsAPI api.RoomserverInternalAPI
|
rsAPI api.RoomserverInternalAPI
|
||||||
rsConsumer *internal.ContinualConsumer
|
rsConsumer *internal.ContinualConsumer
|
||||||
db storage.Database
|
db storage.Database
|
||||||
notifier *sync.Notifier
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
|
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
|
||||||
func NewOutputRoomEventConsumer(
|
func NewOutputRoomEventConsumer(
|
||||||
cfg *config.SyncAPI,
|
cfg *config.SyncAPI,
|
||||||
kafkaConsumer sarama.Consumer,
|
kafkaConsumer sarama.Consumer,
|
||||||
n *sync.Notifier,
|
|
||||||
store storage.Database,
|
store storage.Database,
|
||||||
rsAPI api.RoomserverInternalAPI,
|
rsAPI api.RoomserverInternalAPI,
|
||||||
) *OutputRoomEventConsumer {
|
) *OutputRoomEventConsumer {
|
||||||
|
|
@ -58,7 +55,6 @@ func NewOutputRoomEventConsumer(
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
rsConsumer: &consumer,
|
rsConsumer: &consumer,
|
||||||
db: store,
|
db: store,
|
||||||
notifier: n,
|
|
||||||
rsAPI: rsAPI,
|
rsAPI: rsAPI,
|
||||||
}
|
}
|
||||||
consumer.ProcessMessage = s.onMessage
|
consumer.ProcessMessage = s.onMessage
|
||||||
|
|
@ -274,7 +270,9 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
|
||||||
}).Panicf("roomserver output log: write invite failure")
|
}).Panicf("roomserver output log: write invite failure")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, *msg.Event.StateKey())
|
|
||||||
|
_ = pduPos
|
||||||
|
//s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, *msg.Event.StateKey())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -292,7 +290,9 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
|
||||||
}
|
}
|
||||||
// Notify any active sync requests that the invite has been retired.
|
// Notify any active sync requests that the invite has been retired.
|
||||||
// Invites share the same stream counter as PDUs
|
// Invites share the same stream counter as PDUs
|
||||||
s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID)
|
|
||||||
|
_ = pduPos
|
||||||
|
//s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -308,11 +308,11 @@ func (s *OutputRoomEventConsumer) onNewPeek(
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
// tell the notifier about the new peek so it knows to wake up new devices
|
// tell the notifier about the new peek so it knows to wake up new devices
|
||||||
s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID)
|
//s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID)
|
||||||
|
_ = sp
|
||||||
// we need to wake up the users who might need to now be peeking into this room,
|
// we need to wake up the users who might need to now be peeking into this room,
|
||||||
// so we send in a dummy event to trigger a wakeup
|
// so we send in a dummy event to trigger a wakeup
|
||||||
s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp})
|
//s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -328,11 +328,11 @@ func (s *OutputRoomEventConsumer) onRetirePeek(
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
// tell the notifier about the new peek so it knows to wake up new devices
|
// tell the notifier about the new peek so it knows to wake up new devices
|
||||||
s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID)
|
//s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID)
|
||||||
|
_ = sp
|
||||||
// we need to wake up the users who might need to now be peeking into this room,
|
// we need to wake up the users who might need to now be peeking into this room,
|
||||||
// so we send in a dummy event to trigger a wakeup
|
// so we send in a dummy event to trigger a wakeup
|
||||||
s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp})
|
//s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -96,7 +96,7 @@ func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, room
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []api.OutputReceiptEvent, error) {
|
func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []api.OutputReceiptEvent, error) {
|
||||||
lastPos := types.StreamPosition(0)
|
lastPos := streamPos
|
||||||
rows, err := r.selectRoomReceipts.QueryContext(ctx, pq.Array(roomIDs), streamPos)
|
rows, err := r.selectRoomReceipts.QueryContext(ctx, pq.Array(roomIDs), streamPos)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("unable to query room receipts: %w", err)
|
return 0, nil, fmt.Errorf("unable to query room receipts: %w", err)
|
||||||
|
|
|
||||||
|
|
@ -41,6 +41,7 @@ func (p *PDUStreamProvider) StreamAdvance(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// nolint:gocyclo
|
||||||
func (p *PDUStreamProvider) StreamRange(
|
func (p *PDUStreamProvider) StreamRange(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
req *types.StreamRangeRequest,
|
req *types.StreamRangeRequest,
|
||||||
|
|
@ -63,12 +64,12 @@ func (p *PDUStreamProvider) StreamRange(
|
||||||
// TODO: use filter provided in request
|
// TODO: use filter provided in request
|
||||||
stateFilter := gomatrixserverlib.DefaultStateFilter()
|
stateFilter := gomatrixserverlib.DefaultStateFilter()
|
||||||
|
|
||||||
if from.IsEmpty() {
|
if req.WantFullState {
|
||||||
if stateDeltas, joinedRooms, err = p.DB.getStateDeltas(ctx, req.Device, nil, r, req.Device.UserID, &stateFilter); err != nil {
|
if stateDeltas, joinedRooms, err = p.DB.getStateDeltasForFullStateSync(ctx, req.Device, nil, r, req.Device.UserID, &stateFilter); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if stateDeltas, joinedRooms, err = p.DB.getStateDeltasForFullStateSync(ctx, req.Device, nil, r, req.Device.UserID, &stateFilter); err != nil {
|
if stateDeltas, joinedRooms, err = p.DB.getStateDeltas(ctx, req.Device, nil, r, req.Device.UserID, &stateFilter); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -93,7 +94,7 @@ func (p *PDUStreamProvider) StreamRange(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, event := range events {
|
for _, event := range p.DB.StreamEventsToEvents(req.Device, events) {
|
||||||
room.Timeline.Events = append(
|
room.Timeline.Events = append(
|
||||||
room.Timeline.Events,
|
room.Timeline.Events,
|
||||||
gomatrixserverlib.ToClientEvent(
|
gomatrixserverlib.ToClientEvent(
|
||||||
|
|
@ -101,7 +102,9 @@ func (p *PDUStreamProvider) StreamRange(
|
||||||
gomatrixserverlib.FormatSync,
|
gomatrixserverlib.FormatSync,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, event := range events {
|
||||||
if event.StreamPosition > newPos.PDUPosition {
|
if event.StreamPosition > newPos.PDUPosition {
|
||||||
newPos.PDUPosition = event.StreamPosition
|
newPos.PDUPosition = event.StreamPosition
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -53,9 +53,13 @@ func (p *ReceiptStreamProvider) StreamRange(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
lastPos, receipts, err := p.DB.Receipts.SelectRoomReceiptsAfter(context.TODO(), joinedRooms, from.ReceiptPosition)
|
lastPos, receipts, err := p.DB.Receipts.SelectRoomReceiptsAfter(ctx, joinedRooms, from.ReceiptPosition)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return types.StreamingToken{} //fmt.Errorf("unable to select receipts for rooms: %w", err)
|
return to //fmt.Errorf("unable to select receipts for rooms: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(receipts) == 0 || lastPos == 0 {
|
||||||
|
return to
|
||||||
}
|
}
|
||||||
|
|
||||||
// Group receipts by room, so we can create one ClientEvent for every room
|
// Group receipts by room, so we can create one ClientEvent for every room
|
||||||
|
|
@ -85,22 +89,16 @@ func (p *ReceiptStreamProvider) StreamRange(
|
||||||
}
|
}
|
||||||
ev.Content, err = json.Marshal(content)
|
ev.Content, err = json.Marshal(content)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return types.StreamingToken{} // err
|
return to // err
|
||||||
}
|
}
|
||||||
|
|
||||||
jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
|
jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
|
||||||
req.Response.Rooms.Join[roomID] = jr
|
req.Response.Rooms.Join[roomID] = jr
|
||||||
}
|
}
|
||||||
|
|
||||||
if lastPos > 0 {
|
|
||||||
return types.StreamingToken{
|
return types.StreamingToken{
|
||||||
ReceiptPosition: lastPos,
|
ReceiptPosition: lastPos,
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
return types.StreamingToken{
|
|
||||||
ReceiptPosition: to.ReceiptPosition,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ReceiptStreamProvider) StreamNotifyAfter(
|
func (p *ReceiptStreamProvider) StreamNotifyAfter(
|
||||||
|
|
|
||||||
|
|
@ -54,7 +54,9 @@ func (p *TypingStreamProvider) StreamRange(
|
||||||
"user_ids": users,
|
"user_ids": users,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return types.StreamingToken{}
|
return types.StreamingToken{
|
||||||
|
TypingPosition: from.TypingPosition,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
|
jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
|
||||||
|
|
|
||||||
|
|
@ -101,7 +101,7 @@ func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, room
|
||||||
// SelectRoomReceiptsAfter select all receipts for a given room after a specific timestamp
|
// SelectRoomReceiptsAfter select all receipts for a given room after a specific timestamp
|
||||||
func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []api.OutputReceiptEvent, error) {
|
func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []api.OutputReceiptEvent, error) {
|
||||||
selectSQL := strings.Replace(selectRoomReceipts, "($2)", sqlutil.QueryVariadicOffset(len(roomIDs), 1), 1)
|
selectSQL := strings.Replace(selectRoomReceipts, "($2)", sqlutil.QueryVariadicOffset(len(roomIDs), 1), 1)
|
||||||
lastPos := types.StreamPosition(0)
|
lastPos := streamPos
|
||||||
params := make([]interface{}, len(roomIDs)+1)
|
params := make([]interface{}, len(roomIDs)+1)
|
||||||
params[0] = streamPos
|
params[0] = streamPos
|
||||||
for k, v := range roomIDs {
|
for k, v := range roomIDs {
|
||||||
|
|
|
||||||
|
|
@ -1,467 +0,0 @@
|
||||||
// Copyright 2017 Vector Creations Ltd
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package sync
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Notifier will wake up sleeping requests when there is some new data.
|
|
||||||
// It does not tell requests what that data is, only the sync position which
|
|
||||||
// they can use to get at it. This is done to prevent races whereby we tell the caller
|
|
||||||
// the event, but the token has already advanced by the time they fetch it, resulting
|
|
||||||
// in missed events.
|
|
||||||
type Notifier struct {
|
|
||||||
// A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine
|
|
||||||
roomIDToJoinedUsers map[string]userIDSet
|
|
||||||
// A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine
|
|
||||||
roomIDToPeekingDevices map[string]peekingDeviceSet
|
|
||||||
// Protects currPos and userStreams.
|
|
||||||
streamLock *sync.Mutex
|
|
||||||
// The latest sync position
|
|
||||||
currPos types.StreamingToken
|
|
||||||
// A map of user_id => device_id => UserStream which can be used to wake a given user's /sync request.
|
|
||||||
userDeviceStreams map[string]map[string]*UserDeviceStream
|
|
||||||
// The last time we cleaned out stale entries from the userStreams map
|
|
||||||
lastCleanUpTime time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewNotifier creates a new notifier set to the given sync position.
|
|
||||||
// In order for this to be of any use, the Notifier needs to be told all rooms and
|
|
||||||
// the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase).
|
|
||||||
func NewNotifier(pos types.StreamingToken) *Notifier {
|
|
||||||
return &Notifier{
|
|
||||||
currPos: pos,
|
|
||||||
roomIDToJoinedUsers: make(map[string]userIDSet),
|
|
||||||
roomIDToPeekingDevices: make(map[string]peekingDeviceSet),
|
|
||||||
userDeviceStreams: make(map[string]map[string]*UserDeviceStream),
|
|
||||||
streamLock: &sync.Mutex{},
|
|
||||||
lastCleanUpTime: time.Now(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnNewEvent is called when a new event is received from the room server. Must only be
|
|
||||||
// called from a single goroutine, to avoid races between updates which could set the
|
|
||||||
// current sync position incorrectly.
|
|
||||||
// Chooses which user sync streams to update by a provided *gomatrixserverlib.Event
|
|
||||||
// (based on the users in the event's room),
|
|
||||||
// a roomID directly, or a list of user IDs, prioritised by parameter ordering.
|
|
||||||
// posUpdate contains the latest position(s) for one or more types of events.
|
|
||||||
// If a position in posUpdate is 0, it means no updates are available of that type.
|
|
||||||
// Typically a consumer supplies a posUpdate with the latest sync position for the
|
|
||||||
// event type it handles, leaving other fields as 0.
|
|
||||||
func (n *Notifier) OnNewEvent(
|
|
||||||
ev *gomatrixserverlib.HeaderedEvent, roomID string, userIDs []string,
|
|
||||||
posUpdate types.StreamingToken,
|
|
||||||
) {
|
|
||||||
// update the current position then notify relevant /sync streams.
|
|
||||||
// This needs to be done PRIOR to waking up users as they will read this value.
|
|
||||||
n.streamLock.Lock()
|
|
||||||
defer n.streamLock.Unlock()
|
|
||||||
|
|
||||||
n.currPos.ApplyUpdates(posUpdate)
|
|
||||||
n.removeEmptyUserStreams()
|
|
||||||
|
|
||||||
if ev != nil {
|
|
||||||
// Map this event's room_id to a list of joined users, and wake them up.
|
|
||||||
usersToNotify := n.joinedUsers(ev.RoomID())
|
|
||||||
// Map this event's room_id to a list of peeking devices, and wake them up.
|
|
||||||
peekingDevicesToNotify := n.PeekingDevices(ev.RoomID())
|
|
||||||
// If this is an invite, also add in the invitee to this list.
|
|
||||||
if ev.Type() == "m.room.member" && ev.StateKey() != nil {
|
|
||||||
targetUserID := *ev.StateKey()
|
|
||||||
membership, err := ev.Membership()
|
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).WithField("event_id", ev.EventID()).Errorf(
|
|
||||||
"Notifier.OnNewEvent: Failed to unmarshal member event",
|
|
||||||
)
|
|
||||||
} else {
|
|
||||||
// Keep the joined user map up-to-date
|
|
||||||
switch membership {
|
|
||||||
case gomatrixserverlib.Invite:
|
|
||||||
usersToNotify = append(usersToNotify, targetUserID)
|
|
||||||
case gomatrixserverlib.Join:
|
|
||||||
// Manually append the new user's ID so they get notified
|
|
||||||
// along all members in the room
|
|
||||||
usersToNotify = append(usersToNotify, targetUserID)
|
|
||||||
n.addJoinedUser(ev.RoomID(), targetUserID)
|
|
||||||
case gomatrixserverlib.Leave:
|
|
||||||
fallthrough
|
|
||||||
case gomatrixserverlib.Ban:
|
|
||||||
n.removeJoinedUser(ev.RoomID(), targetUserID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
n.wakeupUsers(usersToNotify, peekingDevicesToNotify, n.currPos)
|
|
||||||
} else if roomID != "" {
|
|
||||||
n.wakeupUsers(n.joinedUsers(roomID), n.PeekingDevices(roomID), n.currPos)
|
|
||||||
} else if len(userIDs) > 0 {
|
|
||||||
n.wakeupUsers(userIDs, nil, n.currPos)
|
|
||||||
} else {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"posUpdate": posUpdate.String,
|
|
||||||
}).Warn("Notifier.OnNewEvent called but caller supplied no user to wake up")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *Notifier) OnNewPeek(
|
|
||||||
roomID, userID, deviceID string,
|
|
||||||
) {
|
|
||||||
n.streamLock.Lock()
|
|
||||||
defer n.streamLock.Unlock()
|
|
||||||
|
|
||||||
n.addPeekingDevice(roomID, userID, deviceID)
|
|
||||||
|
|
||||||
// we don't wake up devices here given the roomserver consumer will do this shortly afterwards
|
|
||||||
// by calling OnNewEvent.
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *Notifier) OnRetirePeek(
|
|
||||||
roomID, userID, deviceID string,
|
|
||||||
) {
|
|
||||||
n.streamLock.Lock()
|
|
||||||
defer n.streamLock.Unlock()
|
|
||||||
|
|
||||||
n.removePeekingDevice(roomID, userID, deviceID)
|
|
||||||
|
|
||||||
// we don't wake up devices here given the roomserver consumer will do this shortly afterwards
|
|
||||||
// by calling OnRetireEvent.
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *Notifier) OnNewSendToDevice(
|
|
||||||
userID string, deviceIDs []string,
|
|
||||||
posUpdate types.StreamingToken,
|
|
||||||
) {
|
|
||||||
n.streamLock.Lock()
|
|
||||||
defer n.streamLock.Unlock()
|
|
||||||
|
|
||||||
n.currPos.ApplyUpdates(posUpdate)
|
|
||||||
n.wakeupUserDevice(userID, deviceIDs, n.currPos)
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnNewReceipt updates the current position
|
|
||||||
func (n *Notifier) OnNewTyping(
|
|
||||||
roomID string,
|
|
||||||
posUpdate types.StreamingToken,
|
|
||||||
) {
|
|
||||||
n.streamLock.Lock()
|
|
||||||
defer n.streamLock.Unlock()
|
|
||||||
|
|
||||||
n.currPos.ApplyUpdates(posUpdate)
|
|
||||||
n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos)
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnNewReceipt updates the current position
|
|
||||||
func (n *Notifier) OnNewReceipt(
|
|
||||||
roomID string,
|
|
||||||
posUpdate types.StreamingToken,
|
|
||||||
) {
|
|
||||||
n.streamLock.Lock()
|
|
||||||
defer n.streamLock.Unlock()
|
|
||||||
|
|
||||||
n.currPos.ApplyUpdates(posUpdate)
|
|
||||||
n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *Notifier) OnNewKeyChange(
|
|
||||||
posUpdate types.StreamingToken, wakeUserID, keyChangeUserID string,
|
|
||||||
) {
|
|
||||||
n.streamLock.Lock()
|
|
||||||
defer n.streamLock.Unlock()
|
|
||||||
|
|
||||||
n.currPos.ApplyUpdates(posUpdate)
|
|
||||||
n.wakeupUsers([]string{wakeUserID}, nil, n.currPos)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *Notifier) OnNewInvite(
|
|
||||||
posUpdate types.StreamingToken, wakeUserID string,
|
|
||||||
) {
|
|
||||||
n.streamLock.Lock()
|
|
||||||
defer n.streamLock.Unlock()
|
|
||||||
|
|
||||||
n.currPos.ApplyUpdates(posUpdate)
|
|
||||||
n.wakeupUsers([]string{wakeUserID}, nil, n.currPos)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetListener returns a UserStreamListener that can be used to wait for
|
|
||||||
// updates for a user. Must be closed.
|
|
||||||
// notify for anything before sincePos
|
|
||||||
func (n *Notifier) GetListener(req syncRequest) UserDeviceStreamListener {
|
|
||||||
// Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298
|
|
||||||
// - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID
|
|
||||||
// - Incoming events wake requests for a matching room ID
|
|
||||||
// - Incoming events wake requests for a matching user ID (needed for invites)
|
|
||||||
|
|
||||||
// TODO: v1 /events 'peeking' has an 'explicit room ID' which is also tracked,
|
|
||||||
// but given we don't do /events, let's pretend it doesn't exist.
|
|
||||||
|
|
||||||
n.streamLock.Lock()
|
|
||||||
defer n.streamLock.Unlock()
|
|
||||||
|
|
||||||
n.removeEmptyUserStreams()
|
|
||||||
|
|
||||||
return n.fetchUserDeviceStream(req.device.UserID, req.device.ID, true).GetListener(req.ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Load the membership states required to notify users correctly.
|
|
||||||
func (n *Notifier) Load(ctx context.Context, db storage.Database) error {
|
|
||||||
roomToUsers, err := db.AllJoinedUsersInRooms(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
n.setUsersJoinedToRooms(roomToUsers)
|
|
||||||
|
|
||||||
roomToPeekingDevices, err := db.AllPeekingDevicesInRooms(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
n.setPeekingDevices(roomToPeekingDevices)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// CurrentPosition returns the current sync position
|
|
||||||
func (n *Notifier) CurrentPosition() types.StreamingToken {
|
|
||||||
n.streamLock.Lock()
|
|
||||||
defer n.streamLock.Unlock()
|
|
||||||
|
|
||||||
return n.currPos
|
|
||||||
}
|
|
||||||
|
|
||||||
// setUsersJoinedToRooms marks the given users as 'joined' to the given rooms, such that new events from
|
|
||||||
// these rooms will wake the given users /sync requests. This should be called prior to ANY calls to
|
|
||||||
// OnNewEvent (eg on startup) to prevent racing.
|
|
||||||
func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) {
|
|
||||||
// This is just the bulk form of addJoinedUser
|
|
||||||
for roomID, userIDs := range roomIDToUserIDs {
|
|
||||||
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
|
|
||||||
n.roomIDToJoinedUsers[roomID] = make(userIDSet)
|
|
||||||
}
|
|
||||||
for _, userID := range userIDs {
|
|
||||||
n.roomIDToJoinedUsers[roomID].add(userID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// setPeekingDevices marks the given devices as peeking in the given rooms, such that new events from
|
|
||||||
// these rooms will wake the given devices' /sync requests. This should be called prior to ANY calls to
|
|
||||||
// OnNewEvent (eg on startup) to prevent racing.
|
|
||||||
func (n *Notifier) setPeekingDevices(roomIDToPeekingDevices map[string][]types.PeekingDevice) {
|
|
||||||
// This is just the bulk form of addPeekingDevice
|
|
||||||
for roomID, peekingDevices := range roomIDToPeekingDevices {
|
|
||||||
if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
|
|
||||||
n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet)
|
|
||||||
}
|
|
||||||
for _, peekingDevice := range peekingDevices {
|
|
||||||
n.roomIDToPeekingDevices[roomID].add(peekingDevice)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// wakeupUsers will wake up the sync strems for all of the devices for all of the
|
|
||||||
// specified user IDs, and also the specified peekingDevices
|
|
||||||
func (n *Notifier) wakeupUsers(userIDs []string, peekingDevices []types.PeekingDevice, newPos types.StreamingToken) {
|
|
||||||
for _, userID := range userIDs {
|
|
||||||
for _, stream := range n.fetchUserStreams(userID) {
|
|
||||||
if stream == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, peekingDevice := range peekingDevices {
|
|
||||||
// TODO: don't bother waking up for devices whose users we already woke up
|
|
||||||
if stream := n.fetchUserDeviceStream(peekingDevice.UserID, peekingDevice.DeviceID, false); stream != nil {
|
|
||||||
stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// wakeupUserDevice will wake up the sync stream for a specific user device. Other
|
|
||||||
// device streams will be left alone.
|
|
||||||
// nolint:unused
|
|
||||||
func (n *Notifier) wakeupUserDevice(userID string, deviceIDs []string, newPos types.StreamingToken) {
|
|
||||||
for _, deviceID := range deviceIDs {
|
|
||||||
if stream := n.fetchUserDeviceStream(userID, deviceID, false); stream != nil {
|
|
||||||
stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// fetchUserDeviceStream retrieves a stream unique to the given device. If makeIfNotExists is true,
|
|
||||||
// a stream will be made for this device if one doesn't exist and it will be returned. This
|
|
||||||
// function does not wait for data to be available on the stream.
|
|
||||||
// NB: Callers should have locked the mutex before calling this function.
|
|
||||||
func (n *Notifier) fetchUserDeviceStream(userID, deviceID string, makeIfNotExists bool) *UserDeviceStream {
|
|
||||||
_, ok := n.userDeviceStreams[userID]
|
|
||||||
if !ok {
|
|
||||||
if !makeIfNotExists {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
n.userDeviceStreams[userID] = map[string]*UserDeviceStream{}
|
|
||||||
}
|
|
||||||
stream, ok := n.userDeviceStreams[userID][deviceID]
|
|
||||||
if !ok {
|
|
||||||
if !makeIfNotExists {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
// TODO: Unbounded growth of streams (1 per user)
|
|
||||||
if stream = NewUserDeviceStream(userID, deviceID, n.currPos); stream != nil {
|
|
||||||
n.userDeviceStreams[userID][deviceID] = stream
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return stream
|
|
||||||
}
|
|
||||||
|
|
||||||
// fetchUserStreams retrieves all streams for the given user. If makeIfNotExists is true,
|
|
||||||
// a stream will be made for this user if one doesn't exist and it will be returned. This
|
|
||||||
// function does not wait for data to be available on the stream.
|
|
||||||
// NB: Callers should have locked the mutex before calling this function.
|
|
||||||
func (n *Notifier) fetchUserStreams(userID string) []*UserDeviceStream {
|
|
||||||
user, ok := n.userDeviceStreams[userID]
|
|
||||||
if !ok {
|
|
||||||
return []*UserDeviceStream{}
|
|
||||||
}
|
|
||||||
streams := []*UserDeviceStream{}
|
|
||||||
for _, stream := range user {
|
|
||||||
streams = append(streams, stream)
|
|
||||||
}
|
|
||||||
return streams
|
|
||||||
}
|
|
||||||
|
|
||||||
// Not thread-safe: must be called on the OnNewEvent goroutine only
|
|
||||||
func (n *Notifier) addJoinedUser(roomID, userID string) {
|
|
||||||
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
|
|
||||||
n.roomIDToJoinedUsers[roomID] = make(userIDSet)
|
|
||||||
}
|
|
||||||
n.roomIDToJoinedUsers[roomID].add(userID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Not thread-safe: must be called on the OnNewEvent goroutine only
|
|
||||||
func (n *Notifier) removeJoinedUser(roomID, userID string) {
|
|
||||||
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
|
|
||||||
n.roomIDToJoinedUsers[roomID] = make(userIDSet)
|
|
||||||
}
|
|
||||||
n.roomIDToJoinedUsers[roomID].remove(userID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Not thread-safe: must be called on the OnNewEvent goroutine only
|
|
||||||
func (n *Notifier) joinedUsers(roomID string) (userIDs []string) {
|
|
||||||
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
return n.roomIDToJoinedUsers[roomID].values()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Not thread-safe: must be called on the OnNewEvent goroutine only
|
|
||||||
func (n *Notifier) addPeekingDevice(roomID, userID, deviceID string) {
|
|
||||||
if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
|
|
||||||
n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet)
|
|
||||||
}
|
|
||||||
n.roomIDToPeekingDevices[roomID].add(types.PeekingDevice{UserID: userID, DeviceID: deviceID})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Not thread-safe: must be called on the OnNewEvent goroutine only
|
|
||||||
// nolint:unused
|
|
||||||
func (n *Notifier) removePeekingDevice(roomID, userID, deviceID string) {
|
|
||||||
if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
|
|
||||||
n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet)
|
|
||||||
}
|
|
||||||
// XXX: is this going to work as a key?
|
|
||||||
n.roomIDToPeekingDevices[roomID].remove(types.PeekingDevice{UserID: userID, DeviceID: deviceID})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Not thread-safe: must be called on the OnNewEvent goroutine only
|
|
||||||
func (n *Notifier) PeekingDevices(roomID string) (peekingDevices []types.PeekingDevice) {
|
|
||||||
if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
return n.roomIDToPeekingDevices[roomID].values()
|
|
||||||
}
|
|
||||||
|
|
||||||
// removeEmptyUserStreams iterates through the user stream map and removes any
|
|
||||||
// that have been empty for a certain amount of time. This is a crude way of
|
|
||||||
// ensuring that the userStreams map doesn't grow forver.
|
|
||||||
// This should be called when the notifier gets called for whatever reason,
|
|
||||||
// the function itself is responsible for ensuring it doesn't iterate too
|
|
||||||
// often.
|
|
||||||
// NB: Callers should have locked the mutex before calling this function.
|
|
||||||
func (n *Notifier) removeEmptyUserStreams() {
|
|
||||||
// Only clean up now and again
|
|
||||||
now := time.Now()
|
|
||||||
if n.lastCleanUpTime.Add(time.Minute).After(now) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
n.lastCleanUpTime = now
|
|
||||||
|
|
||||||
deleteBefore := now.Add(-5 * time.Minute)
|
|
||||||
for user, byUser := range n.userDeviceStreams {
|
|
||||||
for device, stream := range byUser {
|
|
||||||
if stream.TimeOfLastNonEmpty().Before(deleteBefore) {
|
|
||||||
delete(n.userDeviceStreams[user], device)
|
|
||||||
}
|
|
||||||
if len(n.userDeviceStreams[user]) == 0 {
|
|
||||||
delete(n.userDeviceStreams, user)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// A string set, mainly existing for improving clarity of structs in this file.
|
|
||||||
type userIDSet map[string]bool
|
|
||||||
|
|
||||||
func (s userIDSet) add(str string) {
|
|
||||||
s[str] = true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s userIDSet) remove(str string) {
|
|
||||||
delete(s, str)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s userIDSet) values() (vals []string) {
|
|
||||||
for str := range s {
|
|
||||||
vals = append(vals, str)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// A set of PeekingDevices, similar to userIDSet
|
|
||||||
|
|
||||||
type peekingDeviceSet map[types.PeekingDevice]bool
|
|
||||||
|
|
||||||
func (s peekingDeviceSet) add(d types.PeekingDevice) {
|
|
||||||
s[d] = true
|
|
||||||
}
|
|
||||||
|
|
||||||
// nolint:unused
|
|
||||||
func (s peekingDeviceSet) remove(d types.PeekingDevice) {
|
|
||||||
delete(s, d)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s peekingDeviceSet) values() (vals []types.PeekingDevice) {
|
|
||||||
for d := range s {
|
|
||||||
vals = append(vals, d)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
@ -1,374 +0,0 @@
|
||||||
// Copyright 2017 Vector Creations Ltd
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package sync
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
"github.com/matrix-org/util"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
randomMessageEvent gomatrixserverlib.HeaderedEvent
|
|
||||||
aliceInviteBobEvent gomatrixserverlib.HeaderedEvent
|
|
||||||
bobLeaveEvent gomatrixserverlib.HeaderedEvent
|
|
||||||
syncPositionVeryOld = types.StreamingToken{PDUPosition: 5}
|
|
||||||
syncPositionBefore = types.StreamingToken{PDUPosition: 11}
|
|
||||||
syncPositionAfter = types.StreamingToken{PDUPosition: 12}
|
|
||||||
//syncPositionNewEDU = types.NewStreamToken(syncPositionAfter.PDUPosition, 1, 0, 0, nil)
|
|
||||||
syncPositionAfter2 = types.StreamingToken{PDUPosition: 13}
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
roomID = "!test:localhost"
|
|
||||||
alice = "@alice:localhost"
|
|
||||||
aliceDev = "alicedevice"
|
|
||||||
bob = "@bob:localhost"
|
|
||||||
bobDev = "bobdev"
|
|
||||||
)
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
var err error
|
|
||||||
err = json.Unmarshal([]byte(`{
|
|
||||||
"_room_version": "1",
|
|
||||||
"type": "m.room.message",
|
|
||||||
"content": {
|
|
||||||
"body": "Hello World",
|
|
||||||
"msgtype": "m.text"
|
|
||||||
},
|
|
||||||
"sender": "@noone:localhost",
|
|
||||||
"room_id": "`+roomID+`",
|
|
||||||
"origin": "localhost",
|
|
||||||
"origin_server_ts": 12345,
|
|
||||||
"event_id": "$randomMessageEvent:localhost"
|
|
||||||
}`), &randomMessageEvent)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
err = json.Unmarshal([]byte(`{
|
|
||||||
"_room_version": "1",
|
|
||||||
"type": "m.room.member",
|
|
||||||
"state_key": "`+bob+`",
|
|
||||||
"content": {
|
|
||||||
"membership": "invite"
|
|
||||||
},
|
|
||||||
"sender": "`+alice+`",
|
|
||||||
"room_id": "`+roomID+`",
|
|
||||||
"origin": "localhost",
|
|
||||||
"origin_server_ts": 12345,
|
|
||||||
"event_id": "$aliceInviteBobEvent:localhost"
|
|
||||||
}`), &aliceInviteBobEvent)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
err = json.Unmarshal([]byte(`{
|
|
||||||
"_room_version": "1",
|
|
||||||
"type": "m.room.member",
|
|
||||||
"state_key": "`+bob+`",
|
|
||||||
"content": {
|
|
||||||
"membership": "leave"
|
|
||||||
},
|
|
||||||
"sender": "`+bob+`",
|
|
||||||
"room_id": "`+roomID+`",
|
|
||||||
"origin": "localhost",
|
|
||||||
"origin_server_ts": 12345,
|
|
||||||
"event_id": "$bobLeaveEvent:localhost"
|
|
||||||
}`), &bobLeaveEvent)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func mustEqualPositions(t *testing.T, got, want types.StreamingToken) {
|
|
||||||
if got.String() != want.String() {
|
|
||||||
t.Fatalf("mustEqualPositions got %s want %s", got.String(), want.String())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test that the current position is returned if a request is already behind.
|
|
||||||
func TestImmediateNotification(t *testing.T) {
|
|
||||||
n := NewNotifier(syncPositionBefore)
|
|
||||||
pos, err := waitForEvents(n, newTestSyncRequest(alice, aliceDev, syncPositionVeryOld))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("TestImmediateNotification error: %s", err)
|
|
||||||
}
|
|
||||||
mustEqualPositions(t, pos, syncPositionBefore)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test that new events to a joined room unblocks the request.
|
|
||||||
func TestNewEventAndJoinedToRoom(t *testing.T) {
|
|
||||||
n := NewNotifier(syncPositionBefore)
|
|
||||||
n.setUsersJoinedToRooms(map[string][]string{
|
|
||||||
roomID: {alice, bob},
|
|
||||||
})
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore))
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("TestNewEventAndJoinedToRoom error: %w", err)
|
|
||||||
}
|
|
||||||
mustEqualPositions(t, pos, syncPositionAfter)
|
|
||||||
wg.Done()
|
|
||||||
}()
|
|
||||||
|
|
||||||
stream := lockedFetchUserStream(n, bob, bobDev)
|
|
||||||
waitForBlocking(stream, 1)
|
|
||||||
|
|
||||||
n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
|
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCorrectStream(t *testing.T) {
|
|
||||||
n := NewNotifier(syncPositionBefore)
|
|
||||||
stream := lockedFetchUserStream(n, bob, bobDev)
|
|
||||||
if stream.UserID != bob {
|
|
||||||
t.Fatalf("expected user %q, got %q", bob, stream.UserID)
|
|
||||||
}
|
|
||||||
if stream.DeviceID != bobDev {
|
|
||||||
t.Fatalf("expected device %q, got %q", bobDev, stream.DeviceID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCorrectStreamWakeup(t *testing.T) {
|
|
||||||
n := NewNotifier(syncPositionBefore)
|
|
||||||
awoken := make(chan string)
|
|
||||||
|
|
||||||
streamone := lockedFetchUserStream(n, alice, "one")
|
|
||||||
streamtwo := lockedFetchUserStream(n, alice, "two")
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
select {
|
|
||||||
case <-streamone.signalChannel:
|
|
||||||
awoken <- "one"
|
|
||||||
case <-streamtwo.signalChannel:
|
|
||||||
awoken <- "two"
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
|
|
||||||
wake := "two"
|
|
||||||
n.wakeupUserDevice(alice, []string{wake}, syncPositionAfter)
|
|
||||||
|
|
||||||
if result := <-awoken; result != wake {
|
|
||||||
t.Fatalf("expected to wake %q, got %q", wake, result)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test that an invite unblocks the request
|
|
||||||
func TestNewInviteEventForUser(t *testing.T) {
|
|
||||||
n := NewNotifier(syncPositionBefore)
|
|
||||||
n.setUsersJoinedToRooms(map[string][]string{
|
|
||||||
roomID: {alice, bob},
|
|
||||||
})
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore))
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("TestNewInviteEventForUser error: %w", err)
|
|
||||||
}
|
|
||||||
mustEqualPositions(t, pos, syncPositionAfter)
|
|
||||||
wg.Done()
|
|
||||||
}()
|
|
||||||
|
|
||||||
stream := lockedFetchUserStream(n, bob, bobDev)
|
|
||||||
waitForBlocking(stream, 1)
|
|
||||||
|
|
||||||
n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionAfter)
|
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test an EDU-only update wakes up the request.
|
|
||||||
// TODO: Fix this test, invites wake up with an incremented
|
|
||||||
// PDU position, not EDU position
|
|
||||||
/*
|
|
||||||
func TestEDUWakeup(t *testing.T) {
|
|
||||||
n := NewNotifier(syncPositionAfter)
|
|
||||||
n.setUsersJoinedToRooms(map[string][]string{
|
|
||||||
roomID: {alice, bob},
|
|
||||||
})
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionAfter))
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("TestNewInviteEventForUser error: %w", err)
|
|
||||||
}
|
|
||||||
mustEqualPositions(t, pos, syncPositionNewEDU)
|
|
||||||
wg.Done()
|
|
||||||
}()
|
|
||||||
|
|
||||||
stream := lockedFetchUserStream(n, bob, bobDev)
|
|
||||||
waitForBlocking(stream, 1)
|
|
||||||
|
|
||||||
n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionNewEDU)
|
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
// Test that all blocked requests get woken up on a new event.
|
|
||||||
func TestMultipleRequestWakeup(t *testing.T) {
|
|
||||||
n := NewNotifier(syncPositionBefore)
|
|
||||||
n.setUsersJoinedToRooms(map[string][]string{
|
|
||||||
roomID: {alice, bob},
|
|
||||||
})
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(3)
|
|
||||||
poll := func() {
|
|
||||||
pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore))
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("TestMultipleRequestWakeup error: %w", err)
|
|
||||||
}
|
|
||||||
mustEqualPositions(t, pos, syncPositionAfter)
|
|
||||||
wg.Done()
|
|
||||||
}
|
|
||||||
go poll()
|
|
||||||
go poll()
|
|
||||||
go poll()
|
|
||||||
|
|
||||||
stream := lockedFetchUserStream(n, bob, bobDev)
|
|
||||||
waitForBlocking(stream, 3)
|
|
||||||
|
|
||||||
n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
|
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
numWaiting := stream.NumWaiting()
|
|
||||||
if numWaiting != 0 {
|
|
||||||
t.Errorf("TestMultipleRequestWakeup NumWaiting() want 0, got %d", numWaiting)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test that you stop getting woken up when you leave a room.
|
|
||||||
func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
|
|
||||||
// listen as bob. Make bob leave room. Make alice send event to room.
|
|
||||||
// Make sure alice gets woken up only and not bob as well.
|
|
||||||
n := NewNotifier(syncPositionBefore)
|
|
||||||
n.setUsersJoinedToRooms(map[string][]string{
|
|
||||||
roomID: {alice, bob},
|
|
||||||
})
|
|
||||||
|
|
||||||
var leaveWG sync.WaitGroup
|
|
||||||
|
|
||||||
// Make bob leave the room
|
|
||||||
leaveWG.Add(1)
|
|
||||||
go func() {
|
|
||||||
pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore))
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %w", err)
|
|
||||||
}
|
|
||||||
mustEqualPositions(t, pos, syncPositionAfter)
|
|
||||||
leaveWG.Done()
|
|
||||||
}()
|
|
||||||
bobStream := lockedFetchUserStream(n, bob, bobDev)
|
|
||||||
waitForBlocking(bobStream, 1)
|
|
||||||
n.OnNewEvent(&bobLeaveEvent, "", nil, syncPositionAfter)
|
|
||||||
leaveWG.Wait()
|
|
||||||
|
|
||||||
// send an event into the room. Make sure alice gets it. Bob should not.
|
|
||||||
var aliceWG sync.WaitGroup
|
|
||||||
aliceStream := lockedFetchUserStream(n, alice, aliceDev)
|
|
||||||
aliceWG.Add(1)
|
|
||||||
go func() {
|
|
||||||
pos, err := waitForEvents(n, newTestSyncRequest(alice, aliceDev, syncPositionAfter))
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %w", err)
|
|
||||||
}
|
|
||||||
mustEqualPositions(t, pos, syncPositionAfter2)
|
|
||||||
aliceWG.Done()
|
|
||||||
}()
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
// this should timeout with an error (but the main goroutine won't wait for the timeout explicitly)
|
|
||||||
_, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionAfter))
|
|
||||||
if err == nil {
|
|
||||||
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom expect error but got nil")
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
waitForBlocking(aliceStream, 1)
|
|
||||||
waitForBlocking(bobStream, 1)
|
|
||||||
|
|
||||||
n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter2)
|
|
||||||
aliceWG.Wait()
|
|
||||||
|
|
||||||
// it's possible that at this point alice has been informed and bob is about to be informed, so wait
|
|
||||||
// for a fraction of a second to account for this race
|
|
||||||
time.Sleep(1 * time.Millisecond)
|
|
||||||
}
|
|
||||||
|
|
||||||
func waitForEvents(n *Notifier, req syncRequest) (types.StreamingToken, error) {
|
|
||||||
listener := n.GetListener(req)
|
|
||||||
defer listener.Close()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-time.After(5 * time.Second):
|
|
||||||
return types.StreamingToken{}, fmt.Errorf(
|
|
||||||
"waitForEvents timed out waiting for %s (pos=%v)", req.device.UserID, req.since,
|
|
||||||
)
|
|
||||||
case <-listener.GetNotifyChannel(req.since):
|
|
||||||
p := listener.GetSyncPosition()
|
|
||||||
return p, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait until something is Wait()ing on the user stream.
|
|
||||||
func waitForBlocking(s *UserDeviceStream, numBlocking uint) {
|
|
||||||
for numBlocking != s.NumWaiting() {
|
|
||||||
// This is horrible but I don't want to add a signalling mechanism JUST for testing.
|
|
||||||
time.Sleep(1 * time.Microsecond)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// lockedFetchUserStream invokes Notifier.fetchUserStream, respecting Notifier.streamLock.
|
|
||||||
// A new stream is made if it doesn't exist already.
|
|
||||||
func lockedFetchUserStream(n *Notifier, userID, deviceID string) *UserDeviceStream {
|
|
||||||
n.streamLock.Lock()
|
|
||||||
defer n.streamLock.Unlock()
|
|
||||||
|
|
||||||
return n.fetchUserDeviceStream(userID, deviceID, true)
|
|
||||||
}
|
|
||||||
|
|
||||||
func newTestSyncRequest(userID, deviceID string, since types.StreamingToken) syncRequest {
|
|
||||||
return syncRequest{
|
|
||||||
device: userapi.Device{
|
|
||||||
UserID: userID,
|
|
||||||
ID: deviceID,
|
|
||||||
},
|
|
||||||
timeout: 1 * time.Minute,
|
|
||||||
since: since,
|
|
||||||
wantFullState: false,
|
|
||||||
limit: DefaultTimelineLimit,
|
|
||||||
log: util.GetLogger(context.TODO()),
|
|
||||||
ctx: context.TODO(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -15,7 +15,6 @@
|
||||||
package sync
|
package sync
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
@ -26,7 +25,6 @@ import (
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const defaultSyncTimeout = time.Duration(0)
|
const defaultSyncTimeout = time.Duration(0)
|
||||||
|
|
@ -40,18 +38,7 @@ type filter struct {
|
||||||
} `json:"room"`
|
} `json:"room"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// syncRequest represents a /sync request, with sensible defaults/sanity checks applied.
|
func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Database) (*types.StreamRangeRequest, error) {
|
||||||
type syncRequest struct {
|
|
||||||
ctx context.Context
|
|
||||||
device userapi.Device
|
|
||||||
limit int
|
|
||||||
timeout time.Duration
|
|
||||||
since types.StreamingToken // nil means that no since token was supplied
|
|
||||||
wantFullState bool
|
|
||||||
log *log.Entry
|
|
||||||
}
|
|
||||||
|
|
||||||
func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Database) (*syncRequest, error) {
|
|
||||||
timeout := getTimeout(req.URL.Query().Get("timeout"))
|
timeout := getTimeout(req.URL.Query().Get("timeout"))
|
||||||
fullState := req.URL.Query().Get("full_state")
|
fullState := req.URL.Query().Get("full_state")
|
||||||
wantFullState := fullState != "" && fullState != "false"
|
wantFullState := fullState != "" && fullState != "false"
|
||||||
|
|
@ -88,14 +75,17 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// TODO: Additional query params: set_presence, filter
|
// TODO: Additional query params: set_presence, filter
|
||||||
return &syncRequest{
|
|
||||||
ctx: req.Context(),
|
return &types.StreamRangeRequest{
|
||||||
device: device,
|
Context: req.Context(), //
|
||||||
timeout: timeout,
|
Device: &device, //
|
||||||
since: since,
|
Response: types.NewResponse(), // Populated by all streams
|
||||||
wantFullState: wantFullState,
|
Filter: gomatrixserverlib.DefaultEventFilter(), //
|
||||||
limit: timelineLimit,
|
Since: since, //
|
||||||
log: util.GetLogger(req.Context()),
|
Timeout: timeout, //
|
||||||
|
Limit: timelineLimit, //
|
||||||
|
Rooms: make(map[string]string), // Populated by the PDU stream
|
||||||
|
WantFullState: wantFullState, //
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -33,7 +33,6 @@ import (
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
@ -57,7 +56,7 @@ type RequestPool struct {
|
||||||
|
|
||||||
// NewRequestPool makes a new RequestPool
|
// NewRequestPool makes a new RequestPool
|
||||||
func NewRequestPool(
|
func NewRequestPool(
|
||||||
db storage.Database, cfg *config.SyncAPI, n *Notifier,
|
db storage.Database, cfg *config.SyncAPI,
|
||||||
userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI,
|
userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI,
|
||||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||||
) *RequestPool {
|
) *RequestPool {
|
||||||
|
|
@ -160,13 +159,11 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
||||||
logger := util.GetLogger(req.Context()).WithFields(log.Fields{
|
logger := util.GetLogger(req.Context()).WithFields(log.Fields{
|
||||||
"user_id": device.UserID,
|
"user_id": device.UserID,
|
||||||
"device_id": device.ID,
|
"device_id": device.ID,
|
||||||
"since": syncReq.since,
|
"since": syncReq.Since,
|
||||||
"timeout": syncReq.timeout,
|
"timeout": syncReq.Timeout,
|
||||||
"limit": syncReq.limit,
|
"limit": syncReq.Limit,
|
||||||
})
|
})
|
||||||
|
|
||||||
_ = logger
|
|
||||||
|
|
||||||
activeSyncRequests.Inc()
|
activeSyncRequests.Inc()
|
||||||
defer activeSyncRequests.Dec()
|
defer activeSyncRequests.Dec()
|
||||||
|
|
||||||
|
|
@ -176,50 +173,54 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
||||||
defer waitingSyncRequests.Dec()
|
defer waitingSyncRequests.Dec()
|
||||||
|
|
||||||
if !rp.shouldReturnImmediately(syncReq) {
|
if !rp.shouldReturnImmediately(syncReq) {
|
||||||
timer := time.NewTimer(syncReq.timeout) // case of timeout=0 is handled above
|
timer := time.NewTimer(syncReq.Timeout) // case of timeout=0 is handled above
|
||||||
defer timer.Stop()
|
defer timer.Stop()
|
||||||
|
|
||||||
|
// Use a subcontext so that we don't keep the StreamNotifyAfter
|
||||||
|
// goroutines alive any longer than they really need to be.
|
||||||
|
waitctx, waitcancel := context.WithCancel(syncReq.Context)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-syncReq.ctx.Done(): // Caller gave up
|
case <-waitctx.Done(): // Caller gave up
|
||||||
|
waitcancel()
|
||||||
return util.JSONResponse{Code: http.StatusOK, JSON: syncData}
|
return util.JSONResponse{Code: http.StatusOK, JSON: syncData}
|
||||||
|
|
||||||
case <-timer.C: // Timeout reached
|
case <-timer.C: // Timeout reached
|
||||||
|
waitcancel()
|
||||||
return util.JSONResponse{Code: http.StatusOK, JSON: syncData}
|
return util.JSONResponse{Code: http.StatusOK, JSON: syncData}
|
||||||
|
|
||||||
case <-rp.pduStream.StreamNotifyAfter(syncReq.ctx, syncReq.since):
|
case <-rp.pduStream.StreamNotifyAfter(waitctx, syncReq.Since):
|
||||||
case <-rp.typingStream.StreamNotifyAfter(syncReq.ctx, syncReq.since):
|
case <-rp.typingStream.StreamNotifyAfter(waitctx, syncReq.Since):
|
||||||
case <-rp.receiptStream.StreamNotifyAfter(syncReq.ctx, syncReq.since):
|
case <-rp.receiptStream.StreamNotifyAfter(waitctx, syncReq.Since):
|
||||||
// case <-rp.sendToDeviceStream.StreamNotifyAfter(syncReq.ctx, syncReq.since):
|
// case <-rp.sendToDeviceStream.StreamNotifyAfter(waitctx, syncReq.Since):
|
||||||
// case <-rp.inviteStream.StreamNotifyAfter(syncReq.ctx, syncReq.since):
|
// case <-rp.inviteStream.StreamNotifyAfter(waitctx, syncReq.Since):
|
||||||
// case <-rp.deviceListStream.StreamNotifyAfter(syncReq.ctx, syncReq.since):
|
// case <-rp.deviceListStream.StreamNotifyAfter(waitctx, syncReq.Since):
|
||||||
}
|
}
|
||||||
|
|
||||||
|
waitcancel()
|
||||||
|
logger.Println("Responding to sync after notify")
|
||||||
|
} else {
|
||||||
|
logger.Println("Responding to sync immediately")
|
||||||
}
|
}
|
||||||
|
|
||||||
var latest types.StreamingToken
|
var latest types.StreamingToken
|
||||||
latest.ApplyUpdates(rp.pduStream.StreamLatestPosition(syncReq.ctx))
|
latest.ApplyUpdates(rp.pduStream.StreamLatestPosition(syncReq.Context))
|
||||||
latest.ApplyUpdates(rp.typingStream.StreamLatestPosition(syncReq.ctx))
|
latest.ApplyUpdates(rp.typingStream.StreamLatestPosition(syncReq.Context))
|
||||||
latest.ApplyUpdates(rp.receiptStream.StreamLatestPosition(syncReq.ctx))
|
latest.ApplyUpdates(rp.receiptStream.StreamLatestPosition(syncReq.Context))
|
||||||
// latest.ApplyUpdates(rp.sendToDeviceStream.StreamLatestPosition(syncReq.ctx))
|
// latest.ApplyUpdates(rp.sendToDeviceStream.StreamLatestPosition(syncReq.Context))
|
||||||
// latest.ApplyUpdates(rp.inviteStream.StreamLatestPosition(syncReq.ctx))
|
// latest.ApplyUpdates(rp.inviteStream.StreamLatestPosition(syncReq.Context))
|
||||||
// latest.ApplyUpdates(rp.deviceListStream.StreamLatestPosition(syncReq.ctx))
|
// latest.ApplyUpdates(rp.deviceListStream.StreamLatestPosition(syncReq.Context))
|
||||||
|
|
||||||
sr := &types.StreamRangeRequest{
|
syncReq.Response.NextBatch.ApplyUpdates(rp.pduStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest))
|
||||||
Device: device, //
|
syncReq.Response.NextBatch.ApplyUpdates(rp.typingStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest))
|
||||||
Response: types.NewResponse(), // Populated by all streams
|
syncReq.Response.NextBatch.ApplyUpdates(rp.receiptStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest))
|
||||||
Filter: gomatrixserverlib.DefaultEventFilter(), //
|
// syncReq.Response.NextBatch.ApplyUpdates(rp.sendToDeviceStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest))
|
||||||
Rooms: make(map[string]string), // Populated by the PDU stream
|
// syncReq.Response.NextBatch.ApplyUpdates(rp.inviteStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest))
|
||||||
}
|
// syncReq.Response.NextBatch.ApplyUpdates(rp.inviteStream.StreamRange(syncReq.Context, syncReq, syncReq.Since, latest))
|
||||||
|
|
||||||
sr.Response.NextBatch.ApplyUpdates(rp.pduStream.StreamRange(syncReq.ctx, sr, syncReq.since, latest))
|
|
||||||
sr.Response.NextBatch.ApplyUpdates(rp.typingStream.StreamRange(syncReq.ctx, sr, syncReq.since, latest))
|
|
||||||
sr.Response.NextBatch.ApplyUpdates(rp.receiptStream.StreamRange(syncReq.ctx, sr, syncReq.since, latest))
|
|
||||||
// sr.Response.NextBatch.ApplyUpdates(rp.sendToDeviceStream.StreamRange(syncReq.ctx, sr, syncReq.since, latest))
|
|
||||||
// sr.Response.NextBatch.ApplyUpdates(rp.inviteStream.StreamRange(syncReq.ctx, sr, syncReq.since, latest))
|
|
||||||
// sr.Response.NextBatch.ApplyUpdates(rp.inviteStream.StreamRange(syncReq.ctx, sr, syncReq.since, latest))
|
|
||||||
|
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusOK,
|
Code: http.StatusOK,
|
||||||
JSON: sr.Response,
|
JSON: syncReq.Response,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -458,10 +459,10 @@ func (rp *RequestPool) appendAccountData(
|
||||||
// shouldReturnImmediately returns whether the /sync request is an initial sync,
|
// shouldReturnImmediately returns whether the /sync request is an initial sync,
|
||||||
// or timeout=0, or full_state=true, in any of the cases the request should
|
// or timeout=0, or full_state=true, in any of the cases the request should
|
||||||
// return immediately.
|
// return immediately.
|
||||||
func (rp *RequestPool) shouldReturnImmediately(syncReq *syncRequest) bool {
|
func (rp *RequestPool) shouldReturnImmediately(syncReq *types.StreamRangeRequest) bool {
|
||||||
if syncReq.since.IsEmpty() || syncReq.timeout == 0 || syncReq.wantFullState {
|
if syncReq.Since.IsEmpty() || syncReq.Timeout == 0 || syncReq.WantFullState {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
waiting, werr := rp.db.SendToDeviceUpdatesWaiting(context.TODO(), syncReq.device.UserID, syncReq.device.ID)
|
waiting, werr := rp.db.SendToDeviceUpdatesWaiting(context.TODO(), syncReq.Device.UserID, syncReq.Device.ID)
|
||||||
return werr == nil && waiting
|
return werr == nil && waiting
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -15,8 +15,6 @@
|
||||||
package syncapi
|
package syncapi
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
|
@ -50,57 +48,46 @@ func AddPublicRoutes(
|
||||||
logrus.WithError(err).Panicf("failed to connect to sync db")
|
logrus.WithError(err).Panicf("failed to connect to sync db")
|
||||||
}
|
}
|
||||||
|
|
||||||
pos, err := syncDB.SyncPosition(context.Background())
|
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI)
|
||||||
if err != nil {
|
|
||||||
logrus.WithError(err).Panicf("failed to get sync position")
|
|
||||||
}
|
|
||||||
|
|
||||||
notifier := sync.NewNotifier(pos)
|
|
||||||
err = notifier.Load(context.Background(), syncDB)
|
|
||||||
if err != nil {
|
|
||||||
logrus.WithError(err).Panicf("failed to start notifier")
|
|
||||||
}
|
|
||||||
|
|
||||||
requestPool := sync.NewRequestPool(syncDB, cfg, notifier, userAPI, keyAPI, rsAPI)
|
|
||||||
|
|
||||||
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
|
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
|
||||||
cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
|
cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
|
||||||
consumer, notifier, keyAPI, rsAPI, syncDB,
|
consumer, keyAPI, rsAPI, syncDB,
|
||||||
)
|
)
|
||||||
if err = keyChangeConsumer.Start(); err != nil {
|
if err = keyChangeConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to start key change consumer")
|
logrus.WithError(err).Panicf("failed to start key change consumer")
|
||||||
}
|
}
|
||||||
|
|
||||||
roomConsumer := consumers.NewOutputRoomEventConsumer(
|
roomConsumer := consumers.NewOutputRoomEventConsumer(
|
||||||
cfg, consumer, notifier, syncDB, rsAPI,
|
cfg, consumer, syncDB, rsAPI,
|
||||||
)
|
)
|
||||||
if err = roomConsumer.Start(); err != nil {
|
if err = roomConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to start room server consumer")
|
logrus.WithError(err).Panicf("failed to start room server consumer")
|
||||||
}
|
}
|
||||||
|
|
||||||
clientConsumer := consumers.NewOutputClientDataConsumer(
|
clientConsumer := consumers.NewOutputClientDataConsumer(
|
||||||
cfg, consumer, notifier, syncDB,
|
cfg, consumer, syncDB,
|
||||||
)
|
)
|
||||||
if err = clientConsumer.Start(); err != nil {
|
if err = clientConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to start client data consumer")
|
logrus.WithError(err).Panicf("failed to start client data consumer")
|
||||||
}
|
}
|
||||||
|
|
||||||
typingConsumer := consumers.NewOutputTypingEventConsumer(
|
typingConsumer := consumers.NewOutputTypingEventConsumer(
|
||||||
cfg, consumer, notifier, syncDB,
|
cfg, consumer, syncDB,
|
||||||
)
|
)
|
||||||
if err = typingConsumer.Start(); err != nil {
|
if err = typingConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to start typing consumer")
|
logrus.WithError(err).Panicf("failed to start typing consumer")
|
||||||
}
|
}
|
||||||
|
|
||||||
sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer(
|
sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer(
|
||||||
cfg, consumer, notifier, syncDB,
|
cfg, consumer, syncDB,
|
||||||
)
|
)
|
||||||
if err = sendToDeviceConsumer.Start(); err != nil {
|
if err = sendToDeviceConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to start send-to-device consumer")
|
logrus.WithError(err).Panicf("failed to start send-to-device consumer")
|
||||||
}
|
}
|
||||||
|
|
||||||
receiptConsumer := consumers.NewOutputReceiptEventConsumer(
|
receiptConsumer := consumers.NewOutputReceiptEventConsumer(
|
||||||
cfg, consumer, notifier, syncDB,
|
cfg, consumer, syncDB,
|
||||||
)
|
)
|
||||||
if err = receiptConsumer.Start(); err != nil {
|
if err = receiptConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to start receipts consumer")
|
logrus.WithError(err).Panicf("failed to start receipts consumer")
|
||||||
|
|
|
||||||
|
|
@ -2,15 +2,24 @@ package types
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
||||||
type StreamRangeRequest struct {
|
type StreamRangeRequest struct {
|
||||||
|
Context context.Context
|
||||||
Device *userapi.Device
|
Device *userapi.Device
|
||||||
Response *Response
|
Response *Response
|
||||||
Filter gomatrixserverlib.EventFilter
|
Filter gomatrixserverlib.EventFilter
|
||||||
|
Since StreamingToken
|
||||||
|
Limit int
|
||||||
|
Timeout time.Duration
|
||||||
|
WantFullState bool
|
||||||
|
|
||||||
|
// Below this line are items updated by the
|
||||||
|
// stream providers. Not thread-safe.
|
||||||
Rooms map[string]string
|
Rooms map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue