Streams package

This commit is contained in:
Neil Alexander 2021-01-07 11:09:52 +00:00
parent 98707e1554
commit 6929b8a4ec
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
26 changed files with 336 additions and 319 deletions

View file

@ -113,19 +113,6 @@ func (t *EDUCache) AddTypingUser(
return t.GetLatestSyncPosition()
}
// AddSendToDeviceMessage increases the sync position for
// send-to-device updates.
// Returns the sync position before update, as the caller
// will use this to record the current stream position
// at the time that the send-to-device message was sent.
func (t *EDUCache) AddSendToDeviceMessage() int64 {
t.Lock()
defer t.Unlock()
latestSyncPosition := t.latestSyncPosition
t.latestSyncPosition++
return latestSyncPosition
}
// addUser with mutex lock & replace the previous timer.
// Returns the latest typing sync position after update.
func (t *EDUCache) addUser(

View file

@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
log "github.com/sirupsen/logrus"
)
@ -30,6 +31,7 @@ import (
type OutputClientDataConsumer struct {
clientAPIConsumer *internal.ContinualConsumer
db storage.Database
streams *streams.Streams
}
// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
@ -37,6 +39,7 @@ func NewOutputClientDataConsumer(
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
store storage.Database,
streams *streams.Streams,
) *OutputClientDataConsumer {
consumer := internal.ContinualConsumer{
@ -48,6 +51,7 @@ func NewOutputClientDataConsumer(
s := &OutputClientDataConsumer{
clientAPIConsumer: &consumer,
db: store,
streams: streams,
}
consumer.ProcessMessage = s.onMessage

View file

@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
log "github.com/sirupsen/logrus"
)
@ -30,6 +31,7 @@ import (
type OutputReceiptEventConsumer struct {
receiptConsumer *internal.ContinualConsumer
db storage.Database
streams *streams.Streams
}
// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer.
@ -38,6 +40,7 @@ func NewOutputReceiptEventConsumer(
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
store storage.Database,
streams *streams.Streams,
) *OutputReceiptEventConsumer {
consumer := internal.ContinualConsumer{
@ -50,6 +53,7 @@ func NewOutputReceiptEventConsumer(
s := &OutputReceiptEventConsumer{
receiptConsumer: &consumer,
db: store,
streams: streams,
}
consumer.ProcessMessage = s.onMessage
@ -82,7 +86,7 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro
return err
}
s.db.TypingStream().Advance(streamPos)
s.streams.TypingStreamProvider.Advance(streamPos)
return nil
}

View file

@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
log "github.com/sirupsen/logrus"
@ -33,6 +34,7 @@ type OutputSendToDeviceEventConsumer struct {
sendToDeviceConsumer *internal.ContinualConsumer
db storage.Database
serverName gomatrixserverlib.ServerName // our server name
streams *streams.Streams
}
// NewOutputSendToDeviceEventConsumer creates a new OutputSendToDeviceEventConsumer.
@ -41,6 +43,7 @@ func NewOutputSendToDeviceEventConsumer(
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
store storage.Database,
streams *streams.Streams,
) *OutputSendToDeviceEventConsumer {
consumer := internal.ContinualConsumer{
@ -54,6 +57,7 @@ func NewOutputSendToDeviceEventConsumer(
sendToDeviceConsumer: &consumer,
db: store,
serverName: cfg.Matrix.ServerName,
streams: streams,
}
consumer.ProcessMessage = s.onMessage
@ -97,7 +101,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
return err
}
s.db.SendToDeviceStream().Advance(streamPos)
s.streams.SendToDeviceStreamProvider.Advance(streamPos)
return nil
}

View file

@ -19,9 +19,11 @@ import (
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
log "github.com/sirupsen/logrus"
)
@ -29,7 +31,8 @@ import (
// OutputTypingEventConsumer consumes events that originated in the EDU server.
type OutputTypingEventConsumer struct {
typingConsumer *internal.ContinualConsumer
db storage.Database
eduCache *cache.EDUCache
streams *streams.Streams
}
// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer.
@ -38,6 +41,8 @@ func NewOutputTypingEventConsumer(
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
store storage.Database,
eduCache *cache.EDUCache,
streams *streams.Streams,
) *OutputTypingEventConsumer {
consumer := internal.ContinualConsumer{
@ -49,7 +54,8 @@ func NewOutputTypingEventConsumer(
s := &OutputTypingEventConsumer{
typingConsumer: &consumer,
db: store,
eduCache: eduCache,
streams: streams,
}
consumer.ProcessMessage = s.onMessage
@ -59,10 +65,11 @@ func NewOutputTypingEventConsumer(
// Start consuming from EDU api
func (s *OutputTypingEventConsumer) Start() error {
s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
s.db.TypingStream().Advance(types.StreamPosition(latestSyncPosition))
})
/*
s.eduCache.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
s.eduCache.TypingStream().Advance(types.StreamPosition(latestSyncPosition))
})
*/
return s.typingConsumer.Start()
}
@ -83,12 +90,16 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
var typingPos types.StreamPosition
typingEvent := output.Event
if typingEvent.Typing {
typingPos = s.db.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime)
typingPos = types.StreamPosition(
s.eduCache.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime),
)
} else {
typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID)
typingPos = types.StreamPosition(
s.eduCache.RemoveUser(typingEvent.UserID, typingEvent.RoomID),
)
}
s.db.TypingStream().Advance(typingPos)
s.streams.TypingStreamProvider.Advance(typingPos)
return nil
}

View file

@ -24,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
@ -33,6 +34,7 @@ import (
type OutputKeyChangeEventConsumer struct {
keyChangeConsumer *internal.ContinualConsumer
db storage.Database
streams *streams.Streams
serverName gomatrixserverlib.ServerName // our server name
rsAPI roomserverAPI.RoomserverInternalAPI
keyAPI api.KeyInternalAPI
@ -49,6 +51,7 @@ func NewOutputKeyChangeEventConsumer(
keyAPI api.KeyInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
store storage.Database,
streams *streams.Streams,
) *OutputKeyChangeEventConsumer {
consumer := internal.ContinualConsumer{
@ -66,6 +69,7 @@ func NewOutputKeyChangeEventConsumer(
rsAPI: rsAPI,
partitionToOffset: make(map[int32]int64),
partitionToOffsetMu: sync.Mutex{},
streams: streams,
}
consumer.ProcessMessage = s.onMessage
@ -115,7 +119,7 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
Partition: msg.Partition,
}
s.db.DeviceListStream().Advance(posUpdate)
s.streams.DeviceListStreamProvider.Advance(posUpdate)
//for userID := range queryRes.UserIDsToCount {
// s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID)

View file

@ -24,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
@ -35,6 +36,7 @@ type OutputRoomEventConsumer struct {
rsAPI api.RoomserverInternalAPI
rsConsumer *internal.ContinualConsumer
db storage.Database
streams *streams.Streams
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
@ -42,6 +44,7 @@ func NewOutputRoomEventConsumer(
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
store storage.Database,
streams *streams.Streams,
rsAPI api.RoomserverInternalAPI,
) *OutputRoomEventConsumer {
@ -55,6 +58,7 @@ func NewOutputRoomEventConsumer(
cfg: cfg,
rsConsumer: &consumer,
db: store,
streams: streams,
rsAPI: rsAPI,
}
consumer.ProcessMessage = s.onMessage
@ -176,7 +180,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
return err
}
s.db.PDUStream().Advance(pduPos)
s.streams.PDUStreamProvider.Advance(pduPos)
return nil
}
@ -215,7 +219,7 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
return err
}
s.db.PDUStream().Advance(pduPos)
s.streams.PDUStreamProvider.Advance(pduPos)
return nil
}
@ -271,7 +275,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
return nil
}
s.db.InviteStream().Advance(pduPos)
s.streams.InviteStreamProvider.Advance(pduPos)
return nil
}
@ -291,7 +295,7 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
// Notify any active sync requests that the invite has been retired.
// Invites share the same stream counter as PDUs
s.db.InviteStream().Advance(pduPos)
s.streams.InviteStreamProvider.Advance(pduPos)
return nil
}

View file

@ -16,11 +16,10 @@ package storage
import (
"context"
"time"
"database/sql"
eduAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
@ -31,14 +30,25 @@ import (
type Database interface {
internal.PartitionStorer
PDUStream() types.StreamProvider
PDUTopology() types.TopologyProvider
TypingStream() types.StreamProvider
ReceiptStream() types.StreamProvider
InviteStream() types.StreamProvider
SendToDeviceStream() types.StreamProvider
AccountDataStream() types.StreamProvider
DeviceListStream() types.StreamLogProvider
ReadOnlySnapshot(ctx context.Context) (*sql.Tx, error)
MaxStreamTokenForPDUs(ctx context.Context) (types.StreamPosition, error)
MaxStreamTokenForReceipts(ctx context.Context) (types.StreamPosition, error)
MaxStreamTokenForInvites(ctx context.Context) (types.StreamPosition, error)
CurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, txn *sql.Tx, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
GetStateDeltas(ctx context.Context, device *userapi.Device, txn *sql.Tx, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
RoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membership string) ([]string, error)
RecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
GetBackwardTopologyPos(ctx context.Context, txn *sql.Tx, events []types.StreamEvent) (types.TopologyToken, error)
PositionInTopology(ctx context.Context, txn *sql.Tx, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error)
InviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error)
PeeksInRange(ctx context.Context, txn *sql.Tx, userID, deviceID string, r types.Range) (peeks []types.Peek, err error)
RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error)
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error)
@ -95,15 +105,6 @@ type Database interface {
// DeletePeek deletes all peeks for a given room by a given user
// Returns an error if there was a problem communicating with the database.
DeletePeeks(ctx context.Context, RoomID, UserID string) (types.StreamPosition, error)
// SetTypingTimeoutCallback sets a callback function that is called right after
// a user is removed from the typing user list due to timeout.
SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn)
// AddTypingUser adds a typing user to the typing cache.
// Returns the newly calculated sync position for typing notifications.
AddTypingUser(userID, roomID string, expireTime *time.Time) types.StreamPosition
// RemoveTypingUser removes a typing user from the typing cache.
// Returns the newly calculated sync position for typing notifications.
RemoveTypingUser(userID, roomID string) types.StreamPosition
// GetEventsInStreamingRange retrieves all of the events on a given ordering using the given extremities and limit.
GetEventsInStreamingRange(ctx context.Context, from, to *types.StreamingToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error)
// GetEventsInTopologicalRange retrieves all of the events on a given ordering using the given extremities and limit.
@ -118,8 +119,6 @@ type Database interface {
// matches the streamevent.transactionID device then the transaction ID gets
// added to the unsigned section of the output event.
StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []*gomatrixserverlib.HeaderedEvent
// AddSendToDevice increases the EDU position in the cache and returns the stream position.
AddSendToDevice() types.StreamPosition
// SendToDeviceUpdatesForSync returns a list of send-to-device updates. It returns three lists:
// - "events": a list of send-to-device events that should be included in the sync
// - "changes": a list of send-to-device events that should be updated in the database by

View file

@ -20,12 +20,10 @@ import (
// Import the postgres database driver.
_ "github.com/lib/pq"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
userapi "github.com/matrix-org/dendrite/userapi/api"
)
// SyncServerDatasource represents a sync server datasource which manages
@ -39,7 +37,7 @@ type SyncServerDatasource struct {
// NewDatabase creates a new sync server database
// nolint:gocyclo
func NewDatabase(dbProperties *config.DatabaseOptions, userAPI userapi.UserInternalAPI) (*SyncServerDatasource, error) {
func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) {
var d SyncServerDatasource
var err error
if d.db, err = sqlutil.Open(dbProperties); err != nil {
@ -107,8 +105,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions, userAPI userapi.UserInter
Filter: filter,
SendToDevice: sendToDevice,
Receipts: receipts,
EDUCache: cache.New(),
}
d.Database.ConfigureProviders(userAPI)
return &d, nil
}

View file

@ -19,12 +19,10 @@ import (
"database/sql"
"encoding/json"
"fmt"
"time"
eduAPI "github.com/matrix-org/dendrite/eduserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
@ -49,74 +47,70 @@ type Database struct {
SendToDevice tables.SendToDevice
Filter tables.Filter
Receipts tables.Receipts
EDUCache *cache.EDUCache
PDUStreamProvider types.StreamProvider
PDUTopologyProvider types.TopologyProvider
TypingStreamProvider types.StreamProvider
ReceiptStreamProvider types.StreamProvider
InviteStreamProvider types.StreamProvider
SendToDeviceStreamProvider types.StreamProvider
AccountDataStreamProvider types.StreamProvider
DeviceListStreamProvider types.StreamLogProvider
}
// ConfigureProviders creates instances of the various
// stream and topology providers provided by the storage
// packages.
func (d *Database) ConfigureProviders(userAPI userapi.UserInternalAPI) {
d.PDUStreamProvider = &PDUStreamProvider{StreamProvider{DB: d}}
d.TypingStreamProvider = &TypingStreamProvider{StreamProvider{DB: d}}
d.ReceiptStreamProvider = &ReceiptStreamProvider{StreamProvider{DB: d}}
d.InviteStreamProvider = &InviteStreamProvider{StreamProvider{DB: d}}
d.SendToDeviceStreamProvider = &SendToDeviceStreamProvider{StreamProvider{DB: d}}
d.AccountDataStreamProvider = &AccountDataStreamProvider{
StreamProvider: StreamProvider{DB: d},
userAPI: userAPI,
func (d *Database) ReadOnlySnapshot(ctx context.Context) (*sql.Tx, error) {
return d.DB.BeginTx(ctx, &sql.TxOptions{
// Set the isolation level so that we see a snapshot of the database.
// In PostgreSQL repeatable read transactions will see a snapshot taken
// at the first query, and since the transaction is read-only it can't
// run into any serialisation errors.
// https://www.postgresql.org/docs/9.5/static/transaction-iso.html#XACT-REPEATABLE-READ
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
})
}
func (d *Database) MaxStreamTokenForPDUs(ctx context.Context) (types.StreamPosition, error) {
id, err := d.OutputEvents.SelectMaxEventID(ctx, nil)
if err != nil {
return 0, fmt.Errorf("d.OutputEvents.SelectMaxEventID: %w", err)
}
d.DeviceListStreamProvider = &DeviceListStreamProvider{StreamLogProvider{DB: d}}
d.PDUStreamProvider.Setup()
d.TypingStreamProvider.Setup()
d.ReceiptStreamProvider.Setup()
d.InviteStreamProvider.Setup()
d.SendToDeviceStreamProvider.Setup()
d.AccountDataStreamProvider.Setup()
d.DeviceListStreamProvider.Setup()
d.PDUTopologyProvider = &PDUTopologyProvider{DB: d}
return types.StreamPosition(id), nil
}
func (d *Database) PDUStream() types.StreamProvider {
return d.PDUStreamProvider
func (d *Database) MaxStreamTokenForReceipts(ctx context.Context) (types.StreamPosition, error) {
id, err := d.Receipts.SelectMaxReceiptID(ctx, nil)
if err != nil {
return 0, fmt.Errorf("d.Receipts.SelectMaxReceiptID: %w", err)
}
return types.StreamPosition(id), nil
}
func (d *Database) PDUTopology() types.TopologyProvider {
return d.PDUTopologyProvider
func (d *Database) MaxStreamTokenForInvites(ctx context.Context) (types.StreamPosition, error) {
id, err := d.Invites.SelectMaxInviteID(ctx, nil)
if err != nil {
return 0, fmt.Errorf("d.Invites.SelectMaxInviteID: %w", err)
}
return types.StreamPosition(id), nil
}
func (d *Database) TypingStream() types.StreamProvider {
return d.TypingStreamProvider
func (d *Database) CurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error) {
return d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilterPart)
}
func (d *Database) ReceiptStream() types.StreamProvider {
return d.ReceiptStreamProvider
func (d *Database) RoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membership string) ([]string, error) {
return d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, membership)
}
func (d *Database) InviteStream() types.StreamProvider {
return d.InviteStreamProvider
func (d *Database) RecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) {
return d.OutputEvents.SelectRecentEvents(ctx, txn, roomID, r, limit, chronologicalOrder, onlySyncEvents)
}
func (d *Database) SendToDeviceStream() types.StreamProvider {
return d.SendToDeviceStreamProvider
func (d *Database) PositionInTopology(ctx context.Context, txn *sql.Tx, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) {
return d.Topology.SelectPositionInTopology(ctx, txn, eventID)
}
func (d *Database) AccountDataStream() types.StreamProvider {
return d.AccountDataStreamProvider
func (d *Database) InviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error) {
return d.Invites.SelectInviteEventsInRange(ctx, txn, targetUserID, r)
}
func (d *Database) DeviceListStream() types.StreamLogProvider {
return d.DeviceListStreamProvider
func (d *Database) PeeksInRange(ctx context.Context, txn *sql.Tx, userID, deviceID string, r types.Range) (peeks []types.Peek, err error) {
return d.Peeks.SelectPeeksInRange(ctx, txn, userID, deviceID, r)
}
func (d *Database) RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error) {
return d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos)
}
// Events lookups a list of event by their event ID.
@ -166,6 +160,7 @@ func (d *Database) GetEventsInStreamingRange(
return events, err
}
/*
func (d *Database) AddTypingUser(
userID, roomID string, expireTime *time.Time,
) types.StreamPosition {
@ -178,13 +173,16 @@ func (d *Database) RemoveTypingUser(
return types.StreamPosition(d.EDUCache.RemoveUser(userID, roomID))
}
func (d *Database) AddSendToDevice() types.StreamPosition {
return types.StreamPosition(d.EDUCache.AddSendToDeviceMessage())
}
func (d *Database) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) {
d.EDUCache.SetTimeoutCallback(fn)
}
*/
/*
func (d *Database) AddSendToDevice() types.StreamPosition {
return types.StreamPosition(d.EDUCache.AddSendToDeviceMessage())
}
*/
func (d *Database) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) {
return d.CurrentRoomState.SelectJoinedUsers(ctx)
@ -552,7 +550,7 @@ func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, reda
// Retrieve the backward topology position, i.e. the position of the
// oldest event in the room's topology.
func (d *Database) getBackwardTopologyPos(
func (d *Database) GetBackwardTopologyPos(
ctx context.Context, txn *sql.Tx,
events []types.StreamEvent,
) (types.TopologyToken, error) {
@ -576,7 +574,7 @@ func (d *Database) addRoomDeltaToResponse(
device *userapi.Device,
txn *sql.Tx,
r types.Range,
delta stateDelta,
delta types.StateDelta,
numRecentEventsPerRoom int,
res *types.Response,
) error {
@ -740,11 +738,11 @@ func (d *Database) fetchMissingStateEvents(
// the user has new membership events.
// A list of joined room IDs is also returned in case the caller needs it.
// nolint:gocyclo
func (d *Database) getStateDeltas(
func (d *Database) GetStateDeltas(
ctx context.Context, device *userapi.Device, txn *sql.Tx,
r types.Range, userID string,
stateFilter *gomatrixserverlib.StateFilter,
) ([]stateDelta, []string, error) {
) ([]types.StateDelta, []string, error) {
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
// - Get membership list changes for this user in this sync response
// - For each room which has membership list changes:
@ -753,7 +751,7 @@ func (d *Database) getStateDeltas(
// * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
// * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block.
// - Get all CURRENTLY joined rooms, and add them to 'joined' block.
var deltas []stateDelta
var deltas []types.StateDelta
// get all the state events ever (i.e. for all available rooms) between these two positions
stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter)
@ -784,10 +782,10 @@ func (d *Database) getStateDeltas(
state[peek.RoomID] = s
}
if !peek.Deleted {
deltas = append(deltas, stateDelta{
membership: gomatrixserverlib.Peek,
stateEvents: d.StreamEventsToEvents(device, state[peek.RoomID]),
roomID: peek.RoomID,
deltas = append(deltas, types.StateDelta{
Membership: gomatrixserverlib.Peek,
StateEvents: d.StreamEventsToEvents(device, state[peek.RoomID]),
RoomID: peek.RoomID,
})
}
}
@ -812,11 +810,11 @@ func (d *Database) getStateDeltas(
continue // we'll add this room in when we do joined rooms
}
deltas = append(deltas, stateDelta{
membership: membership,
membershipPos: ev.StreamPosition,
stateEvents: d.StreamEventsToEvents(device, stateStreamEvents),
roomID: roomID,
deltas = append(deltas, types.StateDelta{
Membership: membership,
MembershipPos: ev.StreamPosition,
StateEvents: d.StreamEventsToEvents(device, stateStreamEvents),
RoomID: roomID,
})
break
}
@ -829,10 +827,10 @@ func (d *Database) getStateDeltas(
return nil, nil, err
}
for _, joinedRoomID := range joinedRoomIDs {
deltas = append(deltas, stateDelta{
membership: gomatrixserverlib.Join,
stateEvents: d.StreamEventsToEvents(device, state[joinedRoomID]),
roomID: joinedRoomID,
deltas = append(deltas, types.StateDelta{
Membership: gomatrixserverlib.Join,
StateEvents: d.StreamEventsToEvents(device, state[joinedRoomID]),
RoomID: joinedRoomID,
})
}
@ -844,13 +842,13 @@ func (d *Database) getStateDeltas(
// Fetches full state for all joined rooms and uses selectStateInRange to get
// updates for other rooms.
// nolint:gocyclo
func (d *Database) getStateDeltasForFullStateSync(
func (d *Database) GetStateDeltasForFullStateSync(
ctx context.Context, device *userapi.Device, txn *sql.Tx,
r types.Range, userID string,
stateFilter *gomatrixserverlib.StateFilter,
) ([]stateDelta, []string, error) {
) ([]types.StateDelta, []string, error) {
// Use a reasonable initial capacity
deltas := make(map[string]stateDelta)
deltas := make(map[string]types.StateDelta)
peeks, err := d.Peeks.SelectPeeksInRange(ctx, txn, userID, device.ID, r)
if err != nil {
@ -864,10 +862,10 @@ func (d *Database) getStateDeltasForFullStateSync(
if stateErr != nil {
return nil, nil, stateErr
}
deltas[peek.RoomID] = stateDelta{
membership: gomatrixserverlib.Peek,
stateEvents: d.StreamEventsToEvents(device, s),
roomID: peek.RoomID,
deltas[peek.RoomID] = types.StateDelta{
Membership: gomatrixserverlib.Peek,
StateEvents: d.StreamEventsToEvents(device, s),
RoomID: peek.RoomID,
}
}
}
@ -886,11 +884,11 @@ func (d *Database) getStateDeltasForFullStateSync(
for _, ev := range stateStreamEvents {
if membership := getMembershipFromEvent(ev.Event, userID); membership != "" {
if membership != gomatrixserverlib.Join { // We've already added full state for all joined rooms above.
deltas[roomID] = stateDelta{
membership: membership,
membershipPos: ev.StreamPosition,
stateEvents: d.StreamEventsToEvents(device, stateStreamEvents),
roomID: roomID,
deltas[roomID] = types.StateDelta{
Membership: membership,
MembershipPos: ev.StreamPosition,
StateEvents: d.StreamEventsToEvents(device, stateStreamEvents),
RoomID: roomID,
}
}
@ -910,15 +908,15 @@ func (d *Database) getStateDeltasForFullStateSync(
if stateErr != nil {
return nil, nil, stateErr
}
deltas[joinedRoomID] = stateDelta{
membership: gomatrixserverlib.Join,
stateEvents: d.StreamEventsToEvents(device, s),
roomID: joinedRoomID,
deltas[joinedRoomID] = types.StateDelta{
Membership: gomatrixserverlib.Join,
StateEvents: d.StreamEventsToEvents(device, s),
RoomID: joinedRoomID,
}
}
// Create a response array.
result := make([]stateDelta, len(deltas))
result := make([]types.StateDelta, len(deltas))
i := 0
for _, delta := range deltas {
result[i] = delta
@ -1057,15 +1055,6 @@ func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string {
return membership
}
type stateDelta struct {
roomID string
stateEvents []*gomatrixserverlib.HeaderedEvent
membership string
// The PDU stream position of the latest membership event for this user, if applicable.
// Can be 0 if there is no membership event in this delta.
membershipPos types.StreamPosition
}
// StoreReceipt stores user receipts
func (d *Database) StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) {
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {

View file

@ -1,31 +0,0 @@
package shared
import (
"context"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
type PDUTopologyProvider struct {
DB *Database
}
func (p *PDUTopologyProvider) TopologyRange(ctx context.Context, res *types.Response, roomID string, from, to types.TopologyToken, filter gomatrixserverlib.EventFilter) {
backwardOrdering := from.Depth > to.Depth || from.PDUPosition > to.PDUPosition
events, err := p.DB.GetEventsInTopologicalRange(ctx, &from, &to, roomID, filter.Limit, backwardOrdering)
if err != nil {
return
}
_ = events
}
func (p *PDUTopologyProvider) TopologyLatestPosition(ctx context.Context, roomID string) types.TopologyToken {
token, err := p.DB.MaxTopologicalPosition(ctx, roomID)
if err != nil {
return types.TopologyToken{}
}
return token
}

View file

@ -21,12 +21,10 @@ import (
// Import the sqlite3 package
_ "github.com/mattn/go-sqlite3"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas"
userapi "github.com/matrix-org/dendrite/userapi/api"
)
// SyncServerDatasource represents a sync server datasource which manages
@ -41,7 +39,7 @@ type SyncServerDatasource struct {
// NewDatabase creates a new sync server database
// nolint: gocyclo
func NewDatabase(dbProperties *config.DatabaseOptions, userAPI userapi.UserInternalAPI) (*SyncServerDatasource, error) {
func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) {
var d SyncServerDatasource
var err error
if d.db, err = sqlutil.Open(dbProperties); err != nil {
@ -51,7 +49,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions, userAPI userapi.UserInter
if err = d.prepare(dbProperties); err != nil {
return nil, err
}
d.ConfigureProviders(userAPI)
return &d, nil
}
@ -121,7 +118,6 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
Filter: filter,
SendToDevice: sendToDevice,
Receipts: receipts,
EDUCache: cache.New(),
}
return nil
}

View file

@ -22,16 +22,15 @@ import (
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/postgres"
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3"
userapi "github.com/matrix-org/dendrite/userapi/api"
)
// NewSyncServerDatasource opens a database connection.
func NewSyncServerDatasource(dbProperties *config.DatabaseOptions, userAPI userapi.UserInternalAPI) (Database, error) {
func NewSyncServerDatasource(dbProperties *config.DatabaseOptions) (Database, error) {
switch {
case dbProperties.ConnectionString.IsSQLite():
return sqlite3.NewDatabase(dbProperties, userAPI)
return sqlite3.NewDatabase(dbProperties)
case dbProperties.ConnectionString.IsPostgres():
return postgres.NewDatabase(dbProperties, userAPI)
return postgres.NewDatabase(dbProperties)
default:
return nil, fmt.Errorf("unexpected database type")
}

View file

@ -1,4 +1,4 @@
package shared
package streams
import (
"context"

View file

@ -1,4 +1,4 @@
package shared
package streams
import (
"context"
@ -16,12 +16,11 @@ func (p *InviteStreamProvider) Setup() {
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
latest, err := p.DB.Invites.SelectMaxInviteID(context.Background(), nil)
id, err := p.DB.MaxStreamTokenForInvites(context.Background())
if err != nil {
return
}
p.latest = types.StreamPosition(latest)
p.latest = id
}
func (p *InviteStreamProvider) CompleteSync(
@ -41,7 +40,7 @@ func (p *InviteStreamProvider) IncrementalSync(
To: to,
}
invites, retiredInvites, err := p.DB.Invites.SelectInviteEventsInRange(
invites, retiredInvites, err := p.DB.InviteEventsInRange(
ctx, nil, req.Device.UserID, r,
)
if err != nil {

View file

@ -1,4 +1,4 @@
package shared
package streams
import (
"context"
@ -14,27 +14,17 @@ type PDUStreamProvider struct {
StreamProvider
}
var txReadOnlySnapshot = sql.TxOptions{
// Set the isolation level so that we see a snapshot of the database.
// In PostgreSQL repeatable read transactions will see a snapshot taken
// at the first query, and since the transaction is read-only it can't
// run into any serialisation errors.
// https://www.postgresql.org/docs/9.5/static/transaction-iso.html#XACT-REPEATABLE-READ
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
}
func (p *PDUStreamProvider) Setup() {
p.StreamProvider.Setup()
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
id, err := p.DB.OutputEvents.SelectMaxEventID(context.Background(), nil)
id, err := p.DB.MaxStreamTokenForPDUs(context.Background())
if err != nil {
return
}
p.latest = types.StreamPosition(id)
p.latest = id
}
func (p *PDUStreamProvider) CompleteSync(
@ -47,7 +37,7 @@ func (p *PDUStreamProvider) CompleteSync(
// a consistent view of the database throughout. This does have the unfortunate side-effect that all
// the matrixy logic resides in this function, but it's better to not hide the fact that this is
// being done in a transaction.
txn, err := p.DB.DB.BeginTx(ctx, &txReadOnlySnapshot)
txn, err := p.DB.ReadOnlySnapshot(ctx)
if err != nil {
return to
}
@ -61,8 +51,7 @@ func (p *PDUStreamProvider) CompleteSync(
}
// Extract room state and recent events for all rooms the user is joined to.
var joinedRoomIDs []string
joinedRoomIDs, err = p.DB.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, req.Device.UserID, gomatrixserverlib.Join)
joinedRoomIDs, err := p.DB.RoomIDsWithMembership(ctx, txn, req.Device.UserID, gomatrixserverlib.Join)
if err != nil {
return to
}
@ -83,7 +72,7 @@ func (p *PDUStreamProvider) CompleteSync(
}
// Add peeked rooms.
peeks, err := p.DB.Peeks.SelectPeeksInRange(ctx, txn, req.Device.UserID, req.Device.ID, r)
peeks, err := p.DB.PeeksInRange(ctx, txn, req.Device.UserID, req.Device.ID, r)
if err != nil {
return to
}
@ -101,7 +90,6 @@ func (p *PDUStreamProvider) CompleteSync(
}
succeeded = true
return p.LatestPosition(ctx)
}
@ -120,18 +108,18 @@ func (p *PDUStreamProvider) IncrementalSync(
var err error
//var events []types.StreamEvent
var stateDeltas []stateDelta
var stateDeltas []types.StateDelta
var joinedRooms []string
// TODO: use filter provided in request
stateFilter := gomatrixserverlib.DefaultStateFilter()
if req.WantFullState {
if stateDeltas, joinedRooms, err = p.DB.getStateDeltasForFullStateSync(ctx, req.Device, nil, r, req.Device.UserID, &stateFilter); err != nil {
if stateDeltas, joinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, nil, r, req.Device.UserID, &stateFilter); err != nil {
return
}
} else {
if stateDeltas, joinedRooms, err = p.DB.getStateDeltas(ctx, req.Device, nil, r, req.Device.UserID, &stateFilter); err != nil {
if stateDeltas, joinedRooms, err = p.DB.GetStateDeltas(ctx, req.Device, nil, r, req.Device.UserID, &stateFilter); err != nil {
return
}
}
@ -155,56 +143,56 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
device *userapi.Device,
txn *sql.Tx,
r types.Range,
delta stateDelta,
delta types.StateDelta,
numRecentEventsPerRoom int,
res *types.Response,
) error {
if delta.membershipPos > 0 && delta.membership == gomatrixserverlib.Leave {
if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave {
// make sure we don't leak recent events after the leave event.
// TODO: History visibility makes this somewhat complex to handle correctly. For example:
// TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
// TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
// in a single /sync request
// This is all "okay" assuming history_visibility == "shared" which it is by default.
r.To = delta.membershipPos
r.To = delta.MembershipPos
}
recentStreamEvents, limited, err := p.DB.OutputEvents.SelectRecentEvents(
ctx, txn, delta.roomID, r,
recentStreamEvents, limited, err := p.DB.RecentEvents(
ctx, txn, delta.RoomID, r,
numRecentEventsPerRoom, true, true,
)
if err != nil {
return err
}
recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
prevBatch, err := p.DB.getBackwardTopologyPos(ctx, txn, recentStreamEvents)
delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back
prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, txn, recentStreamEvents)
if err != nil {
return err
}
// XXX: should we ever get this far if we have no recent events or state in this room?
// in practice we do for peeks, but possibly not joins?
if len(recentEvents) == 0 && len(delta.stateEvents) == 0 {
if len(recentEvents) == 0 && len(delta.StateEvents) == 0 {
return nil
}
switch delta.membership {
switch delta.Membership {
case gomatrixserverlib.Join:
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = &prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.roomID] = *jr
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.RoomID] = *jr
case gomatrixserverlib.Peek:
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = &prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Peek[delta.roomID] = *jr
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Peek[delta.RoomID] = *jr
case gomatrixserverlib.Leave:
fallthrough // transitions to leave are the same as ban
case gomatrixserverlib.Ban:
@ -214,8 +202,8 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
lr.Timeline.PrevBatch = &prevBatch
lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Leave[delta.roomID] = *lr
lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Leave[delta.RoomID] = *lr
}
return nil
@ -229,7 +217,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
numRecentEventsPerRoom int, device *userapi.Device,
) (jr *types.JoinResponse, err error) {
var stateEvents []*gomatrixserverlib.HeaderedEvent
stateEvents, err = p.DB.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilter)
stateEvents, err = p.DB.CurrentState(ctx, txn, roomID, stateFilter)
if err != nil {
return
}
@ -237,7 +225,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
var recentStreamEvents []types.StreamEvent
var limited bool
recentStreamEvents, limited, err = p.DB.OutputEvents.SelectRecentEvents(
recentStreamEvents, limited, err = p.DB.RecentEvents(
ctx, txn, roomID, r, numRecentEventsPerRoom, true, true,
)
if err != nil {
@ -276,7 +264,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
var prevBatch *types.TopologyToken
if len(recentStreamEvents) > 0 {
var backwardTopologyPos, backwardStreamPos types.StreamPosition
backwardTopologyPos, backwardStreamPos, err = p.DB.Topology.SelectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID())
backwardTopologyPos, backwardStreamPos, err = p.DB.PositionInTopology(ctx, txn, recentStreamEvents[0].EventID())
if err != nil {
return
}

View file

@ -1,4 +1,4 @@
package shared
package streams
import (
"context"
@ -16,12 +16,11 @@ type ReceiptStreamProvider struct {
func (p *ReceiptStreamProvider) Setup() {
p.StreamProvider.Setup()
latest, err := p.DB.Receipts.SelectMaxReceiptID(context.Background(), nil)
id, err := p.DB.MaxStreamTokenForReceipts(context.Background())
if err != nil {
return
}
p.latest = types.StreamPosition(latest)
p.latest = id
}
func (p *ReceiptStreamProvider) CompleteSync(
@ -43,7 +42,7 @@ func (p *ReceiptStreamProvider) IncrementalSync(
}
}
lastPos, receipts, err := p.DB.Receipts.SelectRoomReceiptsAfter(ctx, joinedRooms, from)
lastPos, receipts, err := p.DB.RoomReceiptsAfter(ctx, joinedRooms, from)
if err != nil {
return to //fmt.Errorf("unable to select receipts for rooms: %w", err)
}

View file

@ -1,4 +1,4 @@
package shared
package streams
import (
"context"

View file

@ -1,15 +1,17 @@
package shared
package streams
import (
"context"
"encoding/json"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
type TypingStreamProvider struct {
StreamProvider
EDUCache *cache.EDUCache
}
func (p *TypingStreamProvider) CompleteSync(
@ -35,7 +37,7 @@ func (p *TypingStreamProvider) IncrementalSync(
jr := req.Response.Rooms.Join[roomID]
if users, updated := p.DB.EDUCache.GetTypingUsersIfUpdatedAfter(
if users, updated := p.EDUCache.GetTypingUsersIfUpdatedAfter(
roomID, int64(from),
); updated {
ev := gomatrixserverlib.ClientEvent{

View file

@ -1,4 +1,4 @@
package shared
package streams
import (
"context"

View file

@ -0,0 +1,50 @@
package streams
import (
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
)
type Streams struct {
PDUStreamProvider types.StreamProvider
PDUTopologyProvider types.TopologyProvider
TypingStreamProvider types.StreamProvider
ReceiptStreamProvider types.StreamProvider
InviteStreamProvider types.StreamProvider
SendToDeviceStreamProvider types.StreamProvider
AccountDataStreamProvider types.StreamProvider
DeviceListStreamProvider types.StreamLogProvider
}
func NewSyncStreamProviders(
d storage.Database, userAPI userapi.UserInternalAPI,
eduCache *cache.EDUCache,
) *Streams {
streams := &Streams{
PDUStreamProvider: &PDUStreamProvider{StreamProvider{DB: d}},
TypingStreamProvider: &TypingStreamProvider{
StreamProvider: StreamProvider{DB: d},
EDUCache: eduCache,
},
ReceiptStreamProvider: &ReceiptStreamProvider{StreamProvider{DB: d}},
InviteStreamProvider: &InviteStreamProvider{StreamProvider{DB: d}},
SendToDeviceStreamProvider: &SendToDeviceStreamProvider{StreamProvider{DB: d}},
AccountDataStreamProvider: &AccountDataStreamProvider{
StreamProvider: StreamProvider{DB: d},
userAPI: userAPI,
},
DeviceListStreamProvider: &DeviceListStreamProvider{StreamLogProvider{DB: d}},
}
streams.PDUStreamProvider.Setup()
streams.TypingStreamProvider.Setup()
streams.ReceiptStreamProvider.Setup()
streams.InviteStreamProvider.Setup()
streams.SendToDeviceStreamProvider.Setup()
streams.AccountDataStreamProvider.Setup()
streams.DeviceListStreamProvider.Setup()
return streams
}

View file

@ -1,14 +1,15 @@
package shared
package streams
import (
"context"
"sync"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
)
type StreamProvider struct {
DB *Database
DB storage.Database
latest types.StreamPosition
latestMutex sync.RWMutex
update *sync.Cond

View file

@ -1,14 +1,15 @@
package shared
package streams
import (
"context"
"sync"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
)
type StreamLogProvider struct {
DB *Database
DB storage.Database
latest types.LogPosition
latestMutex sync.RWMutex
update *sync.Cond

View file

@ -31,6 +31,7 @@ import (
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/util"
@ -39,19 +40,13 @@ import (
// RequestPool manages HTTP long-poll connections for /sync
type RequestPool struct {
db storage.Database
cfg *config.SyncAPI
userAPI userapi.UserInternalAPI
keyAPI keyapi.KeyInternalAPI
rsAPI roomserverAPI.RoomserverInternalAPI
lastseen sync.Map
pduStream types.StreamProvider
typingStream types.StreamProvider
receiptStream types.StreamProvider
sendToDeviceStream types.StreamProvider
inviteStream types.StreamProvider
accountDataStream types.StreamProvider
deviceListStream types.StreamLogProvider
db storage.Database
cfg *config.SyncAPI
userAPI userapi.UserInternalAPI
keyAPI keyapi.KeyInternalAPI
rsAPI roomserverAPI.RoomserverInternalAPI
lastseen sync.Map
streams *streams.Streams
}
// NewRequestPool makes a new RequestPool
@ -59,21 +54,16 @@ func NewRequestPool(
db storage.Database, cfg *config.SyncAPI,
userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
streams *streams.Streams,
) *RequestPool {
rp := &RequestPool{
db: db,
cfg: cfg,
userAPI: userAPI,
keyAPI: keyAPI,
rsAPI: rsAPI,
lastseen: sync.Map{},
pduStream: db.PDUStream(),
typingStream: db.TypingStream(),
receiptStream: db.ReceiptStream(),
sendToDeviceStream: db.SendToDeviceStream(),
inviteStream: db.InviteStream(),
accountDataStream: db.AccountDataStream(),
deviceListStream: db.DeviceListStream(),
db: db,
cfg: cfg,
userAPI: userAPI,
keyAPI: keyAPI,
rsAPI: rsAPI,
lastseen: sync.Map{},
streams: streams,
}
go rp.cleanLastSeen()
return rp
@ -186,13 +176,13 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
case <-timer.C: // Timeout reached
return giveup()
case <-rp.pduStream.NotifyAfter(waitctx, syncReq.Since.PDUPosition):
case <-rp.typingStream.NotifyAfter(waitctx, syncReq.Since.TypingPosition):
case <-rp.receiptStream.NotifyAfter(waitctx, syncReq.Since.ReceiptPosition):
case <-rp.inviteStream.NotifyAfter(waitctx, syncReq.Since.InvitePosition):
case <-rp.sendToDeviceStream.NotifyAfter(waitctx, syncReq.Since.SendToDevicePosition):
case <-rp.accountDataStream.NotifyAfter(waitctx, syncReq.Since.AccountDataPosition):
case <-rp.deviceListStream.NotifyAfter(waitctx, syncReq.Since.DeviceListPosition):
case <-rp.streams.PDUStreamProvider.NotifyAfter(waitctx, syncReq.Since.PDUPosition):
case <-rp.streams.TypingStreamProvider.NotifyAfter(waitctx, syncReq.Since.TypingPosition):
case <-rp.streams.ReceiptStreamProvider.NotifyAfter(waitctx, syncReq.Since.ReceiptPosition):
case <-rp.streams.InviteStreamProvider.NotifyAfter(waitctx, syncReq.Since.InvitePosition):
case <-rp.streams.SendToDeviceStreamProvider.NotifyAfter(waitctx, syncReq.Since.SendToDevicePosition):
case <-rp.streams.AccountDataStreamProvider.NotifyAfter(waitctx, syncReq.Since.AccountDataPosition):
case <-rp.streams.DeviceListStreamProvider.NotifyAfter(waitctx, syncReq.Since.DeviceListPosition):
}
syncReq.Log.Println("Responding to sync after wakeup")
@ -204,58 +194,65 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
if syncReq.Since.IsEmpty() {
// Complete sync
syncReq.Response.NextBatch = types.StreamingToken{
PDUPosition: rp.pduStream.CompleteSync(
PDUPosition: rp.streams.PDUStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
TypingPosition: rp.typingStream.CompleteSync(
TypingPosition: rp.streams.TypingStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
ReceiptPosition: rp.receiptStream.CompleteSync(
ReceiptPosition: rp.streams.ReceiptStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
InvitePosition: rp.inviteStream.CompleteSync(
InvitePosition: rp.streams.InviteStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
SendToDevicePosition: rp.sendToDeviceStream.CompleteSync(
SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
AccountDataPosition: rp.accountDataStream.CompleteSync(
AccountDataPosition: rp.streams.AccountDataStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
DeviceListPosition: rp.deviceListStream.CompleteSync(
DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
}
} else {
// Incremental sync
syncReq.Response.NextBatch = types.StreamingToken{
PDUPosition: rp.pduStream.IncrementalSync(
PDUPosition: rp.streams.PDUStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.PDUPosition, rp.pduStream.LatestPosition(syncReq.Context),
syncReq.Since.PDUPosition, // from
rp.streams.PDUStreamProvider.LatestPosition(syncReq.Context), // to
),
TypingPosition: rp.typingStream.IncrementalSync(
TypingPosition: rp.streams.TypingStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.TypingPosition, rp.typingStream.LatestPosition(syncReq.Context),
syncReq.Since.TypingPosition, // from
rp.streams.TypingStreamProvider.LatestPosition(syncReq.Context), // to
),
ReceiptPosition: rp.receiptStream.IncrementalSync(
syncReq.Context, syncReq, syncReq.Since.ReceiptPosition,
rp.receiptStream.LatestPosition(syncReq.Context),
ReceiptPosition: rp.streams.ReceiptStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.ReceiptPosition, // from
rp.streams.ReceiptStreamProvider.LatestPosition(syncReq.Context), // to
),
InvitePosition: rp.inviteStream.IncrementalSync(
syncReq.Context, syncReq, syncReq.Since.InvitePosition,
rp.inviteStream.LatestPosition(syncReq.Context),
InvitePosition: rp.streams.InviteStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.InvitePosition, // from
rp.streams.InviteStreamProvider.LatestPosition(syncReq.Context), // to
),
SendToDevicePosition: rp.sendToDeviceStream.IncrementalSync(
syncReq.Context, syncReq, syncReq.Since.SendToDevicePosition,
rp.sendToDeviceStream.LatestPosition(syncReq.Context),
SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.SendToDevicePosition, // from
rp.streams.SendToDeviceStreamProvider.LatestPosition(syncReq.Context), // to
),
AccountDataPosition: rp.accountDataStream.IncrementalSync(
syncReq.Context, syncReq, syncReq.Since.AccountDataPosition,
rp.accountDataStream.LatestPosition(syncReq.Context),
AccountDataPosition: rp.streams.AccountDataStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.AccountDataPosition, // from
rp.streams.AccountDataStreamProvider.LatestPosition(syncReq.Context), // to
),
DeviceListPosition: rp.deviceListStream.IncrementalSync(
syncReq.Context, syncReq, syncReq.Since.DeviceListPosition,
rp.db.DeviceListStream().LatestPosition(syncReq.Context),
DeviceListPosition: rp.streams.DeviceListStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.DeviceListPosition, // from
rp.streams.DeviceListStreamProvider.LatestPosition(syncReq.Context), // to
),
}
}

View file

@ -18,6 +18,7 @@ import (
"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/eduserver/cache"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
@ -28,6 +29,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/consumers"
"github.com/matrix-org/dendrite/syncapi/routing"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/sync"
)
@ -43,51 +45,54 @@ func AddPublicRoutes(
) {
consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
syncDB, err := storage.NewSyncServerDatasource(&cfg.Database, userAPI)
syncDB, err := storage.NewSyncServerDatasource(&cfg.Database)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to sync db")
}
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI)
eduCache := cache.New()
streams := streams.NewSyncStreamProviders(syncDB, userAPI, eduCache)
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams)
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
consumer, keyAPI, rsAPI, syncDB,
consumer, keyAPI, rsAPI, syncDB, streams,
)
if err = keyChangeConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start key change consumer")
}
roomConsumer := consumers.NewOutputRoomEventConsumer(
cfg, consumer, syncDB, rsAPI,
cfg, consumer, syncDB, streams, rsAPI,
)
if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer")
}
clientConsumer := consumers.NewOutputClientDataConsumer(
cfg, consumer, syncDB,
cfg, consumer, syncDB, streams,
)
if err = clientConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start client data consumer")
}
typingConsumer := consumers.NewOutputTypingEventConsumer(
cfg, consumer, syncDB,
cfg, consumer, syncDB, eduCache, streams,
)
if err = typingConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start typing consumer")
}
sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer(
cfg, consumer, syncDB,
cfg, consumer, syncDB, streams,
)
if err = sendToDeviceConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start send-to-device consumer")
}
receiptConsumer := consumers.NewOutputReceiptEventConsumer(
cfg, consumer, syncDB,
cfg, consumer, syncDB, streams,
)
if err = receiptConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start receipts consumer")

View file

@ -35,6 +35,15 @@ var (
ErrInvalidSyncTokenLen = fmt.Errorf("Sync token has an invalid length")
)
type StateDelta struct {
RoomID string
StateEvents []*gomatrixserverlib.HeaderedEvent
Membership string
// The PDU stream position of the latest membership event for this user, if applicable.
// Can be 0 if there is no membership event in this delta.
MembershipPos StreamPosition
}
// StreamPosition represents the offset in the sync stream a client is at.
type StreamPosition int64