Merge branch 'master' into patch-1

This commit is contained in:
Matt Voboril 2021-01-12 15:27:05 -06:00 committed by GitHub
commit 2851a9a605
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
39 changed files with 1496 additions and 1150 deletions

View file

@ -1,5 +1,18 @@
# Changelog
## Dendrite 0.3.5 (2021-01-11)
### Features
* All `/sync` streams are now logically separate after a refactoring exercise
## Fixes
* Event references are now deeply checked properly when calculating forward extremities, reducing the amount of forward extremities in most cases, which improves RAM utilisation and reduces the work done by state resolution
* Sync no longer sends incorrect `next_batch` tokens with old stream positions, reducing flashbacks of old messages in clients
* The federation `/send` endpoint no longer uses the request context, which could result in some events failing to be persisted if the sending server gave up the HTTP connection
* Appservices can now auth as users in their namespaces properly
## Dendrite 0.3.4 (2020-12-18)
### Features

View file

@ -113,19 +113,6 @@ func (t *EDUCache) AddTypingUser(
return t.GetLatestSyncPosition()
}
// AddSendToDeviceMessage increases the sync position for
// send-to-device updates.
// Returns the sync position before update, as the caller
// will use this to record the current stream position
// at the time that the send-to-device message was sent.
func (t *EDUCache) AddSendToDeviceMessage() int64 {
t.Lock()
defer t.Unlock()
latestSyncPosition := t.latestSyncPosition
t.latestSyncPosition++
return latestSyncPosition
}
// addUser with mutex lock & replace the previous timer.
// Returns the latest typing sync position after update.
func (t *EDUCache) addUser(

View file

@ -17,7 +17,7 @@ var build string
const (
VersionMajor = 0
VersionMinor = 3
VersionPatch = 4
VersionPatch = 5
VersionTag = "" // example: "rc1"
)

View file

@ -319,7 +319,7 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) {
}
func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerName) (time.Duration, bool) {
requestTimeout := time.Minute // max amount of time we want to spend on each request
requestTimeout := time.Second * 30 // max amount of time we want to spend on each request
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
logger := util.GetLogger(ctx).WithField("server_name", serverName)

View file

@ -259,34 +259,8 @@ func (u *latestEventsUpdater) calculateLatest(
// First of all, get a list of all of the events in our current
// set of forward extremities.
existingRefs := make(map[string]*types.StateAtEventAndReference)
existingNIDs := make([]types.EventNID, len(oldLatest))
for i, old := range oldLatest {
existingRefs[old.EventID] = &oldLatest[i]
existingNIDs[i] = old.EventNID
}
// Look up the old extremity events. This allows us to find their
// prev events.
events, err := u.api.DB.Events(u.ctx, existingNIDs)
if err != nil {
return false, fmt.Errorf("u.api.DB.Events: %w", err)
}
// Make a list of all of the prev events as referenced by all of
// the current forward extremities.
existingPrevs := make(map[string]struct{})
for _, old := range events {
for _, prevEventID := range old.PrevEventIDs() {
existingPrevs[prevEventID] = struct{}{}
}
}
// If the "new" event is already referenced by a forward extremity
// then do nothing - it's not a candidate to be a new extremity if
// it has been referenced.
if _, ok := existingPrevs[newEvent.EventID()]; ok {
u.latest = oldLatest
return false, nil
}
// If the "new" event is already a forward extremity then stop, as
@ -296,6 +270,29 @@ func (u *latestEventsUpdater) calculateLatest(
return false, nil
}
// If the "new" event is already referenced by an existing event
// then do nothing - it's not a candidate to be a new extremity if
// it has been referenced.
if referenced, err := u.updater.IsReferenced(newEvent.EventReference()); err != nil {
return false, fmt.Errorf("u.updater.IsReferenced(new): %w", err)
} else if referenced {
u.latest = oldLatest
return false, nil
}
// Then let's see if any of the existing forward extremities now
// have entries in the previous events table. If they do then we
// will no longer include them as forward extremities.
existingPrevs := make(map[string]struct{})
for _, l := range existingRefs {
referenced, err := u.updater.IsReferenced(l.EventReference)
if err != nil {
return false, fmt.Errorf("u.updater.IsReferenced: %w", err)
} else if referenced {
existingPrevs[l.EventID] = struct{}{}
}
}
// Include our new event in the extremities.
newLatest := []types.StateAtEventAndReference{newStateAndRef}

View file

@ -22,8 +22,8 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
log "github.com/sirupsen/logrus"
)
@ -32,15 +32,17 @@ import (
type OutputClientDataConsumer struct {
clientAPIConsumer *internal.ContinualConsumer
db storage.Database
notifier *sync.Notifier
stream types.StreamProvider
notifier *notifier.Notifier
}
// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
func NewOutputClientDataConsumer(
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
) *OutputClientDataConsumer {
consumer := internal.ContinualConsumer{
@ -52,7 +54,8 @@ func NewOutputClientDataConsumer(
s := &OutputClientDataConsumer{
clientAPIConsumer: &consumer,
db: store,
notifier: n,
notifier: notifier,
stream: stream,
}
consumer.ProcessMessage = s.onMessage
@ -81,7 +84,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
"room_id": output.RoomID,
}).Info("received data from client API server")
pduPos, err := s.db.UpsertAccountData(
streamPos, err := s.db.UpsertAccountData(
context.TODO(), string(msg.Key), output.RoomID, output.Type,
)
if err != nil {
@ -92,7 +95,8 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
}).Panicf("could not save account data")
}
s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.StreamingToken{PDUPosition: pduPos})
s.stream.Advance(streamPos)
s.notifier.OnNewAccountData(string(msg.Key), types.StreamingToken{AccountDataPosition: streamPos})
return nil
}

View file

@ -18,14 +18,13 @@ import (
"context"
"encoding/json"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
log "github.com/sirupsen/logrus"
)
@ -33,7 +32,8 @@ import (
type OutputReceiptEventConsumer struct {
receiptConsumer *internal.ContinualConsumer
db storage.Database
notifier *sync.Notifier
stream types.StreamProvider
notifier *notifier.Notifier
}
// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer.
@ -41,8 +41,9 @@ type OutputReceiptEventConsumer struct {
func NewOutputReceiptEventConsumer(
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
) *OutputReceiptEventConsumer {
consumer := internal.ContinualConsumer{
@ -55,7 +56,8 @@ func NewOutputReceiptEventConsumer(
s := &OutputReceiptEventConsumer{
receiptConsumer: &consumer,
db: store,
notifier: n,
notifier: notifier,
stream: stream,
}
consumer.ProcessMessage = s.onMessage
@ -87,7 +89,8 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro
if err != nil {
return err
}
// update stream position
s.stream.Advance(streamPos)
s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos})
return nil

View file

@ -22,8 +22,8 @@ import (
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@ -35,7 +35,8 @@ type OutputSendToDeviceEventConsumer struct {
sendToDeviceConsumer *internal.ContinualConsumer
db storage.Database
serverName gomatrixserverlib.ServerName // our server name
notifier *sync.Notifier
stream types.StreamProvider
notifier *notifier.Notifier
}
// NewOutputSendToDeviceEventConsumer creates a new OutputSendToDeviceEventConsumer.
@ -43,8 +44,9 @@ type OutputSendToDeviceEventConsumer struct {
func NewOutputSendToDeviceEventConsumer(
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
) *OutputSendToDeviceEventConsumer {
consumer := internal.ContinualConsumer{
@ -58,7 +60,8 @@ func NewOutputSendToDeviceEventConsumer(
sendToDeviceConsumer: &consumer,
db: store,
serverName: cfg.Matrix.ServerName,
notifier: n,
notifier: notifier,
stream: stream,
}
consumer.ProcessMessage = s.onMessage
@ -102,6 +105,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
return err
}
s.stream.Advance(streamPos)
s.notifier.OnNewSendToDevice(
output.UserID,
[]string{output.DeviceID},

View file

@ -19,10 +19,11 @@ import (
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
log "github.com/sirupsen/logrus"
)
@ -30,8 +31,9 @@ import (
// OutputTypingEventConsumer consumes events that originated in the EDU server.
type OutputTypingEventConsumer struct {
typingConsumer *internal.ContinualConsumer
db storage.Database
notifier *sync.Notifier
eduCache *cache.EDUCache
stream types.StreamProvider
notifier *notifier.Notifier
}
// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer.
@ -39,8 +41,10 @@ type OutputTypingEventConsumer struct {
func NewOutputTypingEventConsumer(
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store storage.Database,
eduCache *cache.EDUCache,
notifier *notifier.Notifier,
stream types.StreamProvider,
) *OutputTypingEventConsumer {
consumer := internal.ContinualConsumer{
@ -52,8 +56,9 @@ func NewOutputTypingEventConsumer(
s := &OutputTypingEventConsumer{
typingConsumer: &consumer,
db: store,
notifier: n,
eduCache: eduCache,
notifier: notifier,
stream: stream,
}
consumer.ProcessMessage = s.onMessage
@ -63,10 +68,10 @@ func NewOutputTypingEventConsumer(
// Start consuming from EDU api
func (s *OutputTypingEventConsumer) Start() error {
s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
s.notifier.OnNewTyping(roomID, types.StreamingToken{TypingPosition: types.StreamPosition(latestSyncPosition)})
s.eduCache.SetTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
pos := types.StreamPosition(latestSyncPosition)
s.notifier.OnNewTyping(roomID, types.StreamingToken{TypingPosition: pos})
})
return s.typingConsumer.Start()
}
@ -87,11 +92,17 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
var typingPos types.StreamPosition
typingEvent := output.Event
if typingEvent.Typing {
typingPos = s.db.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime)
typingPos = types.StreamPosition(
s.eduCache.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime),
)
} else {
typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID)
typingPos = types.StreamPosition(
s.eduCache.RemoveUser(typingEvent.UserID, typingEvent.RoomID),
)
}
s.stream.Advance(typingPos)
s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos})
return nil
}

View file

@ -23,8 +23,8 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
syncapi "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
@ -34,12 +34,13 @@ import (
type OutputKeyChangeEventConsumer struct {
keyChangeConsumer *internal.ContinualConsumer
db storage.Database
notifier *notifier.Notifier
stream types.PartitionedStreamProvider
serverName gomatrixserverlib.ServerName // our server name
rsAPI roomserverAPI.RoomserverInternalAPI
keyAPI api.KeyInternalAPI
partitionToOffset map[int32]int64
partitionToOffsetMu sync.Mutex
notifier *syncapi.Notifier
}
// NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer.
@ -48,10 +49,11 @@ func NewOutputKeyChangeEventConsumer(
serverName gomatrixserverlib.ServerName,
topic string,
kafkaConsumer sarama.Consumer,
n *syncapi.Notifier,
keyAPI api.KeyInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
store storage.Database,
notifier *notifier.Notifier,
stream types.PartitionedStreamProvider,
) *OutputKeyChangeEventConsumer {
consumer := internal.ContinualConsumer{
@ -69,7 +71,8 @@ func NewOutputKeyChangeEventConsumer(
rsAPI: rsAPI,
partitionToOffset: make(map[int32]int64),
partitionToOffsetMu: sync.Mutex{},
notifier: n,
notifier: notifier,
stream: stream,
}
consumer.ProcessMessage = s.onMessage
@ -114,14 +117,15 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
}
// make sure we get our own key updates too!
queryRes.UserIDsToCount[output.UserID] = 1
posUpdate := types.StreamingToken{
DeviceListPosition: types.LogPosition{
Offset: msg.Offset,
Partition: msg.Partition,
},
posUpdate := types.LogPosition{
Offset: msg.Offset,
Partition: msg.Partition,
}
s.stream.Advance(posUpdate)
for userID := range queryRes.UserIDsToCount {
s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID)
s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID)
}
return nil
}

View file

@ -23,8 +23,8 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
@ -32,19 +32,23 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
cfg *config.SyncAPI
rsAPI api.RoomserverInternalAPI
rsConsumer *internal.ContinualConsumer
db storage.Database
notifier *sync.Notifier
cfg *config.SyncAPI
rsAPI api.RoomserverInternalAPI
rsConsumer *internal.ContinualConsumer
db storage.Database
pduStream types.StreamProvider
inviteStream types.StreamProvider
notifier *notifier.Notifier
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store storage.Database,
notifier *notifier.Notifier,
pduStream types.StreamProvider,
inviteStream types.StreamProvider,
rsAPI api.RoomserverInternalAPI,
) *OutputRoomEventConsumer {
@ -55,11 +59,13 @@ func NewOutputRoomEventConsumer(
PartitionStore: store,
}
s := &OutputRoomEventConsumer{
cfg: cfg,
rsConsumer: &consumer,
db: store,
notifier: n,
rsAPI: rsAPI,
cfg: cfg,
rsConsumer: &consumer,
db: store,
notifier: notifier,
pduStream: pduStream,
inviteStream: inviteStream,
rsAPI: rsAPI,
}
consumer.ProcessMessage = s.onMessage
@ -180,7 +186,8 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
return err
}
s.notifier.OnNewEvent(ev, "", nil, types.StreamingToken{PDUPosition: pduPos})
s.pduStream.Advance(pduPos)
s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos})
return nil
}
@ -219,7 +226,8 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
return err
}
s.notifier.OnNewEvent(ev, "", nil, types.StreamingToken{PDUPosition: pduPos})
s.pduStream.Advance(pduPos)
s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos})
return nil
}
@ -274,7 +282,10 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
}).Panicf("roomserver output log: write invite failure")
return nil
}
s.inviteStream.Advance(pduPos)
s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, *msg.Event.StateKey())
return nil
}
@ -290,9 +301,11 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
}).Panicf("roomserver output log: remove invite failure")
return nil
}
// Notify any active sync requests that the invite has been retired.
// Invites share the same stream counter as PDUs
s.inviteStream.Advance(pduPos)
s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID)
return nil
}
@ -307,12 +320,13 @@ func (s *OutputRoomEventConsumer) onNewPeek(
}).Panicf("roomserver output log: write peek failure")
return nil
}
// tell the notifier about the new peek so it knows to wake up new devices
s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID)
// we need to wake up the users who might need to now be peeking into this room,
// so we send in a dummy event to trigger a wakeup
s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp})
// tell the notifier about the new peek so it knows to wake up new devices
// TODO: This only works because the peeks table is reusing the same
// index as PDUs, but we should fix this
s.pduStream.Advance(sp)
s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp})
return nil
}
@ -327,12 +341,13 @@ func (s *OutputRoomEventConsumer) onRetirePeek(
}).Panicf("roomserver output log: write peek failure")
return nil
}
// tell the notifier about the new peek so it knows to wake up new devices
s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID)
// we need to wake up the users who might need to now be peeking into this room,
// so we send in a dummy event to trigger a wakeup
s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp})
// tell the notifier about the new peek so it knows to wake up new devices
// TODO: This only works because the peeks table is reusing the same
// index as PDUs, but we should fix this
s.pduStream.Advance(sp)
s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp})
return nil
}

View file

@ -49,8 +49,8 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID,
// nolint:gocyclo
func DeviceListCatchup(
ctx context.Context, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
userID string, res *types.Response, from, to types.StreamingToken,
) (hasNew bool, err error) {
userID string, res *types.Response, from, to types.LogPosition,
) (newPos types.LogPosition, hasNew bool, err error) {
// Track users who we didn't track before but now do by virtue of sharing a room with them, or not.
newlyJoinedRooms := joinedRooms(res, userID)
@ -58,7 +58,7 @@ func DeviceListCatchup(
if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
changed, left, err := TrackChangedUsers(ctx, rsAPI, userID, newlyJoinedRooms, newlyLeftRooms)
if err != nil {
return false, err
return to, false, err
}
res.DeviceLists.Changed = changed
res.DeviceLists.Left = left
@ -73,13 +73,13 @@ func DeviceListCatchup(
offset = sarama.OffsetOldest
// Extract partition/offset from sync token
// TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make.
if !from.DeviceListPosition.IsEmpty() {
partition = from.DeviceListPosition.Partition
offset = from.DeviceListPosition.Offset
if !from.IsEmpty() {
partition = from.Partition
offset = from.Offset
}
var toOffset int64
toOffset = sarama.OffsetNewest
if toLog := to.DeviceListPosition; toLog.Partition == partition && toLog.Offset > 0 {
if toLog := to; toLog.Partition == partition && toLog.Offset > 0 {
toOffset = toLog.Offset
}
var queryRes api.QueryKeyChangesResponse
@ -91,7 +91,7 @@ func DeviceListCatchup(
if queryRes.Error != nil {
// don't fail the catchup because we may have got useful information by tracking membership
util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed")
return hasNew, nil
return to, hasNew, nil
}
// QueryKeyChanges gets ALL users who have changed keys, we want the ones who share rooms with the user.
var sharedUsersMap map[string]int
@ -128,13 +128,12 @@ func DeviceListCatchup(
}
}
// set the new token
to.DeviceListPosition = types.LogPosition{
to = types.LogPosition{
Partition: queryRes.Partition,
Offset: queryRes.Offset,
}
res.NextBatch.ApplyUpdates(to)
return hasNew, nil
return to, hasNew, nil
}
// TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response.

View file

@ -16,12 +16,10 @@ import (
var (
syncingUser = "@alice:localhost"
emptyToken = types.StreamingToken{}
newestToken = types.StreamingToken{
DeviceListPosition: types.LogPosition{
Offset: sarama.OffsetNewest,
Partition: 0,
},
emptyToken = types.LogPosition{}
newestToken = types.LogPosition{
Offset: sarama.OffsetNewest,
Partition: 0,
}
)
@ -180,7 +178,7 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
"!another:room": {syncingUser},
},
}
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@ -203,7 +201,7 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
"!another:room": {syncingUser},
},
}
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@ -226,7 +224,7 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
"!another:room": {syncingUser, existingUser},
},
}
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@ -248,7 +246,7 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
"!another:room": {syncingUser, existingUser},
},
}
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@ -307,7 +305,7 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
roomID: {syncingUser, existingUser},
},
}
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@ -335,7 +333,7 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) {
"!another:room": {syncingUser},
},
}
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@ -420,7 +418,7 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
"!another:room": {syncingUser},
},
}
hasNew, err := DeviceListCatchup(
_, hasNew, err := DeviceListCatchup(
context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken,
)
if err != nil {

View file

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package sync
package notifier
import (
"context"
@ -48,9 +48,9 @@ type Notifier struct {
// NewNotifier creates a new notifier set to the given sync position.
// In order for this to be of any use, the Notifier needs to be told all rooms and
// the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase).
func NewNotifier(pos types.StreamingToken) *Notifier {
func NewNotifier(currPos types.StreamingToken) *Notifier {
return &Notifier{
currPos: pos,
currPos: currPos,
roomIDToJoinedUsers: make(map[string]userIDSet),
roomIDToPeekingDevices: make(map[string]peekingDeviceSet),
userDeviceStreams: make(map[string]map[string]*UserDeviceStream),
@ -124,12 +124,24 @@ func (n *Notifier) OnNewEvent(
}
}
func (n *Notifier) OnNewPeek(
roomID, userID, deviceID string,
func (n *Notifier) OnNewAccountData(
userID string, posUpdate types.StreamingToken,
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
n.currPos.ApplyUpdates(posUpdate)
n.wakeupUsers([]string{userID}, nil, posUpdate)
}
func (n *Notifier) OnNewPeek(
roomID, userID, deviceID string,
posUpdate types.StreamingToken,
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
n.currPos.ApplyUpdates(posUpdate)
n.addPeekingDevice(roomID, userID, deviceID)
// we don't wake up devices here given the roomserver consumer will do this shortly afterwards
@ -138,10 +150,12 @@ func (n *Notifier) OnNewPeek(
func (n *Notifier) OnRetirePeek(
roomID, userID, deviceID string,
posUpdate types.StreamingToken,
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
n.currPos.ApplyUpdates(posUpdate)
n.removePeekingDevice(roomID, userID, deviceID)
// we don't wake up devices here given the roomserver consumer will do this shortly afterwards
@ -206,7 +220,7 @@ func (n *Notifier) OnNewInvite(
// GetListener returns a UserStreamListener that can be used to wait for
// updates for a user. Must be closed.
// notify for anything before sincePos
func (n *Notifier) GetListener(req syncRequest) UserDeviceStreamListener {
func (n *Notifier) GetListener(req types.SyncRequest) UserDeviceStreamListener {
// Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298
// - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID
// - Incoming events wake requests for a matching room ID
@ -220,7 +234,7 @@ func (n *Notifier) GetListener(req syncRequest) UserDeviceStreamListener {
n.removeEmptyUserStreams()
return n.fetchUserDeviceStream(req.device.UserID, req.device.ID, true).GetListener(req.ctx)
return n.fetchUserDeviceStream(req.Device.UserID, req.Device.ID, true).GetListener(req.Context)
}
// Load the membership states required to notify users correctly.

View file

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package sync
package notifier
import (
"context"
@ -326,16 +326,16 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
time.Sleep(1 * time.Millisecond)
}
func waitForEvents(n *Notifier, req syncRequest) (types.StreamingToken, error) {
func waitForEvents(n *Notifier, req types.SyncRequest) (types.StreamingToken, error) {
listener := n.GetListener(req)
defer listener.Close()
select {
case <-time.After(5 * time.Second):
return types.StreamingToken{}, fmt.Errorf(
"waitForEvents timed out waiting for %s (pos=%v)", req.device.UserID, req.since,
"waitForEvents timed out waiting for %s (pos=%v)", req.Device.UserID, req.Since,
)
case <-listener.GetNotifyChannel(req.since):
case <-listener.GetNotifyChannel(req.Since):
p := listener.GetSyncPosition()
return p, nil
}
@ -358,17 +358,17 @@ func lockedFetchUserStream(n *Notifier, userID, deviceID string) *UserDeviceStre
return n.fetchUserDeviceStream(userID, deviceID, true)
}
func newTestSyncRequest(userID, deviceID string, since types.StreamingToken) syncRequest {
return syncRequest{
device: userapi.Device{
func newTestSyncRequest(userID, deviceID string, since types.StreamingToken) types.SyncRequest {
return types.SyncRequest{
Device: &userapi.Device{
UserID: userID,
ID: deviceID,
},
timeout: 1 * time.Minute,
since: since,
wantFullState: false,
limit: DefaultTimelineLimit,
log: util.GetLogger(context.TODO()),
ctx: context.TODO(),
Timeout: 1 * time.Minute,
Since: since,
WantFullState: false,
Limit: 20,
Log: util.GetLogger(context.TODO()),
Context: context.TODO(),
}
}

View file

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package sync
package notifier
import (
"context"

View file

@ -16,11 +16,9 @@ package storage
import (
"context"
"time"
eduAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
@ -30,6 +28,26 @@ import (
type Database interface {
internal.PartitionStorer
MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForAccountData(ctx context.Context) (types.StreamPosition, error)
CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error)
RecentEvents(ctx context.Context, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
GetBackwardTopologyPos(ctx context.Context, events []types.StreamEvent) (types.TopologyToken, error)
PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error)
InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error)
PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error)
RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error)
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error)
// AllPeekingDevicesInRooms returns a map of room ID to a list of all peeking devices.
@ -56,18 +74,6 @@ type Database interface {
// Returns an empty slice if no state events could be found for this room.
// Returns an error if there was an issue with the retrieval.
GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) (stateEvents []*gomatrixserverlib.HeaderedEvent, err error)
// SyncPosition returns the latest positions for syncing.
SyncPosition(ctx context.Context) (types.StreamingToken, error)
// IncrementalSync returns all the data needed in order to create an incremental
// sync response for the given user. Events returned will include any client
// transaction IDs associated with the given device. These transaction IDs come
// from when the device sent the event via an API that included a transaction
// ID. A response object must be provided for IncrementaSync to populate - it
// will not create one.
IncrementalSync(ctx context.Context, res *types.Response, device userapi.Device, fromPos, toPos types.StreamingToken, numRecentEventsPerRoom int, wantFullState bool) (*types.Response, error)
// CompleteSync returns a complete /sync API response for the given user. A response object
// must be provided for CompleteSync to populate - it will not create one.
CompleteSync(ctx context.Context, res *types.Response, device userapi.Device, numRecentEventsPerRoom int) (*types.Response, error)
// GetAccountDataInRange returns all account data for a given user inserted or
// updated between two given positions
// Returns a map following the format data[roomID] = []dataTypes
@ -97,15 +103,6 @@ type Database interface {
// DeletePeek deletes all peeks for a given room by a given user
// Returns an error if there was a problem communicating with the database.
DeletePeeks(ctx context.Context, RoomID, UserID string) (types.StreamPosition, error)
// SetTypingTimeoutCallback sets a callback function that is called right after
// a user is removed from the typing user list due to timeout.
SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn)
// AddTypingUser adds a typing user to the typing cache.
// Returns the newly calculated sync position for typing notifications.
AddTypingUser(userID, roomID string, expireTime *time.Time) types.StreamPosition
// RemoveTypingUser removes a typing user from the typing cache.
// Returns the newly calculated sync position for typing notifications.
RemoveTypingUser(userID, roomID string) types.StreamPosition
// GetEventsInStreamingRange retrieves all of the events on a given ordering using the given extremities and limit.
GetEventsInStreamingRange(ctx context.Context, from, to *types.StreamingToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error)
// GetEventsInTopologicalRange retrieves all of the events on a given ordering using the given extremities and limit.
@ -120,8 +117,6 @@ type Database interface {
// matches the streamevent.transactionID device then the transaction ID gets
// added to the unsigned section of the output event.
StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []*gomatrixserverlib.HeaderedEvent
// AddSendToDevice increases the EDU position in the cache and returns the stream position.
AddSendToDevice() types.StreamPosition
// SendToDeviceUpdatesForSync returns a list of send-to-device updates. It returns three lists:
// - "events": a list of send-to-device events that should be included in the sync
// - "changes": a list of send-to-device events that should be updated in the database by

View file

@ -96,7 +96,7 @@ func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, room
}
func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []api.OutputReceiptEvent, error) {
lastPos := types.StreamPosition(0)
lastPos := streamPos
rows, err := r.selectRoomReceipts.QueryContext(ctx, pq.Array(roomIDs), streamPos)
if err != nil {
return 0, nil, fmt.Errorf("unable to query room receipts: %w", err)

View file

@ -20,7 +20,6 @@ import (
// Import the postgres database driver.
_ "github.com/lib/pq"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
@ -106,7 +105,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
Filter: filter,
SendToDevice: sendToDevice,
Receipts: receipts,
EDUCache: cache.New(),
}
return &d, nil
}

View file

@ -19,12 +19,10 @@ import (
"database/sql"
"encoding/json"
"fmt"
"time"
eduAPI "github.com/matrix-org/dendrite/eduserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
@ -49,7 +47,78 @@ type Database struct {
SendToDevice tables.SendToDevice
Filter tables.Filter
Receipts tables.Receipts
EDUCache *cache.EDUCache
}
func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) {
return d.DB.BeginTx(ctx, &sql.TxOptions{
// Set the isolation level so that we see a snapshot of the database.
// In PostgreSQL repeatable read transactions will see a snapshot taken
// at the first query, and since the transaction is read-only it can't
// run into any serialisation errors.
// https://www.postgresql.org/docs/9.5/static/transaction-iso.html#XACT-REPEATABLE-READ
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
})
}
func (d *Database) MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error) {
id, err := d.OutputEvents.SelectMaxEventID(ctx, nil)
if err != nil {
return 0, fmt.Errorf("d.OutputEvents.SelectMaxEventID: %w", err)
}
return types.StreamPosition(id), nil
}
func (d *Database) MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error) {
id, err := d.Receipts.SelectMaxReceiptID(ctx, nil)
if err != nil {
return 0, fmt.Errorf("d.Receipts.SelectMaxReceiptID: %w", err)
}
return types.StreamPosition(id), nil
}
func (d *Database) MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error) {
id, err := d.Invites.SelectMaxInviteID(ctx, nil)
if err != nil {
return 0, fmt.Errorf("d.Invites.SelectMaxInviteID: %w", err)
}
return types.StreamPosition(id), nil
}
func (d *Database) MaxStreamPositionForAccountData(ctx context.Context) (types.StreamPosition, error) {
id, err := d.AccountData.SelectMaxAccountDataID(ctx, nil)
if err != nil {
return 0, fmt.Errorf("d.Invites.SelectMaxAccountDataID: %w", err)
}
return types.StreamPosition(id), nil
}
func (d *Database) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error) {
return d.CurrentRoomState.SelectCurrentState(ctx, nil, roomID, stateFilterPart)
}
func (d *Database) RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error) {
return d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, nil, userID, membership)
}
func (d *Database) RecentEvents(ctx context.Context, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) {
return d.OutputEvents.SelectRecentEvents(ctx, nil, roomID, r, limit, chronologicalOrder, onlySyncEvents)
}
func (d *Database) PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) {
return d.Topology.SelectPositionInTopology(ctx, nil, eventID)
}
func (d *Database) InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error) {
return d.Invites.SelectInviteEventsInRange(ctx, nil, targetUserID, r)
}
func (d *Database) PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error) {
return d.Peeks.SelectPeeksInRange(ctx, nil, userID, deviceID, r)
}
func (d *Database) RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error) {
return d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos)
}
// Events lookups a list of event by their event ID.
@ -99,6 +168,7 @@ func (d *Database) GetEventsInStreamingRange(
return events, err
}
/*
func (d *Database) AddTypingUser(
userID, roomID string, expireTime *time.Time,
) types.StreamPosition {
@ -111,13 +181,16 @@ func (d *Database) RemoveTypingUser(
return types.StreamPosition(d.EDUCache.RemoveUser(userID, roomID))
}
func (d *Database) AddSendToDevice() types.StreamPosition {
return types.StreamPosition(d.EDUCache.AddSendToDeviceMessage())
}
func (d *Database) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) {
d.EDUCache.SetTimeoutCallback(fn)
}
*/
/*
func (d *Database) AddSendToDevice() types.StreamPosition {
return types.StreamPosition(d.EDUCache.AddSendToDeviceMessage())
}
*/
func (d *Database) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) {
return d.CurrentRoomState.SelectJoinedUsers(ctx)
@ -416,18 +489,6 @@ func (d *Database) GetEventsInTopologicalRange(
return
}
func (d *Database) SyncPosition(ctx context.Context) (tok types.StreamingToken, err error) {
err = sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error {
pos, err := d.syncPositionTx(ctx, txn)
if err != nil {
return err
}
tok = pos
return nil
})
return
}
func (d *Database) BackwardExtremitiesForRoom(
ctx context.Context, roomID string,
) (backwardExtremities map[string][]string, err error) {
@ -454,215 +515,6 @@ func (d *Database) EventPositionInTopology(
return types.TopologyToken{Depth: depth, PDUPosition: stream}, nil
}
func (d *Database) syncPositionTx(
ctx context.Context, txn *sql.Tx,
) (sp types.StreamingToken, err error) {
maxEventID, err := d.OutputEvents.SelectMaxEventID(ctx, txn)
if err != nil {
return sp, err
}
maxAccountDataID, err := d.AccountData.SelectMaxAccountDataID(ctx, txn)
if err != nil {
return sp, err
}
if maxAccountDataID > maxEventID {
maxEventID = maxAccountDataID
}
maxInviteID, err := d.Invites.SelectMaxInviteID(ctx, txn)
if err != nil {
return sp, err
}
if maxInviteID > maxEventID {
maxEventID = maxInviteID
}
maxPeekID, err := d.Peeks.SelectMaxPeekID(ctx, txn)
if err != nil {
return sp, err
}
if maxPeekID > maxEventID {
maxEventID = maxPeekID
}
maxReceiptID, err := d.Receipts.SelectMaxReceiptID(ctx, txn)
if err != nil {
return sp, err
}
// TODO: complete these positions
sp = types.StreamingToken{
PDUPosition: types.StreamPosition(maxEventID),
TypingPosition: types.StreamPosition(d.EDUCache.GetLatestSyncPosition()),
ReceiptPosition: types.StreamPosition(maxReceiptID),
InvitePosition: types.StreamPosition(maxInviteID),
}
return
}
// addPDUDeltaToResponse adds all PDU deltas to a sync response.
// IDs of all rooms the user joined are returned so EDU deltas can be added for them.
func (d *Database) addPDUDeltaToResponse(
ctx context.Context,
device userapi.Device,
r types.Range,
numRecentEventsPerRoom int,
wantFullState bool,
res *types.Response,
) (joinedRoomIDs []string, err error) {
txn, err := d.DB.BeginTx(ctx, &txReadOnlySnapshot)
if err != nil {
return nil, err
}
succeeded := false
defer sqlutil.EndTransactionWithCheck(txn, &succeeded, &err)
stateFilter := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request
// Work out which rooms to return in the response. This is done by getting not only the currently
// joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions.
// This works out what the 'state' key should be for each room as well as which membership block
// to put the room into.
var deltas []stateDelta
if !wantFullState {
deltas, joinedRoomIDs, err = d.getStateDeltas(
ctx, &device, txn, r, device.UserID, &stateFilter,
)
if err != nil {
return nil, fmt.Errorf("d.getStateDeltas: %w", err)
}
} else {
deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync(
ctx, &device, txn, r, device.UserID, &stateFilter,
)
if err != nil {
return nil, fmt.Errorf("d.getStateDeltasForFullStateSync: %w", err)
}
}
for _, delta := range deltas {
err = d.addRoomDeltaToResponse(ctx, &device, txn, r, delta, numRecentEventsPerRoom, res)
if err != nil {
return nil, fmt.Errorf("d.addRoomDeltaToResponse: %w", err)
}
}
succeeded = true
return joinedRoomIDs, nil
}
// addTypingDeltaToResponse adds all typing notifications to a sync response
// since the specified position.
func (d *Database) addTypingDeltaToResponse(
since types.StreamingToken,
joinedRoomIDs []string,
res *types.Response,
) error {
var ok bool
var err error
for _, roomID := range joinedRoomIDs {
var jr types.JoinResponse
if typingUsers, updated := d.EDUCache.GetTypingUsersIfUpdatedAfter(
roomID, int64(since.TypingPosition),
); updated {
ev := gomatrixserverlib.ClientEvent{
Type: gomatrixserverlib.MTyping,
}
ev.Content, err = json.Marshal(map[string]interface{}{
"user_ids": typingUsers,
})
if err != nil {
return err
}
if jr, ok = res.Rooms.Join[roomID]; !ok {
jr = *types.NewJoinResponse()
}
jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
res.Rooms.Join[roomID] = jr
}
}
res.NextBatch.TypingPosition = types.StreamPosition(d.EDUCache.GetLatestSyncPosition())
return nil
}
// addReceiptDeltaToResponse adds all receipt information to a sync response
// since the specified position
func (d *Database) addReceiptDeltaToResponse(
since types.StreamingToken,
joinedRoomIDs []string,
res *types.Response,
) error {
lastPos, receipts, err := d.Receipts.SelectRoomReceiptsAfter(context.TODO(), joinedRoomIDs, since.ReceiptPosition)
if err != nil {
return fmt.Errorf("unable to select receipts for rooms: %w", err)
}
// Group receipts by room, so we can create one ClientEvent for every room
receiptsByRoom := make(map[string][]eduAPI.OutputReceiptEvent)
for _, receipt := range receipts {
receiptsByRoom[receipt.RoomID] = append(receiptsByRoom[receipt.RoomID], receipt)
}
for roomID, receipts := range receiptsByRoom {
var jr types.JoinResponse
var ok bool
// Make sure we use an existing JoinResponse if there is one.
// If not, we'll create a new one
if jr, ok = res.Rooms.Join[roomID]; !ok {
jr = types.JoinResponse{}
}
ev := gomatrixserverlib.ClientEvent{
Type: gomatrixserverlib.MReceipt,
RoomID: roomID,
}
content := make(map[string]eduAPI.ReceiptMRead)
for _, receipt := range receipts {
var read eduAPI.ReceiptMRead
if read, ok = content[receipt.EventID]; !ok {
read = eduAPI.ReceiptMRead{
User: make(map[string]eduAPI.ReceiptTS),
}
}
read.User[receipt.UserID] = eduAPI.ReceiptTS{TS: receipt.Timestamp}
content[receipt.EventID] = read
}
ev.Content, err = json.Marshal(content)
if err != nil {
return err
}
jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
res.Rooms.Join[roomID] = jr
}
res.NextBatch.ReceiptPosition = lastPos
return nil
}
// addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if
// the positions of that type are not equal in fromPos and toPos.
func (d *Database) addEDUDeltaToResponse(
fromPos, toPos types.StreamingToken,
joinedRoomIDs []string,
res *types.Response,
) error {
if fromPos.TypingPosition != toPos.TypingPosition {
// add typing deltas
if err := d.addTypingDeltaToResponse(fromPos, joinedRoomIDs, res); err != nil {
return fmt.Errorf("unable to apply typing delta to response: %w", err)
}
}
// Check on initial sync and if EDUPositions differ
if (fromPos.ReceiptPosition == 0 && toPos.ReceiptPosition == 0) ||
fromPos.ReceiptPosition != toPos.ReceiptPosition {
if err := d.addReceiptDeltaToResponse(fromPos, joinedRoomIDs, res); err != nil {
return fmt.Errorf("unable to apply receipts to response: %w", err)
}
}
return nil
}
func (d *Database) GetFilter(
ctx context.Context, localpart string, filterID string,
) (*gomatrixserverlib.Filter, error) {
@ -681,57 +533,6 @@ func (d *Database) PutFilter(
return filterID, err
}
func (d *Database) IncrementalSync(
ctx context.Context, res *types.Response,
device userapi.Device,
fromPos, toPos types.StreamingToken,
numRecentEventsPerRoom int,
wantFullState bool,
) (*types.Response, error) {
res.NextBatch = fromPos.WithUpdates(toPos)
var joinedRoomIDs []string
var err error
if fromPos.PDUPosition != toPos.PDUPosition || wantFullState {
r := types.Range{
From: fromPos.PDUPosition,
To: toPos.PDUPosition,
}
joinedRoomIDs, err = d.addPDUDeltaToResponse(
ctx, device, r, numRecentEventsPerRoom, wantFullState, res,
)
if err != nil {
return nil, fmt.Errorf("d.addPDUDeltaToResponse: %w", err)
}
} else {
joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership(
ctx, nil, device.UserID, gomatrixserverlib.Join,
)
if err != nil {
return nil, fmt.Errorf("d.CurrentRoomState.SelectRoomIDsWithMembership: %w", err)
}
}
// TODO: handle EDUs in peeked rooms
err = d.addEDUDeltaToResponse(
fromPos, toPos, joinedRoomIDs, res,
)
if err != nil {
return nil, fmt.Errorf("d.addEDUDeltaToResponse: %w", err)
}
ir := types.Range{
From: fromPos.InvitePosition,
To: toPos.InvitePosition,
}
if err = d.addInvitesToResponse(ctx, nil, device.UserID, ir, res); err != nil {
return nil, fmt.Errorf("d.addInvitesToResponse: %w", err)
}
return res, nil
}
func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error {
redactedEvents, err := d.Events(ctx, []string{redactedEventID})
if err != nil {
@ -755,240 +556,17 @@ func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, reda
return err
}
// getResponseWithPDUsForCompleteSync creates a response and adds all PDUs needed
// to it. It returns toPos and joinedRoomIDs for use of adding EDUs.
// nolint:nakedret
func (d *Database) getResponseWithPDUsForCompleteSync(
ctx context.Context, res *types.Response,
userID string, device userapi.Device,
numRecentEventsPerRoom int,
) (
toPos types.StreamingToken,
joinedRoomIDs []string,
err error,
) {
// This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have
// a consistent view of the database throughout. This includes extracting the sync position.
// This does have the unfortunate side-effect that all the matrixy logic resides in this function,
// but it's better to not hide the fact that this is being done in a transaction.
txn, err := d.DB.BeginTx(ctx, &txReadOnlySnapshot)
if err != nil {
return
}
succeeded := false
defer sqlutil.EndTransactionWithCheck(txn, &succeeded, &err)
// Get the current sync position which we will base the sync response on.
toPos, err = d.syncPositionTx(ctx, txn)
if err != nil {
return
}
r := types.Range{
From: 0,
To: toPos.PDUPosition,
}
ir := types.Range{
From: 0,
To: toPos.InvitePosition,
}
res.NextBatch.ApplyUpdates(toPos)
// Extract room state and recent events for all rooms the user is joined to.
joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
if err != nil {
return
}
stateFilter := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request
// Build up a /sync response. Add joined rooms.
for _, roomID := range joinedRoomIDs {
var jr *types.JoinResponse
jr, err = d.getJoinResponseForCompleteSync(
ctx, txn, roomID, r, &stateFilter, numRecentEventsPerRoom, device,
)
if err != nil {
return
}
res.Rooms.Join[roomID] = *jr
}
// Add peeked rooms.
peeks, err := d.Peeks.SelectPeeksInRange(ctx, txn, userID, device.ID, r)
if err != nil {
return
}
for _, peek := range peeks {
if !peek.Deleted {
var jr *types.JoinResponse
jr, err = d.getJoinResponseForCompleteSync(
ctx, txn, peek.RoomID, r, &stateFilter, numRecentEventsPerRoom, device,
)
if err != nil {
return
}
res.Rooms.Peek[peek.RoomID] = *jr
}
}
if err = d.addInvitesToResponse(ctx, txn, userID, ir, res); err != nil {
return
}
succeeded = true
return //res, toPos, joinedRoomIDs, err
}
func (d *Database) getJoinResponseForCompleteSync(
ctx context.Context, txn *sql.Tx,
roomID string,
r types.Range,
stateFilter *gomatrixserverlib.StateFilter,
numRecentEventsPerRoom int, device userapi.Device,
) (jr *types.JoinResponse, err error) {
var stateEvents []*gomatrixserverlib.HeaderedEvent
stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilter)
if err != nil {
return
}
// TODO: When filters are added, we may need to call this multiple times to get enough events.
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
var recentStreamEvents []types.StreamEvent
var limited bool
recentStreamEvents, limited, err = d.OutputEvents.SelectRecentEvents(
ctx, txn, roomID, r, numRecentEventsPerRoom, true, true,
)
if err != nil {
return
}
// TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the
// user shouldn't see, we check the recent events and remove any prior to the join event of the user
// which is equiv to history_visibility: joined
joinEventIndex := -1
for i := len(recentStreamEvents) - 1; i >= 0; i-- {
ev := recentStreamEvents[i]
if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(device.UserID) {
membership, _ := ev.Membership()
if membership == "join" {
joinEventIndex = i
if i > 0 {
// the create event happens before the first join, so we should cut it at that point instead
if recentStreamEvents[i-1].Type() == gomatrixserverlib.MRoomCreate && recentStreamEvents[i-1].StateKeyEquals("") {
joinEventIndex = i - 1
break
}
}
break
}
}
}
if joinEventIndex != -1 {
// cut all events earlier than the join (but not the join itself)
recentStreamEvents = recentStreamEvents[joinEventIndex:]
limited = false // so clients know not to try to backpaginate
}
// Retrieve the backward topology position, i.e. the position of the
// oldest event in the room's topology.
var prevBatch *types.TopologyToken
if len(recentStreamEvents) > 0 {
var backwardTopologyPos, backwardStreamPos types.StreamPosition
backwardTopologyPos, backwardStreamPos, err = d.Topology.SelectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID())
if err != nil {
return
}
prevBatch = &types.TopologyToken{
Depth: backwardTopologyPos,
PDUPosition: backwardStreamPos,
}
prevBatch.Decrement()
}
// We don't include a device here as we don't need to send down
// transaction IDs for complete syncs, but we do it anyway because Sytest demands it for:
// "Can sync a room with a message with a transaction id" - which does a complete sync to check.
recentEvents := d.StreamEventsToEvents(&device, recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents)
jr = types.NewJoinResponse()
jr.Timeline.PrevBatch = prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
return jr, nil
}
func (d *Database) CompleteSync(
ctx context.Context, res *types.Response,
device userapi.Device, numRecentEventsPerRoom int,
) (*types.Response, error) {
toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync(
ctx, res, device.UserID, device, numRecentEventsPerRoom,
)
if err != nil {
return nil, fmt.Errorf("d.getResponseWithPDUsForCompleteSync: %w", err)
}
// TODO: handle EDUs in peeked rooms
// Use a zero value SyncPosition for fromPos so all EDU states are added.
err = d.addEDUDeltaToResponse(
types.StreamingToken{}, toPos, joinedRoomIDs, res,
)
if err != nil {
return nil, fmt.Errorf("d.addEDUDeltaToResponse: %w", err)
}
return res, nil
}
var txReadOnlySnapshot = sql.TxOptions{
// Set the isolation level so that we see a snapshot of the database.
// In PostgreSQL repeatable read transactions will see a snapshot taken
// at the first query, and since the transaction is read-only it can't
// run into any serialisation errors.
// https://www.postgresql.org/docs/9.5/static/transaction-iso.html#XACT-REPEATABLE-READ
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
}
func (d *Database) addInvitesToResponse(
ctx context.Context, txn *sql.Tx,
userID string,
r types.Range,
res *types.Response,
) error {
invites, retiredInvites, err := d.Invites.SelectInviteEventsInRange(
ctx, txn, userID, r,
)
if err != nil {
return fmt.Errorf("d.Invites.SelectInviteEventsInRange: %w", err)
}
for roomID, inviteEvent := range invites {
ir := types.NewInviteResponse(inviteEvent)
res.Rooms.Invite[roomID] = *ir
}
for roomID := range retiredInvites {
if _, ok := res.Rooms.Join[roomID]; !ok {
lr := types.NewLeaveResponse()
res.Rooms.Leave[roomID] = *lr
}
}
return nil
}
// Retrieve the backward topology position, i.e. the position of the
// oldest event in the room's topology.
func (d *Database) getBackwardTopologyPos(
ctx context.Context, txn *sql.Tx,
func (d *Database) GetBackwardTopologyPos(
ctx context.Context,
events []types.StreamEvent,
) (types.TopologyToken, error) {
zeroToken := types.TopologyToken{}
if len(events) == 0 {
return zeroToken, nil
}
pos, spos, err := d.Topology.SelectPositionInTopology(ctx, txn, events[0].EventID())
pos, spos, err := d.Topology.SelectPositionInTopology(ctx, nil, events[0].EventID())
if err != nil {
return zeroToken, err
}
@ -997,78 +575,6 @@ func (d *Database) getBackwardTopologyPos(
return tok, nil
}
// addRoomDeltaToResponse adds a room state delta to a sync response
func (d *Database) addRoomDeltaToResponse(
ctx context.Context,
device *userapi.Device,
txn *sql.Tx,
r types.Range,
delta stateDelta,
numRecentEventsPerRoom int,
res *types.Response,
) error {
if delta.membershipPos > 0 && delta.membership == gomatrixserverlib.Leave {
// make sure we don't leak recent events after the leave event.
// TODO: History visibility makes this somewhat complex to handle correctly. For example:
// TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
// TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
// in a single /sync request
// This is all "okay" assuming history_visibility == "shared" which it is by default.
r.To = delta.membershipPos
}
recentStreamEvents, limited, err := d.OutputEvents.SelectRecentEvents(
ctx, txn, delta.roomID, r,
numRecentEventsPerRoom, true, true,
)
if err != nil {
return err
}
recentEvents := d.StreamEventsToEvents(device, recentStreamEvents)
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
prevBatch, err := d.getBackwardTopologyPos(ctx, txn, recentStreamEvents)
if err != nil {
return err
}
// XXX: should we ever get this far if we have no recent events or state in this room?
// in practice we do for peeks, but possibly not joins?
if len(recentEvents) == 0 && len(delta.stateEvents) == 0 {
return nil
}
switch delta.membership {
case gomatrixserverlib.Join:
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = &prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.roomID] = *jr
case gomatrixserverlib.Peek:
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = &prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Peek[delta.roomID] = *jr
case gomatrixserverlib.Leave:
fallthrough // transitions to leave are the same as ban
case gomatrixserverlib.Ban:
// TODO: recentEvents may contain events that this user is not allowed to see because they are
// no longer in the room.
lr := types.NewLeaveResponse()
lr.Timeline.PrevBatch = &prevBatch
lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Leave[delta.roomID] = *lr
}
return nil
}
// fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database.
// Returns a map of room ID to list of events.
func (d *Database) fetchStateEvents(
@ -1166,11 +672,11 @@ func (d *Database) fetchMissingStateEvents(
// the user has new membership events.
// A list of joined room IDs is also returned in case the caller needs it.
// nolint:gocyclo
func (d *Database) getStateDeltas(
ctx context.Context, device *userapi.Device, txn *sql.Tx,
func (d *Database) GetStateDeltas(
ctx context.Context, device *userapi.Device,
r types.Range, userID string,
stateFilter *gomatrixserverlib.StateFilter,
) ([]stateDelta, []string, error) {
) ([]types.StateDelta, []string, error) {
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
// - Get membership list changes for this user in this sync response
// - For each room which has membership list changes:
@ -1179,7 +685,14 @@ func (d *Database) getStateDeltas(
// * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
// * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block.
// - Get all CURRENTLY joined rooms, and add them to 'joined' block.
var deltas []stateDelta
txn, err := d.readOnlySnapshot(ctx)
if err != nil {
return nil, nil, fmt.Errorf("d.readOnlySnapshot: %w", err)
}
var succeeded bool
defer sqlutil.EndTransactionWithCheck(txn, &succeeded, &err)
var deltas []types.StateDelta
// get all the state events ever (i.e. for all available rooms) between these two positions
stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter)
@ -1210,10 +723,10 @@ func (d *Database) getStateDeltas(
state[peek.RoomID] = s
}
if !peek.Deleted {
deltas = append(deltas, stateDelta{
membership: gomatrixserverlib.Peek,
stateEvents: d.StreamEventsToEvents(device, state[peek.RoomID]),
roomID: peek.RoomID,
deltas = append(deltas, types.StateDelta{
Membership: gomatrixserverlib.Peek,
StateEvents: d.StreamEventsToEvents(device, state[peek.RoomID]),
RoomID: peek.RoomID,
})
}
}
@ -1238,11 +751,11 @@ func (d *Database) getStateDeltas(
continue // we'll add this room in when we do joined rooms
}
deltas = append(deltas, stateDelta{
membership: membership,
membershipPos: ev.StreamPosition,
stateEvents: d.StreamEventsToEvents(device, stateStreamEvents),
roomID: roomID,
deltas = append(deltas, types.StateDelta{
Membership: membership,
MembershipPos: ev.StreamPosition,
StateEvents: d.StreamEventsToEvents(device, stateStreamEvents),
RoomID: roomID,
})
break
}
@ -1255,13 +768,14 @@ func (d *Database) getStateDeltas(
return nil, nil, err
}
for _, joinedRoomID := range joinedRoomIDs {
deltas = append(deltas, stateDelta{
membership: gomatrixserverlib.Join,
stateEvents: d.StreamEventsToEvents(device, state[joinedRoomID]),
roomID: joinedRoomID,
deltas = append(deltas, types.StateDelta{
Membership: gomatrixserverlib.Join,
StateEvents: d.StreamEventsToEvents(device, state[joinedRoomID]),
RoomID: joinedRoomID,
})
}
succeeded = true
return deltas, joinedRoomIDs, nil
}
@ -1270,13 +784,20 @@ func (d *Database) getStateDeltas(
// Fetches full state for all joined rooms and uses selectStateInRange to get
// updates for other rooms.
// nolint:gocyclo
func (d *Database) getStateDeltasForFullStateSync(
ctx context.Context, device *userapi.Device, txn *sql.Tx,
func (d *Database) GetStateDeltasForFullStateSync(
ctx context.Context, device *userapi.Device,
r types.Range, userID string,
stateFilter *gomatrixserverlib.StateFilter,
) ([]stateDelta, []string, error) {
) ([]types.StateDelta, []string, error) {
txn, err := d.readOnlySnapshot(ctx)
if err != nil {
return nil, nil, fmt.Errorf("d.readOnlySnapshot: %w", err)
}
var succeeded bool
defer sqlutil.EndTransactionWithCheck(txn, &succeeded, &err)
// Use a reasonable initial capacity
deltas := make(map[string]stateDelta)
deltas := make(map[string]types.StateDelta)
peeks, err := d.Peeks.SelectPeeksInRange(ctx, txn, userID, device.ID, r)
if err != nil {
@ -1290,10 +811,10 @@ func (d *Database) getStateDeltasForFullStateSync(
if stateErr != nil {
return nil, nil, stateErr
}
deltas[peek.RoomID] = stateDelta{
membership: gomatrixserverlib.Peek,
stateEvents: d.StreamEventsToEvents(device, s),
roomID: peek.RoomID,
deltas[peek.RoomID] = types.StateDelta{
Membership: gomatrixserverlib.Peek,
StateEvents: d.StreamEventsToEvents(device, s),
RoomID: peek.RoomID,
}
}
}
@ -1312,11 +833,11 @@ func (d *Database) getStateDeltasForFullStateSync(
for _, ev := range stateStreamEvents {
if membership := getMembershipFromEvent(ev.Event, userID); membership != "" {
if membership != gomatrixserverlib.Join { // We've already added full state for all joined rooms above.
deltas[roomID] = stateDelta{
membership: membership,
membershipPos: ev.StreamPosition,
stateEvents: d.StreamEventsToEvents(device, stateStreamEvents),
roomID: roomID,
deltas[roomID] = types.StateDelta{
Membership: membership,
MembershipPos: ev.StreamPosition,
StateEvents: d.StreamEventsToEvents(device, stateStreamEvents),
RoomID: roomID,
}
}
@ -1336,21 +857,22 @@ func (d *Database) getStateDeltasForFullStateSync(
if stateErr != nil {
return nil, nil, stateErr
}
deltas[joinedRoomID] = stateDelta{
membership: gomatrixserverlib.Join,
stateEvents: d.StreamEventsToEvents(device, s),
roomID: joinedRoomID,
deltas[joinedRoomID] = types.StateDelta{
Membership: gomatrixserverlib.Join,
StateEvents: d.StreamEventsToEvents(device, s),
RoomID: joinedRoomID,
}
}
// Create a response array.
result := make([]stateDelta, len(deltas))
result := make([]types.StateDelta, len(deltas))
i := 0
for _, delta := range deltas {
result[i] = delta
i++
}
succeeded = true
return result, joinedRoomIDs, nil
}
@ -1470,31 +992,6 @@ func (d *Database) CleanSendToDeviceUpdates(
return
}
// There may be some overlap where events in stateEvents are already in recentEvents, so filter
// them out so we don't include them twice in the /sync response. They should be in recentEvents
// only, so clients get to the correct state once they have rolled forward.
func removeDuplicates(stateEvents, recentEvents []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
for _, recentEv := range recentEvents {
if recentEv.StateKey() == nil {
continue // not a state event
}
// TODO: This is a linear scan over all the current state events in this room. This will
// be slow for big rooms. We should instead sort the state events by event ID (ORDER BY)
// then do a binary search to find matching events, similar to what roomserver does.
for j := 0; j < len(stateEvents); j++ {
if stateEvents[j].EventID() == recentEv.EventID() {
// overwrite the element to remove with the last element then pop the last element.
// This is orders of magnitude faster than re-slicing, but doesn't preserve ordering
// (we don't care about the order of stateEvents)
stateEvents[j] = stateEvents[len(stateEvents)-1]
stateEvents = stateEvents[:len(stateEvents)-1]
break // there shouldn't be multiple events with the same event ID
}
}
}
return stateEvents
}
// getMembershipFromEvent returns the value of content.membership iff the event is a state event
// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned.
func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string {
@ -1508,15 +1005,6 @@ func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string {
return membership
}
type stateDelta struct {
roomID string
stateEvents []*gomatrixserverlib.HeaderedEvent
membership string
// The PDU stream position of the latest membership event for this user, if applicable.
// Can be 0 if there is no membership event in this delta.
membershipPos types.StreamPosition
}
// StoreReceipt stores user receipts
func (d *Database) StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) {
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {

View file

@ -101,7 +101,7 @@ func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, room
// SelectRoomReceiptsAfter select all receipts for a given room after a specific timestamp
func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []api.OutputReceiptEvent, error) {
selectSQL := strings.Replace(selectRoomReceipts, "($2)", sqlutil.QueryVariadicOffset(len(roomIDs), 1), 1)
lastPos := types.StreamPosition(0)
lastPos := streamPos
params := make([]interface{}, len(roomIDs)+1)
params[0] = streamPos
for k, v := range roomIDs {

View file

@ -21,7 +21,6 @@ import (
// Import the sqlite3 package
_ "github.com/mattn/go-sqlite3"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
@ -119,7 +118,6 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
Filter: filter,
SendToDevice: sendToDevice,
Receipts: receipts,
EDUCache: cache.New(),
}
return nil
}

View file

@ -1,5 +1,7 @@
package storage_test
// TODO: Fix these tests
/*
import (
"context"
"crypto/ed25519"
@ -746,3 +748,4 @@ func reversed(in []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.Header
}
return out
}
*/

View file

@ -0,0 +1,132 @@
package streams
import (
"context"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
)
type AccountDataStreamProvider struct {
StreamProvider
userAPI userapi.UserInternalAPI
}
func (p *AccountDataStreamProvider) Setup() {
p.StreamProvider.Setup()
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
id, err := p.DB.MaxStreamPositionForAccountData(context.Background())
if err != nil {
panic(err)
}
p.latest = id
}
func (p *AccountDataStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
) types.StreamPosition {
dataReq := &userapi.QueryAccountDataRequest{
UserID: req.Device.UserID,
}
dataRes := &userapi.QueryAccountDataResponse{}
if err := p.userAPI.QueryAccountData(ctx, dataReq, dataRes); err != nil {
req.Log.WithError(err).Error("p.userAPI.QueryAccountData failed")
return p.LatestPosition(ctx)
}
for datatype, databody := range dataRes.GlobalAccountData {
req.Response.AccountData.Events = append(
req.Response.AccountData.Events,
gomatrixserverlib.ClientEvent{
Type: datatype,
Content: gomatrixserverlib.RawJSON(databody),
},
)
}
for r, j := range req.Response.Rooms.Join {
for datatype, databody := range dataRes.RoomAccountData[r] {
j.AccountData.Events = append(
j.AccountData.Events,
gomatrixserverlib.ClientEvent{
Type: datatype,
Content: gomatrixserverlib.RawJSON(databody),
},
)
req.Response.Rooms.Join[r] = j
}
}
return p.LatestPosition(ctx)
}
func (p *AccountDataStreamProvider) IncrementalSync(
ctx context.Context,
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
r := types.Range{
From: from,
To: to,
}
accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead
dataTypes, err := p.DB.GetAccountDataInRange(
ctx, req.Device.UserID, r, &accountDataFilter,
)
if err != nil {
req.Log.WithError(err).Error("p.DB.GetAccountDataInRange failed")
return from
}
if len(dataTypes) == 0 {
// TODO: this fixes the sytest but is it the right thing to do?
dataTypes[""] = []string{"m.push_rules"}
}
// Iterate over the rooms
for roomID, dataTypes := range dataTypes {
// Request the missing data from the database
for _, dataType := range dataTypes {
dataReq := userapi.QueryAccountDataRequest{
UserID: req.Device.UserID,
RoomID: roomID,
DataType: dataType,
}
dataRes := userapi.QueryAccountDataResponse{}
err = p.userAPI.QueryAccountData(ctx, &dataReq, &dataRes)
if err != nil {
req.Log.WithError(err).Error("p.userAPI.QueryAccountData failed")
continue
}
if roomID == "" {
if globalData, ok := dataRes.GlobalAccountData[dataType]; ok {
req.Response.AccountData.Events = append(
req.Response.AccountData.Events,
gomatrixserverlib.ClientEvent{
Type: dataType,
Content: gomatrixserverlib.RawJSON(globalData),
},
)
}
} else {
if roomData, ok := dataRes.RoomAccountData[roomID][dataType]; ok {
joinData := req.Response.Rooms.Join[roomID]
joinData.AccountData.Events = append(
joinData.AccountData.Events,
gomatrixserverlib.ClientEvent{
Type: dataType,
Content: gomatrixserverlib.RawJSON(roomData),
},
)
req.Response.Rooms.Join[roomID] = joinData
}
}
}
}
return to
}

View file

@ -0,0 +1,43 @@
package streams
import (
"context"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/types"
)
type DeviceListStreamProvider struct {
PartitionedStreamProvider
rsAPI api.RoomserverInternalAPI
keyAPI keyapi.KeyInternalAPI
}
func (p *DeviceListStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
) types.LogPosition {
return p.IncrementalSync(ctx, req, types.LogPosition{}, p.LatestPosition(ctx))
}
func (p *DeviceListStreamProvider) IncrementalSync(
ctx context.Context,
req *types.SyncRequest,
from, to types.LogPosition,
) types.LogPosition {
var err error
to, _, err = internal.DeviceListCatchup(context.Background(), p.keyAPI, p.rsAPI, req.Device.UserID, req.Response, from, to)
if err != nil {
req.Log.WithError(err).Error("internal.DeviceListCatchup failed")
return from
}
err = internal.DeviceOTKCounts(req.Context, p.keyAPI, req.Device.UserID, req.Device.ID, req.Response)
if err != nil {
req.Log.WithError(err).Error("internal.DeviceListCatchup failed")
return from
}
return to
}

View file

@ -0,0 +1,64 @@
package streams
import (
"context"
"github.com/matrix-org/dendrite/syncapi/types"
)
type InviteStreamProvider struct {
StreamProvider
}
func (p *InviteStreamProvider) Setup() {
p.StreamProvider.Setup()
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
id, err := p.DB.MaxStreamPositionForInvites(context.Background())
if err != nil {
panic(err)
}
p.latest = id
}
func (p *InviteStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
) types.StreamPosition {
return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
}
func (p *InviteStreamProvider) IncrementalSync(
ctx context.Context,
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
r := types.Range{
From: from,
To: to,
}
invites, retiredInvites, err := p.DB.InviteEventsInRange(
ctx, req.Device.UserID, r,
)
if err != nil {
req.Log.WithError(err).Error("p.DB.InviteEventsInRange failed")
return from
}
for roomID, inviteEvent := range invites {
ir := types.NewInviteResponse(inviteEvent)
req.Response.Rooms.Invite[roomID] = *ir
}
for roomID := range retiredInvites {
if _, ok := req.Response.Rooms.Join[roomID]; !ok {
lr := types.NewLeaveResponse()
req.Response.Rooms.Leave[roomID] = *lr
}
}
return to
}

View file

@ -0,0 +1,305 @@
package streams
import (
"context"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
)
type PDUStreamProvider struct {
StreamProvider
}
func (p *PDUStreamProvider) Setup() {
p.StreamProvider.Setup()
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
id, err := p.DB.MaxStreamPositionForPDUs(context.Background())
if err != nil {
panic(err)
}
p.latest = id
}
func (p *PDUStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
) types.StreamPosition {
from := types.StreamPosition(0)
to := p.LatestPosition(ctx)
// Get the current sync position which we will base the sync response on.
// For complete syncs, we want to start at the most recent events and work
// backwards, so that we show the most recent events in the room.
r := types.Range{
From: to,
To: 0,
Backwards: true,
}
// Extract room state and recent events for all rooms the user is joined to.
joinedRoomIDs, err := p.DB.RoomIDsWithMembership(ctx, req.Device.UserID, gomatrixserverlib.Join)
if err != nil {
req.Log.WithError(err).Error("p.DB.RoomIDsWithMembership failed")
return from
}
stateFilter := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request
// Build up a /sync response. Add joined rooms.
for _, roomID := range joinedRoomIDs {
var jr *types.JoinResponse
jr, err = p.getJoinResponseForCompleteSync(
ctx, roomID, r, &stateFilter, req.Limit, req.Device,
)
if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
return from
}
req.Response.Rooms.Join[roomID] = *jr
req.Rooms[roomID] = gomatrixserverlib.Join
}
// Add peeked rooms.
peeks, err := p.DB.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r)
if err != nil {
req.Log.WithError(err).Error("p.DB.PeeksInRange failed")
return from
}
for _, peek := range peeks {
if !peek.Deleted {
var jr *types.JoinResponse
jr, err = p.getJoinResponseForCompleteSync(
ctx, peek.RoomID, r, &stateFilter, req.Limit, req.Device,
)
if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
return from
}
req.Response.Rooms.Peek[peek.RoomID] = *jr
}
}
return to
}
// nolint:gocyclo
func (p *PDUStreamProvider) IncrementalSync(
ctx context.Context,
req *types.SyncRequest,
from, to types.StreamPosition,
) (newPos types.StreamPosition) {
r := types.Range{
From: from,
To: to,
Backwards: from > to,
}
newPos = to
var err error
var stateDeltas []types.StateDelta
var joinedRooms []string
// TODO: use filter provided in request
stateFilter := gomatrixserverlib.DefaultStateFilter()
if req.WantFullState {
if stateDeltas, joinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
req.Log.WithError(err).Error("p.DB.GetStateDeltasForFullStateSync failed")
return
}
} else {
if stateDeltas, joinedRooms, err = p.DB.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
req.Log.WithError(err).Error("p.DB.GetStateDeltas failed")
return
}
}
for _, roomID := range joinedRooms {
req.Rooms[roomID] = gomatrixserverlib.Join
}
for _, delta := range stateDeltas {
if err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, req.Limit, req.Response); err != nil {
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
return newPos
}
}
return r.To
}
func (p *PDUStreamProvider) addRoomDeltaToResponse(
ctx context.Context,
device *userapi.Device,
r types.Range,
delta types.StateDelta,
numRecentEventsPerRoom int,
res *types.Response,
) error {
if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave {
// make sure we don't leak recent events after the leave event.
// TODO: History visibility makes this somewhat complex to handle correctly. For example:
// TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
// TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
// in a single /sync request
// This is all "okay" assuming history_visibility == "shared" which it is by default.
r.To = delta.MembershipPos
}
recentStreamEvents, limited, err := p.DB.RecentEvents(
ctx, delta.RoomID, r,
numRecentEventsPerRoom, true, true,
)
if err != nil {
return err
}
recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back
prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, recentStreamEvents)
if err != nil {
return err
}
// XXX: should we ever get this far if we have no recent events or state in this room?
// in practice we do for peeks, but possibly not joins?
if len(recentEvents) == 0 && len(delta.StateEvents) == 0 {
return nil
}
switch delta.Membership {
case gomatrixserverlib.Join:
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = &prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.RoomID] = *jr
case gomatrixserverlib.Peek:
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = &prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Peek[delta.RoomID] = *jr
case gomatrixserverlib.Leave:
fallthrough // transitions to leave are the same as ban
case gomatrixserverlib.Ban:
// TODO: recentEvents may contain events that this user is not allowed to see because they are
// no longer in the room.
lr := types.NewLeaveResponse()
lr.Timeline.PrevBatch = &prevBatch
lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Leave[delta.RoomID] = *lr
}
return nil
}
func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
ctx context.Context,
roomID string,
r types.Range,
stateFilter *gomatrixserverlib.StateFilter,
numRecentEventsPerRoom int, device *userapi.Device,
) (jr *types.JoinResponse, err error) {
var stateEvents []*gomatrixserverlib.HeaderedEvent
stateEvents, err = p.DB.CurrentState(ctx, roomID, stateFilter)
if err != nil {
return
}
// TODO: When filters are added, we may need to call this multiple times to get enough events.
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
var recentStreamEvents []types.StreamEvent
var limited bool
recentStreamEvents, limited, err = p.DB.RecentEvents(
ctx, roomID, r, numRecentEventsPerRoom, true, true,
)
if err != nil {
return
}
// TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the
// user shouldn't see, we check the recent events and remove any prior to the join event of the user
// which is equiv to history_visibility: joined
joinEventIndex := -1
for i := len(recentStreamEvents) - 1; i >= 0; i-- {
ev := recentStreamEvents[i]
if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(device.UserID) {
membership, _ := ev.Membership()
if membership == "join" {
joinEventIndex = i
if i > 0 {
// the create event happens before the first join, so we should cut it at that point instead
if recentStreamEvents[i-1].Type() == gomatrixserverlib.MRoomCreate && recentStreamEvents[i-1].StateKeyEquals("") {
joinEventIndex = i - 1
break
}
}
break
}
}
}
if joinEventIndex != -1 {
// cut all events earlier than the join (but not the join itself)
recentStreamEvents = recentStreamEvents[joinEventIndex:]
limited = false // so clients know not to try to backpaginate
}
// Retrieve the backward topology position, i.e. the position of the
// oldest event in the room's topology.
var prevBatch *types.TopologyToken
if len(recentStreamEvents) > 0 {
var backwardTopologyPos, backwardStreamPos types.StreamPosition
backwardTopologyPos, backwardStreamPos, err = p.DB.PositionInTopology(ctx, recentStreamEvents[0].EventID())
if err != nil {
return
}
prevBatch = &types.TopologyToken{
Depth: backwardTopologyPos,
PDUPosition: backwardStreamPos,
}
prevBatch.Decrement()
}
// We don't include a device here as we don't need to send down
// transaction IDs for complete syncs, but we do it anyway because Sytest demands it for:
// "Can sync a room with a message with a transaction id" - which does a complete sync to check.
recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents)
jr = types.NewJoinResponse()
jr.Timeline.PrevBatch = prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
return jr, nil
}
func removeDuplicates(stateEvents, recentEvents []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
for _, recentEv := range recentEvents {
if recentEv.StateKey() == nil {
continue // not a state event
}
// TODO: This is a linear scan over all the current state events in this room. This will
// be slow for big rooms. We should instead sort the state events by event ID (ORDER BY)
// then do a binary search to find matching events, similar to what roomserver does.
for j := 0; j < len(stateEvents); j++ {
if stateEvents[j].EventID() == recentEv.EventID() {
// overwrite the element to remove with the last element then pop the last element.
// This is orders of magnitude faster than re-slicing, but doesn't preserve ordering
// (we don't care about the order of stateEvents)
stateEvents[j] = stateEvents[len(stateEvents)-1]
stateEvents = stateEvents[:len(stateEvents)-1]
break // there shouldn't be multiple events with the same event ID
}
}
}
return stateEvents
}

View file

@ -0,0 +1,91 @@
package streams
import (
"context"
"encoding/json"
eduAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
type ReceiptStreamProvider struct {
StreamProvider
}
func (p *ReceiptStreamProvider) Setup() {
p.StreamProvider.Setup()
id, err := p.DB.MaxStreamPositionForReceipts(context.Background())
if err != nil {
panic(err)
}
p.latest = id
}
func (p *ReceiptStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
) types.StreamPosition {
return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
}
func (p *ReceiptStreamProvider) IncrementalSync(
ctx context.Context,
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
var joinedRooms []string
for roomID, membership := range req.Rooms {
if membership == gomatrixserverlib.Join {
joinedRooms = append(joinedRooms, roomID)
}
}
lastPos, receipts, err := p.DB.RoomReceiptsAfter(ctx, joinedRooms, from)
if err != nil {
req.Log.WithError(err).Error("p.DB.RoomReceiptsAfter failed")
return from
}
if len(receipts) == 0 || lastPos == 0 {
return to
}
// Group receipts by room, so we can create one ClientEvent for every room
receiptsByRoom := make(map[string][]eduAPI.OutputReceiptEvent)
for _, receipt := range receipts {
receiptsByRoom[receipt.RoomID] = append(receiptsByRoom[receipt.RoomID], receipt)
}
for roomID, receipts := range receiptsByRoom {
jr := req.Response.Rooms.Join[roomID]
var ok bool
ev := gomatrixserverlib.ClientEvent{
Type: gomatrixserverlib.MReceipt,
RoomID: roomID,
}
content := make(map[string]eduAPI.ReceiptMRead)
for _, receipt := range receipts {
var read eduAPI.ReceiptMRead
if read, ok = content[receipt.EventID]; !ok {
read = eduAPI.ReceiptMRead{
User: make(map[string]eduAPI.ReceiptTS),
}
}
read.User[receipt.UserID] = eduAPI.ReceiptTS{TS: receipt.Timestamp}
content[receipt.EventID] = read
}
ev.Content, err = json.Marshal(content)
if err != nil {
req.Log.WithError(err).Error("json.Marshal failed")
return from
}
jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
req.Response.Rooms.Join[roomID] = jr
}
return lastPos
}

View file

@ -0,0 +1,51 @@
package streams
import (
"context"
"github.com/matrix-org/dendrite/syncapi/types"
)
type SendToDeviceStreamProvider struct {
StreamProvider
}
func (p *SendToDeviceStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
) types.StreamPosition {
return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
}
func (p *SendToDeviceStreamProvider) IncrementalSync(
ctx context.Context,
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
// See if we have any new tasks to do for the send-to-device messaging.
lastPos, events, updates, deletions, err := p.DB.SendToDeviceUpdatesForSync(req.Context, req.Device.UserID, req.Device.ID, req.Since)
if err != nil {
req.Log.WithError(err).Error("p.DB.SendToDeviceUpdatesForSync failed")
return from
}
// Before we return the sync response, make sure that we take action on
// any send-to-device database updates or deletions that we need to do.
// Then add the updates into the sync response.
if len(updates) > 0 || len(deletions) > 0 {
// Handle the updates and deletions in the database.
err = p.DB.CleanSendToDeviceUpdates(context.Background(), updates, deletions, req.Since)
if err != nil {
req.Log.WithError(err).Error("p.DB.CleanSendToDeviceUpdates failed")
return from
}
}
if len(events) > 0 {
// Add the updates into the sync response.
for _, event := range events {
req.Response.ToDevice.Events = append(req.Response.ToDevice.Events, event.SendToDeviceEvent)
}
}
return lastPos
}

View file

@ -0,0 +1,57 @@
package streams
import (
"context"
"encoding/json"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
type TypingStreamProvider struct {
StreamProvider
EDUCache *cache.EDUCache
}
func (p *TypingStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
) types.StreamPosition {
return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
}
func (p *TypingStreamProvider) IncrementalSync(
ctx context.Context,
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
var err error
for roomID, membership := range req.Rooms {
if membership != gomatrixserverlib.Join {
continue
}
jr := req.Response.Rooms.Join[roomID]
if users, updated := p.EDUCache.GetTypingUsersIfUpdatedAfter(
roomID, int64(from),
); updated {
ev := gomatrixserverlib.ClientEvent{
Type: gomatrixserverlib.MTyping,
}
ev.Content, err = json.Marshal(map[string]interface{}{
"user_ids": users,
})
if err != nil {
req.Log.WithError(err).Error("json.Marshal failed")
return from
}
jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
req.Response.Rooms.Join[roomID] = jr
}
}
return to
}

View file

@ -0,0 +1,78 @@
package streams
import (
"context"
"github.com/matrix-org/dendrite/eduserver/cache"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
)
type Streams struct {
PDUStreamProvider types.StreamProvider
TypingStreamProvider types.StreamProvider
ReceiptStreamProvider types.StreamProvider
InviteStreamProvider types.StreamProvider
SendToDeviceStreamProvider types.StreamProvider
AccountDataStreamProvider types.StreamProvider
DeviceListStreamProvider types.PartitionedStreamProvider
}
func NewSyncStreamProviders(
d storage.Database, userAPI userapi.UserInternalAPI,
rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI,
eduCache *cache.EDUCache,
) *Streams {
streams := &Streams{
PDUStreamProvider: &PDUStreamProvider{
StreamProvider: StreamProvider{DB: d},
},
TypingStreamProvider: &TypingStreamProvider{
StreamProvider: StreamProvider{DB: d},
EDUCache: eduCache,
},
ReceiptStreamProvider: &ReceiptStreamProvider{
StreamProvider: StreamProvider{DB: d},
},
InviteStreamProvider: &InviteStreamProvider{
StreamProvider: StreamProvider{DB: d},
},
SendToDeviceStreamProvider: &SendToDeviceStreamProvider{
StreamProvider: StreamProvider{DB: d},
},
AccountDataStreamProvider: &AccountDataStreamProvider{
StreamProvider: StreamProvider{DB: d},
userAPI: userAPI,
},
DeviceListStreamProvider: &DeviceListStreamProvider{
PartitionedStreamProvider: PartitionedStreamProvider{DB: d},
rsAPI: rsAPI,
keyAPI: keyAPI,
},
}
streams.PDUStreamProvider.Setup()
streams.TypingStreamProvider.Setup()
streams.ReceiptStreamProvider.Setup()
streams.InviteStreamProvider.Setup()
streams.SendToDeviceStreamProvider.Setup()
streams.AccountDataStreamProvider.Setup()
streams.DeviceListStreamProvider.Setup()
return streams
}
func (s *Streams) Latest(ctx context.Context) types.StreamingToken {
return types.StreamingToken{
PDUPosition: s.PDUStreamProvider.LatestPosition(ctx),
TypingPosition: s.TypingStreamProvider.LatestPosition(ctx),
ReceiptPosition: s.PDUStreamProvider.LatestPosition(ctx),
InvitePosition: s.InviteStreamProvider.LatestPosition(ctx),
SendToDevicePosition: s.SendToDeviceStreamProvider.LatestPosition(ctx),
AccountDataPosition: s.AccountDataStreamProvider.LatestPosition(ctx),
DeviceListPosition: s.DeviceListStreamProvider.LatestPosition(ctx),
}
}

View file

@ -0,0 +1,38 @@
package streams
import (
"context"
"sync"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
)
type PartitionedStreamProvider struct {
DB storage.Database
latest types.LogPosition
latestMutex sync.RWMutex
}
func (p *PartitionedStreamProvider) Setup() {
}
func (p *PartitionedStreamProvider) Advance(
latest types.LogPosition,
) {
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
if latest.IsAfter(&p.latest) {
p.latest = latest
}
}
func (p *PartitionedStreamProvider) LatestPosition(
ctx context.Context,
) types.LogPosition {
p.latestMutex.RLock()
defer p.latestMutex.RUnlock()
return p.latest
}

View file

@ -0,0 +1,38 @@
package streams
import (
"context"
"sync"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
)
type StreamProvider struct {
DB storage.Database
latest types.StreamPosition
latestMutex sync.RWMutex
}
func (p *StreamProvider) Setup() {
}
func (p *StreamProvider) Advance(
latest types.StreamPosition,
) {
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
if latest > p.latest {
p.latest = latest
}
}
func (p *StreamProvider) LatestPosition(
ctx context.Context,
) types.StreamPosition {
p.latestMutex.RLock()
defer p.latestMutex.RUnlock()
return p.latest
}

View file

@ -15,7 +15,6 @@
package sync
import (
"context"
"encoding/json"
"net/http"
"strconv"
@ -26,7 +25,7 @@ import (
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
log "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus"
)
const defaultSyncTimeout = time.Duration(0)
@ -40,18 +39,7 @@ type filter struct {
} `json:"room"`
}
// syncRequest represents a /sync request, with sensible defaults/sanity checks applied.
type syncRequest struct {
ctx context.Context
device userapi.Device
limit int
timeout time.Duration
since types.StreamingToken // nil means that no since token was supplied
wantFullState bool
log *log.Entry
}
func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Database) (*syncRequest, error) {
func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Database) (*types.SyncRequest, error) {
timeout := getTimeout(req.URL.Query().Get("timeout"))
fullState := req.URL.Query().Get("full_state")
wantFullState := fullState != "" && fullState != "false"
@ -87,15 +75,30 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
}
}
}
filter := gomatrixserverlib.DefaultEventFilter()
filter.Limit = timelineLimit
// TODO: Additional query params: set_presence, filter
return &syncRequest{
ctx: req.Context(),
device: device,
timeout: timeout,
since: since,
wantFullState: wantFullState,
limit: timelineLimit,
log: util.GetLogger(req.Context()),
logger := util.GetLogger(req.Context()).WithFields(logrus.Fields{
"user_id": device.UserID,
"device_id": device.ID,
"since": since,
"timeout": timeout,
"limit": timelineLimit,
})
return &types.SyncRequest{
Context: req.Context(), //
Log: logger, //
Device: &device, //
Response: types.NewResponse(), // Populated by all streams
Filter: filter, //
Since: since, //
Timeout: timeout, //
Limit: timelineLimit, //
Rooms: make(map[string]string), // Populated by the PDU stream
WantFullState: wantFullState, //
}, nil
}

View file

@ -17,8 +17,6 @@
package sync
import (
"context"
"fmt"
"net"
"net/http"
"strings"
@ -30,13 +28,13 @@ import (
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
)
// RequestPool manages HTTP long-poll connections for /sync
@ -44,19 +42,30 @@ type RequestPool struct {
db storage.Database
cfg *config.SyncAPI
userAPI userapi.UserInternalAPI
Notifier *Notifier
keyAPI keyapi.KeyInternalAPI
rsAPI roomserverAPI.RoomserverInternalAPI
lastseen sync.Map
streams *streams.Streams
Notifier *notifier.Notifier
}
// NewRequestPool makes a new RequestPool
func NewRequestPool(
db storage.Database, cfg *config.SyncAPI, n *Notifier,
db storage.Database, cfg *config.SyncAPI,
userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
streams *streams.Streams, notifier *notifier.Notifier,
) *RequestPool {
rp := &RequestPool{db, cfg, userAPI, n, keyAPI, rsAPI, sync.Map{}}
rp := &RequestPool{
db: db,
cfg: cfg,
userAPI: userAPI,
keyAPI: keyAPI,
rsAPI: rsAPI,
lastseen: sync.Map{},
streams: streams,
Notifier: notifier,
}
go rp.cleanLastSeen()
return rp
}
@ -128,8 +137,6 @@ var waitingSyncRequests = prometheus.NewGauge(
// called in a dedicated goroutine for this request. This function will block the goroutine
// until a response is ready, or it times out.
func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.Device) util.JSONResponse {
var syncData *types.Response
// Extract values from request
syncReq, err := newSyncRequest(req, *device, rp.db)
if err != nil {
@ -139,88 +146,108 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
}
}
logger := util.GetLogger(req.Context()).WithFields(log.Fields{
"user_id": device.UserID,
"device_id": device.ID,
"since": syncReq.since,
"timeout": syncReq.timeout,
"limit": syncReq.limit,
})
activeSyncRequests.Inc()
defer activeSyncRequests.Dec()
rp.updateLastSeen(req, device)
currPos := rp.Notifier.CurrentPosition()
if rp.shouldReturnImmediately(syncReq) {
syncData, err = rp.currentSyncForUser(*syncReq, currPos)
if err != nil {
logger.WithError(err).Error("rp.currentSyncForUser failed")
return jsonerror.InternalServerError()
}
logger.WithField("next", syncData.NextBatch).Info("Responding immediately")
return util.JSONResponse{
Code: http.StatusOK,
JSON: syncData,
}
}
waitingSyncRequests.Inc()
defer waitingSyncRequests.Dec()
// Otherwise, we wait for the notifier to tell us if something *may* have
// happened. We loop in case it turns out that nothing did happen.
currentPos := rp.Notifier.CurrentPosition()
timer := time.NewTimer(syncReq.timeout) // case of timeout=0 is handled above
defer timer.Stop()
if !rp.shouldReturnImmediately(syncReq) {
timer := time.NewTimer(syncReq.Timeout) // case of timeout=0 is handled above
defer timer.Stop()
userStreamListener := rp.Notifier.GetListener(*syncReq)
defer userStreamListener.Close()
userStreamListener := rp.Notifier.GetListener(*syncReq)
defer userStreamListener.Close()
// We need the loop in case userStreamListener wakes up even if there isn't
// anything to send down. In this case, we'll jump out of the select but
// don't want to send anything back until we get some actual content to
// respond with, so we skip the return an go back to waiting for content to
// be sent down or the request timing out.
var hasTimedOut bool
sincePos := syncReq.since
for {
select {
// Wait for notifier to wake us up
case <-userStreamListener.GetNotifyChannel(sincePos):
currPos = userStreamListener.GetSyncPosition()
// Or for timeout to expire
case <-timer.C:
// We just need to ensure we get out of the select after reaching the
// timeout, but there's nothing specific we want to do in this case
// apart from that, so we do nothing except stating we're timing out
// and need to respond.
hasTimedOut = true
// Or for the request to be cancelled
case <-req.Context().Done():
logger.WithError(err).Error("request cancelled")
return jsonerror.InternalServerError()
}
// Note that we don't time out during calculation of sync
// response. This ensures that we don't waste the hard work
// of calculating the sync only to get timed out before we
// can respond
syncData, err = rp.currentSyncForUser(*syncReq, currPos)
if err != nil {
logger.WithError(err).Error("rp.currentSyncForUser failed")
return jsonerror.InternalServerError()
}
if !syncData.IsEmpty() || hasTimedOut {
logger.WithField("next", syncData.NextBatch).WithField("timed_out", hasTimedOut).Info("Responding")
giveup := func() util.JSONResponse {
syncReq.Response.NextBatch = syncReq.Since
return util.JSONResponse{
Code: http.StatusOK,
JSON: syncData,
JSON: syncReq.Response,
}
}
select {
case <-syncReq.Context.Done(): // Caller gave up
return giveup()
case <-timer.C: // Timeout reached
return giveup()
case <-userStreamListener.GetNotifyChannel(syncReq.Since):
syncReq.Log.Debugln("Responding to sync after wake-up")
currentPos.ApplyUpdates(userStreamListener.GetSyncPosition())
}
} else {
syncReq.Log.Debugln("Responding to sync immediately")
}
if syncReq.Since.IsEmpty() {
// Complete sync
syncReq.Response.NextBatch = types.StreamingToken{
PDUPosition: rp.streams.PDUStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
TypingPosition: rp.streams.TypingStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
ReceiptPosition: rp.streams.ReceiptStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
InvitePosition: rp.streams.InviteStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
AccountDataPosition: rp.streams.AccountDataStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
}
} else {
// Incremental sync
syncReq.Response.NextBatch = types.StreamingToken{
PDUPosition: rp.streams.PDUStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.PDUPosition, currentPos.PDUPosition,
),
TypingPosition: rp.streams.TypingStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.TypingPosition, currentPos.TypingPosition,
),
ReceiptPosition: rp.streams.ReceiptStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.ReceiptPosition, currentPos.ReceiptPosition,
),
InvitePosition: rp.streams.InviteStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.InvitePosition, currentPos.InvitePosition,
),
SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.SendToDevicePosition, currentPos.SendToDevicePosition,
),
AccountDataPosition: rp.streams.AccountDataStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.AccountDataPosition, currentPos.AccountDataPosition,
),
DeviceListPosition: rp.streams.DeviceListStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition,
),
}
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: syncReq.Response,
}
}
@ -247,18 +274,18 @@ func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *use
JSON: jsonerror.InvalidArgumentValue("bad 'to' value"),
}
}
// work out room joins/leaves
res, err := rp.db.IncrementalSync(
req.Context(), types.NewResponse(), *device, fromToken, toToken, 10, false,
)
syncReq, err := newSyncRequest(req, *device, rp.db)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("Failed to IncrementalSync")
util.GetLogger(req.Context()).WithError(err).Error("newSyncRequest failed")
return jsonerror.InternalServerError()
}
res, err = rp.appendDeviceLists(res, device.UserID, fromToken, toToken)
rp.streams.PDUStreamProvider.IncrementalSync(req.Context(), syncReq, fromToken.PDUPosition, toToken.PDUPosition)
_, _, err = internal.DeviceListCatchup(
req.Context(), rp.keyAPI, rp.rsAPI, syncReq.Device.UserID,
syncReq.Response, fromToken.DeviceListPosition, toToken.DeviceListPosition,
)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("Failed to appendDeviceLists info")
util.GetLogger(req.Context()).WithError(err).Error("Failed to DeviceListCatchup info")
return jsonerror.InternalServerError()
}
return util.JSONResponse{
@ -267,199 +294,18 @@ func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *use
Changed []string `json:"changed"`
Left []string `json:"left"`
}{
Changed: res.DeviceLists.Changed,
Left: res.DeviceLists.Left,
Changed: syncReq.Response.DeviceLists.Changed,
Left: syncReq.Response.DeviceLists.Left,
},
}
}
// nolint:gocyclo
func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.StreamingToken) (*types.Response, error) {
res := types.NewResponse()
// See if we have any new tasks to do for the send-to-device messaging.
lastPos, events, updates, deletions, err := rp.db.SendToDeviceUpdatesForSync(req.ctx, req.device.UserID, req.device.ID, req.since)
if err != nil {
return nil, fmt.Errorf("rp.db.SendToDeviceUpdatesForSync: %w", err)
}
// TODO: handle ignored users
if req.since.IsEmpty() {
res, err = rp.db.CompleteSync(req.ctx, res, req.device, req.limit)
if err != nil {
return res, fmt.Errorf("rp.db.CompleteSync: %w", err)
}
} else {
res, err = rp.db.IncrementalSync(req.ctx, res, req.device, req.since, latestPos, req.limit, req.wantFullState)
if err != nil {
return res, fmt.Errorf("rp.db.IncrementalSync: %w", err)
}
}
accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead
res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition, &accountDataFilter)
if err != nil {
return res, fmt.Errorf("rp.appendAccountData: %w", err)
}
res, err = rp.appendDeviceLists(res, req.device.UserID, req.since, latestPos)
if err != nil {
return res, fmt.Errorf("rp.appendDeviceLists: %w", err)
}
err = internal.DeviceOTKCounts(req.ctx, rp.keyAPI, req.device.UserID, req.device.ID, res)
if err != nil {
return res, fmt.Errorf("internal.DeviceOTKCounts: %w", err)
}
// Before we return the sync response, make sure that we take action on
// any send-to-device database updates or deletions that we need to do.
// Then add the updates into the sync response.
if len(updates) > 0 || len(deletions) > 0 {
// Handle the updates and deletions in the database.
err = rp.db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, req.since)
if err != nil {
return res, fmt.Errorf("rp.db.CleanSendToDeviceUpdates: %w", err)
}
}
if len(events) > 0 {
// Add the updates into the sync response.
for _, event := range events {
res.ToDevice.Events = append(res.ToDevice.Events, event.SendToDeviceEvent)
}
}
res.NextBatch.SendToDevicePosition = lastPos
return res, err
}
func (rp *RequestPool) appendDeviceLists(
data *types.Response, userID string, since, to types.StreamingToken,
) (*types.Response, error) {
_, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.rsAPI, userID, data, since, to)
if err != nil {
return nil, fmt.Errorf("internal.DeviceListCatchup: %w", err)
}
return data, nil
}
// nolint:gocyclo
func (rp *RequestPool) appendAccountData(
data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition,
accountDataFilter *gomatrixserverlib.EventFilter,
) (*types.Response, error) {
// TODO: Account data doesn't have a sync position of its own, meaning that
// account data might be sent multiple time to the client if multiple account
// data keys were set between two message. This isn't a huge issue since the
// duplicate data doesn't represent a huge quantity of data, but an optimisation
// here would be making sure each data is sent only once to the client.
if req.since.IsEmpty() {
// If this is the initial sync, we don't need to check if a data has
// already been sent. Instead, we send the whole batch.
dataReq := &userapi.QueryAccountDataRequest{
UserID: userID,
}
dataRes := &userapi.QueryAccountDataResponse{}
if err := rp.userAPI.QueryAccountData(req.ctx, dataReq, dataRes); err != nil {
return nil, err
}
for datatype, databody := range dataRes.GlobalAccountData {
data.AccountData.Events = append(
data.AccountData.Events,
gomatrixserverlib.ClientEvent{
Type: datatype,
Content: gomatrixserverlib.RawJSON(databody),
},
)
}
for r, j := range data.Rooms.Join {
for datatype, databody := range dataRes.RoomAccountData[r] {
j.AccountData.Events = append(
j.AccountData.Events,
gomatrixserverlib.ClientEvent{
Type: datatype,
Content: gomatrixserverlib.RawJSON(databody),
},
)
data.Rooms.Join[r] = j
}
}
return data, nil
}
r := types.Range{
From: req.since.PDUPosition,
To: currentPos,
}
// If both positions are the same, it means that the data was saved after the
// latest room event. In that case, we need to decrement the old position as
// results are exclusive of Low.
if r.Low() == r.High() {
r.From--
}
// Sync is not initial, get all account data since the latest sync
dataTypes, err := rp.db.GetAccountDataInRange(
req.ctx, userID, r, accountDataFilter,
)
if err != nil {
return nil, fmt.Errorf("rp.db.GetAccountDataInRange: %w", err)
}
if len(dataTypes) == 0 {
// TODO: this fixes the sytest but is it the right thing to do?
dataTypes[""] = []string{"m.push_rules"}
}
// Iterate over the rooms
for roomID, dataTypes := range dataTypes {
// Request the missing data from the database
for _, dataType := range dataTypes {
dataReq := userapi.QueryAccountDataRequest{
UserID: userID,
RoomID: roomID,
DataType: dataType,
}
dataRes := userapi.QueryAccountDataResponse{}
err = rp.userAPI.QueryAccountData(req.ctx, &dataReq, &dataRes)
if err != nil {
continue
}
if roomID == "" {
if globalData, ok := dataRes.GlobalAccountData[dataType]; ok {
data.AccountData.Events = append(
data.AccountData.Events,
gomatrixserverlib.ClientEvent{
Type: dataType,
Content: gomatrixserverlib.RawJSON(globalData),
},
)
}
} else {
if roomData, ok := dataRes.RoomAccountData[roomID][dataType]; ok {
joinData := data.Rooms.Join[roomID]
joinData.AccountData.Events = append(
joinData.AccountData.Events,
gomatrixserverlib.ClientEvent{
Type: dataType,
Content: gomatrixserverlib.RawJSON(roomData),
},
)
data.Rooms.Join[roomID] = joinData
}
}
}
}
return data, nil
}
// shouldReturnImmediately returns whether the /sync request is an initial sync,
// or timeout=0, or full_state=true, in any of the cases the request should
// return immediately.
func (rp *RequestPool) shouldReturnImmediately(syncReq *syncRequest) bool {
if syncReq.since.IsEmpty() || syncReq.timeout == 0 || syncReq.wantFullState {
func (rp *RequestPool) shouldReturnImmediately(syncReq *types.SyncRequest) bool {
if syncReq.Since.IsEmpty() || syncReq.Timeout == 0 || syncReq.WantFullState {
return true
}
waiting, werr := rp.db.SendToDeviceUpdatesWaiting(context.TODO(), syncReq.device.UserID, syncReq.device.ID)
return werr == nil && waiting
return false
}

View file

@ -20,6 +20,7 @@ import (
"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/eduserver/cache"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
@ -28,8 +29,10 @@ import (
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/syncapi/consumers"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/routing"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/sync"
)
@ -50,57 +53,54 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to connect to sync db")
}
pos, err := syncDB.SyncPosition(context.Background())
if err != nil {
logrus.WithError(err).Panicf("failed to get sync position")
eduCache := cache.New()
streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache)
notifier := notifier.NewNotifier(streams.Latest(context.Background()))
if err = notifier.Load(context.Background(), syncDB); err != nil {
logrus.WithError(err).Panicf("failed to load notifier ")
}
notifier := sync.NewNotifier(pos)
err = notifier.Load(context.Background(), syncDB)
if err != nil {
logrus.WithError(err).Panicf("failed to start notifier")
}
requestPool := sync.NewRequestPool(syncDB, cfg, notifier, userAPI, keyAPI, rsAPI)
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier)
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
consumer, notifier, keyAPI, rsAPI, syncDB,
consumer, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider,
)
if err = keyChangeConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start key change consumer")
}
roomConsumer := consumers.NewOutputRoomEventConsumer(
cfg, consumer, notifier, syncDB, rsAPI,
cfg, consumer, syncDB, notifier, streams.PDUStreamProvider,
streams.InviteStreamProvider, rsAPI,
)
if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer")
}
clientConsumer := consumers.NewOutputClientDataConsumer(
cfg, consumer, notifier, syncDB,
cfg, consumer, syncDB, notifier, streams.AccountDataStreamProvider,
)
if err = clientConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start client data consumer")
}
typingConsumer := consumers.NewOutputTypingEventConsumer(
cfg, consumer, notifier, syncDB,
cfg, consumer, syncDB, eduCache, notifier, streams.TypingStreamProvider,
)
if err = typingConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start typing consumer")
}
sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer(
cfg, consumer, notifier, syncDB,
cfg, consumer, syncDB, notifier, streams.SendToDeviceStreamProvider,
)
if err = sendToDeviceConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start send-to-device consumer")
}
receiptConsumer := consumers.NewOutputReceiptEventConsumer(
cfg, consumer, notifier, syncDB,
cfg, consumer, syncDB, notifier, streams.ReceiptStreamProvider,
)
if err = receiptConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start receipts consumer")

53
syncapi/types/provider.go Normal file
View file

@ -0,0 +1,53 @@
package types
import (
"context"
"time"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
type SyncRequest struct {
Context context.Context
Log *logrus.Entry
Device *userapi.Device
Response *Response
Filter gomatrixserverlib.EventFilter
Since StreamingToken
Limit int
Timeout time.Duration
WantFullState bool
// Updated by the PDU stream.
Rooms map[string]string
}
type StreamProvider interface {
Setup()
// Advance will update the latest position of the stream based on
// an update and will wake callers waiting on StreamNotifyAfter.
Advance(latest StreamPosition)
// CompleteSync will update the response to include all updates as needed
// for a complete sync. It will always return immediately.
CompleteSync(ctx context.Context, req *SyncRequest) StreamPosition
// IncrementalSync will update the response to include all updates between
// the from and to sync positions. It will always return immediately,
// making no changes if the range contains no updates.
IncrementalSync(ctx context.Context, req *SyncRequest, from, to StreamPosition) StreamPosition
// LatestPosition returns the latest stream position for this stream.
LatestPosition(ctx context.Context) StreamPosition
}
type PartitionedStreamProvider interface {
Setup()
Advance(latest LogPosition)
CompleteSync(ctx context.Context, req *SyncRequest) LogPosition
IncrementalSync(ctx context.Context, req *SyncRequest, from, to LogPosition) LogPosition
LatestPosition(ctx context.Context) LogPosition
}

View file

@ -35,6 +35,15 @@ var (
ErrInvalidSyncTokenLen = fmt.Errorf("Sync token has an invalid length")
)
type StateDelta struct {
RoomID string
StateEvents []*gomatrixserverlib.HeaderedEvent
Membership string
// The PDU stream position of the latest membership event for this user, if applicable.
// Can be 0 if there is no membership event in this delta.
MembershipPos StreamPosition
}
// StreamPosition represents the offset in the sync stream a client is at.
type StreamPosition int64
@ -114,6 +123,7 @@ type StreamingToken struct {
ReceiptPosition StreamPosition
SendToDevicePosition StreamPosition
InvitePosition StreamPosition
AccountDataPosition StreamPosition
DeviceListPosition LogPosition
}
@ -130,10 +140,10 @@ func (s *StreamingToken) UnmarshalText(text []byte) (err error) {
func (t StreamingToken) String() string {
posStr := fmt.Sprintf(
"s%d_%d_%d_%d_%d",
"s%d_%d_%d_%d_%d_%d",
t.PDUPosition, t.TypingPosition,
t.ReceiptPosition, t.SendToDevicePosition,
t.InvitePosition,
t.InvitePosition, t.AccountDataPosition,
)
if dl := t.DeviceListPosition; !dl.IsEmpty() {
posStr += fmt.Sprintf(".dl-%d-%d", dl.Partition, dl.Offset)
@ -154,6 +164,8 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool {
return true
case t.InvitePosition > other.InvitePosition:
return true
case t.AccountDataPosition > other.AccountDataPosition:
return true
case t.DeviceListPosition.IsAfter(&other.DeviceListPosition):
return true
}
@ -161,7 +173,7 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool {
}
func (t *StreamingToken) IsEmpty() bool {
return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition == 0 && t.DeviceListPosition.IsEmpty()
return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition == 0 && t.DeviceListPosition.IsEmpty()
}
// WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken.
@ -178,22 +190,25 @@ func (t *StreamingToken) WithUpdates(other StreamingToken) StreamingToken {
// streaming token contains any positions that are not 0, they are considered updates
// and will overwrite the value in the token.
func (t *StreamingToken) ApplyUpdates(other StreamingToken) {
if other.PDUPosition > 0 {
if other.PDUPosition > t.PDUPosition {
t.PDUPosition = other.PDUPosition
}
if other.TypingPosition > 0 {
if other.TypingPosition > t.TypingPosition {
t.TypingPosition = other.TypingPosition
}
if other.ReceiptPosition > 0 {
if other.ReceiptPosition > t.ReceiptPosition {
t.ReceiptPosition = other.ReceiptPosition
}
if other.SendToDevicePosition > 0 {
if other.SendToDevicePosition > t.SendToDevicePosition {
t.SendToDevicePosition = other.SendToDevicePosition
}
if other.InvitePosition > 0 {
if other.InvitePosition > t.InvitePosition {
t.InvitePosition = other.InvitePosition
}
if other.DeviceListPosition.Offset > 0 {
if other.AccountDataPosition > t.AccountDataPosition {
t.AccountDataPosition = other.AccountDataPosition
}
if other.DeviceListPosition.IsAfter(&t.DeviceListPosition) {
t.DeviceListPosition = other.DeviceListPosition
}
}
@ -286,7 +301,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
}
categories := strings.Split(tok[1:], ".")
parts := strings.Split(categories[0], "_")
var positions [5]StreamPosition
var positions [6]StreamPosition
for i, p := range parts {
if i > len(positions) {
break
@ -304,6 +319,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
ReceiptPosition: positions[2],
SendToDevicePosition: positions[3],
InvitePosition: positions[4],
AccountDataPosition: positions[5],
}
// dl-0-1234
// $log_name-$partition-$offset

View file

@ -10,10 +10,10 @@ import (
func TestNewSyncTokenWithLogs(t *testing.T) {
tests := map[string]*StreamingToken{
"s4_0_0_0_0": {
"s4_0_0_0_0_0": {
PDUPosition: 4,
},
"s4_0_0_0_0.dl-0-123": {
"s4_0_0_0_0_0.dl-0-123": {
PDUPosition: 4,
DeviceListPosition: LogPosition{
Partition: 0,
@ -42,10 +42,10 @@ func TestNewSyncTokenWithLogs(t *testing.T) {
func TestSyncTokens(t *testing.T) {
shouldPass := map[string]string{
"s4_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, LogPosition{}}.String(),
"s3_1_0_0_0.dl-1-2": StreamingToken{3, 1, 0, 0, 0, LogPosition{1, 2}}.String(),
"s3_1_2_3_5": StreamingToken{3, 1, 2, 3, 5, LogPosition{}}.String(),
"t3_1": TopologyToken{3, 1}.String(),
"s4_0_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, 0, LogPosition{}}.String(),
"s3_1_0_0_0_0.dl-1-2": StreamingToken{3, 1, 0, 0, 0, 0, LogPosition{1, 2}}.String(),
"s3_1_2_3_5_0": StreamingToken{3, 1, 2, 3, 5, 0, LogPosition{}}.String(),
"t3_1": TopologyToken{3, 1}.String(),
}
for a, b := range shouldPass {