Add typing notifications to /sync responses - fixes #635

Signed-off-by: Alex Chen <minecnly@gmail.com>
This commit is contained in:
Cnly 2019-06-18 20:54:08 +08:00
parent ce189a737d
commit b896fdc537
17 changed files with 505 additions and 165 deletions

View file

@ -22,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
sarama "gopkg.in/Shopify/sarama.v1" sarama "gopkg.in/Shopify/sarama.v1"
) )
@ -29,7 +30,7 @@ import (
// OutputClientDataConsumer consumes events that originated in the client API server. // OutputClientDataConsumer consumes events that originated in the client API server.
type OutputClientDataConsumer struct { type OutputClientDataConsumer struct {
clientAPIConsumer *common.ContinualConsumer clientAPIConsumer *common.ContinualConsumer
db *storage.SyncServerDatabase db *storage.SyncServerDatasource
notifier *sync.Notifier notifier *sync.Notifier
} }
@ -38,7 +39,7 @@ func NewOutputClientDataConsumer(
cfg *config.Dendrite, cfg *config.Dendrite,
kafkaConsumer sarama.Consumer, kafkaConsumer sarama.Consumer,
n *sync.Notifier, n *sync.Notifier,
store *storage.SyncServerDatabase, store *storage.SyncServerDatasource,
) *OutputClientDataConsumer { ) *OutputClientDataConsumer {
consumer := common.ContinualConsumer{ consumer := common.ContinualConsumer{
@ -78,7 +79,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
"room_id": output.RoomID, "room_id": output.RoomID,
}).Info("received data from client API server") }).Info("received data from client API server")
syncStreamPos, err := s.db.UpsertAccountData( pduPos, err := s.db.UpsertAccountData(
context.TODO(), string(msg.Key), output.RoomID, output.Type, context.TODO(), string(msg.Key), output.RoomID, output.Type,
) )
if err != nil { if err != nil {
@ -89,7 +90,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
}).Panicf("could not save account data") }).Panicf("could not save account data")
} }
s.notifier.OnNewEvent(nil, string(msg.Key), syncStreamPos) s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.SyncPosition{PDUPosition: pduPos})
return nil return nil
} }

View file

@ -33,7 +33,7 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server. // OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct { type OutputRoomEventConsumer struct {
roomServerConsumer *common.ContinualConsumer roomServerConsumer *common.ContinualConsumer
db *storage.SyncServerDatabase db *storage.SyncServerDatasource
notifier *sync.Notifier notifier *sync.Notifier
query api.RoomserverQueryAPI query api.RoomserverQueryAPI
} }
@ -43,7 +43,7 @@ func NewOutputRoomEventConsumer(
cfg *config.Dendrite, cfg *config.Dendrite,
kafkaConsumer sarama.Consumer, kafkaConsumer sarama.Consumer,
n *sync.Notifier, n *sync.Notifier,
store *storage.SyncServerDatabase, store *storage.SyncServerDatasource,
queryAPI api.RoomserverQueryAPI, queryAPI api.RoomserverQueryAPI,
) *OutputRoomEventConsumer { ) *OutputRoomEventConsumer {
@ -126,7 +126,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
} }
} }
syncStreamPos, err := s.db.WriteEvent( pduPos, err := s.db.WriteEvent(
ctx, ctx,
&ev, &ev,
addsStateEvents, addsStateEvents,
@ -144,7 +144,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
}).Panicf("roomserver output log: write event failure") }).Panicf("roomserver output log: write event failure")
return nil return nil
} }
s.notifier.OnNewEvent(&ev, "", types.StreamPosition(syncStreamPos)) s.notifier.OnNewEvent(&ev, "", nil, types.SyncPosition{PDUPosition: pduPos})
return nil return nil
} }
@ -152,7 +152,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
func (s *OutputRoomEventConsumer) onNewInviteEvent( func (s *OutputRoomEventConsumer) onNewInviteEvent(
ctx context.Context, msg api.OutputNewInviteEvent, ctx context.Context, msg api.OutputNewInviteEvent,
) error { ) error {
syncStreamPos, err := s.db.AddInviteEvent(ctx, msg.Event) pduPos, err := s.db.AddInviteEvent(ctx, msg.Event)
if err != nil { if err != nil {
// panic rather than continue with an inconsistent database // panic rather than continue with an inconsistent database
log.WithFields(log.Fields{ log.WithFields(log.Fields{
@ -161,7 +161,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
}).Panicf("roomserver output log: write invite failure") }).Panicf("roomserver output log: write invite failure")
return nil return nil
} }
s.notifier.OnNewEvent(&msg.Event, "", syncStreamPos) s.notifier.OnNewEvent(&msg.Event, "", nil, types.SyncPosition{PDUPosition: pduPos})
return nil return nil
} }

View file

@ -0,0 +1,84 @@
// Copyright 2019 Alex Chen
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package consumers
import (
"encoding/json"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/typingserver/api"
log "github.com/sirupsen/logrus"
sarama "gopkg.in/Shopify/sarama.v1"
)
type OutputTypingEventConsumer struct {
typingConsumer *common.ContinualConsumer
db *storage.SyncServerDatasource
notifier *sync.Notifier
}
func NewOutputTypingEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store *storage.SyncServerDatasource,
) *OutputTypingEventConsumer {
consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputTypingEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
}
s := &OutputTypingEventConsumer{
typingConsumer: &consumer,
db: store,
notifier: n,
}
consumer.ProcessMessage = s.onMessage
return s
}
// Start consuming from typing api
func (s *OutputTypingEventConsumer) Start() error {
return s.typingConsumer.Start()
}
func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
var output api.OutputTypingEvent
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("typing server output log: message parse failure")
return nil
}
log.WithFields(log.Fields{
"room_id": output.Event.RoomID,
}).Info("received data from typing server")
typingEvent := output.Event
if typingEvent.Typing {
s.db.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime)
} else {
s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID)
}
s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.SyncPosition{TypingPosition: 1})
return nil
}

View file

@ -30,7 +30,7 @@ import (
const pathPrefixR0 = "/_matrix/client/r0" const pathPrefixR0 = "/_matrix/client/r0"
// Setup configures the given mux with sync-server listeners // Setup configures the given mux with sync-server listeners
func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB *storage.SyncServerDatabase, deviceDB *devices.Database) { func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB *storage.SyncServerDatasource, deviceDB *devices.Database) {
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter() r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
authData := auth.Data{ authData := auth.Data{

View file

@ -40,7 +40,7 @@ type stateEventInStateResp struct {
// TODO: Check if the user is in the room. If not, check if the room's history // TODO: Check if the user is in the room. If not, check if the room's history
// is publicly visible. Current behaviour is returning an empty array if the // is publicly visible. Current behaviour is returning an empty array if the
// user cannot see the room's history. // user cannot see the room's history.
func OnIncomingStateRequest(req *http.Request, db *storage.SyncServerDatabase, roomID string) util.JSONResponse { func OnIncomingStateRequest(req *http.Request, db *storage.SyncServerDatasource, roomID string) util.JSONResponse {
// TODO(#287): Auth request and handle the case where the user has left (where // TODO(#287): Auth request and handle the case where the user has left (where
// we should return the state at the poin they left) // we should return the state at the poin they left)
@ -84,7 +84,7 @@ func OnIncomingStateRequest(req *http.Request, db *storage.SyncServerDatabase, r
// /rooms/{roomID}/state/{type}/{statekey} request. It will look in current // /rooms/{roomID}/state/{type}/{statekey} request. It will look in current
// state to see if there is an event with that type and state key, if there // state to see if there is an event with that type and state key, if there
// is then (by default) we return the content, otherwise a 404. // is then (by default) we return the content, otherwise a 404.
func OnIncomingStateTypeRequest(req *http.Request, db *storage.SyncServerDatabase, roomID string, evType, stateKey string) util.JSONResponse { func OnIncomingStateTypeRequest(req *http.Request, db *storage.SyncServerDatasource, roomID string, evType, stateKey string) util.JSONResponse {
// TODO(#287): Auth request and handle the case where the user has left (where // TODO(#287): Auth request and handle the case where the user has left (where
// we should return the state at the poin they left) // we should return the state at the poin they left)

View file

@ -19,8 +19,6 @@ import (
"database/sql" "database/sql"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/syncapi/types"
) )
const accountDataSchema = ` const accountDataSchema = `
@ -94,7 +92,7 @@ func (s *accountDataStatements) insertAccountData(
func (s *accountDataStatements) selectAccountDataInRange( func (s *accountDataStatements) selectAccountDataInRange(
ctx context.Context, ctx context.Context,
userID string, userID string,
oldPos, newPos types.StreamPosition, oldPos, newPos int64,
) (data map[string][]string, err error) { ) (data map[string][]string, err error) {
data = make(map[string][]string) data = make(map[string][]string)

View file

@ -23,7 +23,6 @@ import (
"github.com/lib/pq" "github.com/lib/pq"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -113,7 +112,7 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the // Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
// two positions, only the most recent state is returned. // two positions, only the most recent state is returned.
func (s *outputRoomEventsStatements) selectStateInRange( func (s *outputRoomEventsStatements) selectStateInRange(
ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition, ctx context.Context, txn *sql.Tx, oldPos, newPos int64,
) (map[string]map[string]bool, map[string]streamEvent, error) { ) (map[string]map[string]bool, map[string]streamEvent, error) {
stmt := common.TxStmt(txn, s.selectStateInRangeStmt) stmt := common.TxStmt(txn, s.selectStateInRangeStmt)
@ -171,7 +170,7 @@ func (s *outputRoomEventsStatements) selectStateInRange(
eventIDToEvent[ev.EventID()] = streamEvent{ eventIDToEvent[ev.EventID()] = streamEvent{
Event: ev, Event: ev,
streamPosition: types.StreamPosition(streamPos), streamPosition: streamPos,
} }
} }
@ -223,7 +222,7 @@ func (s *outputRoomEventsStatements) insertEvent(
// RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'. // RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'.
func (s *outputRoomEventsStatements) selectRecentEvents( func (s *outputRoomEventsStatements) selectRecentEvents(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
roomID string, fromPos, toPos types.StreamPosition, limit int, roomID string, fromPos, toPos int64, limit int,
) ([]streamEvent, error) { ) ([]streamEvent, error) {
stmt := common.TxStmt(txn, s.selectRecentEventsStmt) stmt := common.TxStmt(txn, s.selectRecentEventsStmt)
rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit) rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
@ -286,7 +285,7 @@ func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) {
result = append(result, streamEvent{ result = append(result, streamEvent{
Event: ev, Event: ev,
streamPosition: types.StreamPosition(streamPos), streamPosition: streamPos,
transactionID: transactionID, transactionID: transactionID,
}) })
} }

View file

@ -17,7 +17,10 @@ package storage
import ( import (
"context" "context"
"database/sql" "database/sql"
"encoding/json"
"fmt" "fmt"
"strconv"
"time"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -28,6 +31,7 @@ import (
_ "github.com/lib/pq" _ "github.com/lib/pq"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/typingserver/cache"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -37,31 +41,33 @@ type stateDelta struct {
membership string membership string
// The stream position of the latest membership event for this user, if applicable. // The stream position of the latest membership event for this user, if applicable.
// Can be 0 if there is no membership event in this delta. // Can be 0 if there is no membership event in this delta.
membershipPos types.StreamPosition membershipPos int64
} }
// Same as gomatrixserverlib.Event but also has the stream position for this event. // Same as gomatrixserverlib.Event but also has the stream position for this event.
type streamEvent struct { type streamEvent struct {
gomatrixserverlib.Event gomatrixserverlib.Event
streamPosition types.StreamPosition streamPosition int64
transactionID *api.TransactionID transactionID *api.TransactionID
} }
// SyncServerDatabase represents a sync server database // SyncServerDatabase represents a sync server datasource which manages
type SyncServerDatabase struct { // both the database for PDUs and caches for EDUs.
type SyncServerDatasource struct {
db *sql.DB db *sql.DB
common.PartitionOffsetStatements common.PartitionOffsetStatements
accountData accountDataStatements accountData accountDataStatements
events outputRoomEventsStatements events outputRoomEventsStatements
roomstate currentRoomStateStatements roomstate currentRoomStateStatements
invites inviteEventsStatements invites inviteEventsStatements
typingCache *cache.TypingCache
} }
// NewSyncServerDatabase creates a new sync server database // NewSyncServerDatabase creates a new sync server database
func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) { func NewSyncServerDatasource(dbDataSourceName string) (*SyncServerDatasource, error) {
var d SyncServerDatabase var d SyncServerDatasource
var err error var err error
if d.db, err = sql.Open("postgres", dataSourceName); err != nil { if d.db, err = sql.Open("postgres", dbDataSourceName); err != nil {
return nil, err return nil, err
} }
if err = d.PartitionOffsetStatements.Prepare(d.db, "syncapi"); err != nil { if err = d.PartitionOffsetStatements.Prepare(d.db, "syncapi"); err != nil {
@ -79,11 +85,12 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) {
if err := d.invites.prepare(d.db); err != nil { if err := d.invites.prepare(d.db); err != nil {
return nil, err return nil, err
} }
d.typingCache = cache.NewTypingCache()
return &d, nil return &d, nil
} }
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. // AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
func (d *SyncServerDatabase) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) { func (d *SyncServerDatasource) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) {
return d.roomstate.selectJoinedUsers(ctx) return d.roomstate.selectJoinedUsers(ctx)
} }
@ -92,7 +99,7 @@ func (d *SyncServerDatabase) AllJoinedUsersInRooms(ctx context.Context) (map[str
// If an event is not found in the database then it will be omitted from the list. // If an event is not found in the database then it will be omitted from the list.
// Returns an error if there was a problem talking with the database. // Returns an error if there was a problem talking with the database.
// Does not include any transaction IDs in the returned events. // Does not include any transaction IDs in the returned events.
func (d *SyncServerDatabase) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error) { func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.Event, error) {
streamEvents, err := d.events.selectEvents(ctx, nil, eventIDs) streamEvents, err := d.events.selectEvents(ctx, nil, eventIDs)
if err != nil { if err != nil {
return nil, err return nil, err
@ -106,36 +113,36 @@ func (d *SyncServerDatabase) Events(ctx context.Context, eventIDs []string) ([]g
// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races // WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
// when generating the stream position for this event. Returns the sync stream position for the inserted event. // when generating the stream position for this event. Returns the sync stream position for the inserted event.
// Returns an error if there was a problem inserting this event. // Returns an error if there was a problem inserting this event.
func (d *SyncServerDatabase) WriteEvent( func (d *SyncServerDatasource) WriteEvent(
ctx context.Context, ctx context.Context,
ev *gomatrixserverlib.Event, ev *gomatrixserverlib.Event,
addStateEvents []gomatrixserverlib.Event, addStateEvents []gomatrixserverlib.Event,
addStateEventIDs, removeStateEventIDs []string, addStateEventIDs, removeStateEventIDs []string,
transactionID *api.TransactionID, transactionID *api.TransactionID,
) (streamPos types.StreamPosition, returnErr error) { ) (pduPosition int64, returnErr error) {
returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error { returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error {
var err error var err error
pos, err := d.events.insertEvent(ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID) pos, err := d.events.insertEvent(ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID)
if err != nil { if err != nil {
return err return err
} }
streamPos = types.StreamPosition(pos) pduPosition = pos
if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 { if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 {
// Nothing to do, the event may have just been a message event. // Nothing to do, the event may have just been a message event.
return nil return nil
} }
return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, streamPos) return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition)
}) })
return return
} }
func (d *SyncServerDatabase) updateRoomState( func (d *SyncServerDatasource) updateRoomState(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
removedEventIDs []string, removedEventIDs []string,
addedEvents []gomatrixserverlib.Event, addedEvents []gomatrixserverlib.Event,
streamPos types.StreamPosition, pduPosition int64,
) error { ) error {
// remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add. // remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
for _, eventID := range removedEventIDs { for _, eventID := range removedEventIDs {
@ -157,7 +164,7 @@ func (d *SyncServerDatabase) updateRoomState(
} }
membership = &value membership = &value
} }
if err := d.roomstate.upsertRoomState(ctx, txn, event, membership, int64(streamPos)); err != nil { if err := d.roomstate.upsertRoomState(ctx, txn, event, membership, pduPosition); err != nil {
return err return err
} }
} }
@ -168,7 +175,7 @@ func (d *SyncServerDatabase) updateRoomState(
// GetStateEvent returns the Matrix state event of a given type for a given room with a given state key // GetStateEvent returns the Matrix state event of a given type for a given room with a given state key
// If no event could be found, returns nil // If no event could be found, returns nil
// If there was an issue during the retrieval, returns an error // If there was an issue during the retrieval, returns an error
func (d *SyncServerDatabase) GetStateEvent( func (d *SyncServerDatasource) GetStateEvent(
ctx context.Context, roomID, evType, stateKey string, ctx context.Context, roomID, evType, stateKey string,
) (*gomatrixserverlib.Event, error) { ) (*gomatrixserverlib.Event, error) {
return d.roomstate.selectStateEvent(ctx, roomID, evType, stateKey) return d.roomstate.selectStateEvent(ctx, roomID, evType, stateKey)
@ -177,7 +184,7 @@ func (d *SyncServerDatabase) GetStateEvent(
// GetStateEventsForRoom fetches the state events for a given room. // GetStateEventsForRoom fetches the state events for a given room.
// Returns an empty slice if no state events could be found for this room. // Returns an empty slice if no state events could be found for this room.
// Returns an error if there was an issue with the retrieval. // Returns an error if there was an issue with the retrieval.
func (d *SyncServerDatabase) GetStateEventsForRoom( func (d *SyncServerDatasource) GetStateEventsForRoom(
ctx context.Context, roomID string, ctx context.Context, roomID string,
) (stateEvents []gomatrixserverlib.Event, err error) { ) (stateEvents []gomatrixserverlib.Event, err error) {
err = common.WithTransaction(d.db, func(txn *sql.Tx) error { err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
@ -187,46 +194,49 @@ func (d *SyncServerDatabase) GetStateEventsForRoom(
return return
} }
// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet. // SyncPosition returns the latest positions for syncing.
func (d *SyncServerDatabase) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) { func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (types.SyncPosition, error) {
return d.syncStreamPositionTx(ctx, nil) return d.syncPositionTx(ctx, nil)
} }
func (d *SyncServerDatabase) syncStreamPositionTx( func (d *SyncServerDatasource) syncPositionTx(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
) (types.StreamPosition, error) { ) (sp types.SyncPosition, err error) {
maxID, err := d.events.selectMaxEventID(ctx, txn)
maxEventID, err := d.events.selectMaxEventID(ctx, txn)
if err != nil { if err != nil {
return 0, err return sp, err
} }
maxAccountDataID, err := d.accountData.selectMaxAccountDataID(ctx, txn) maxAccountDataID, err := d.accountData.selectMaxAccountDataID(ctx, txn)
if err != nil { if err != nil {
return 0, err return sp, err
} }
if maxAccountDataID > maxID { if maxAccountDataID > maxEventID {
maxID = maxAccountDataID maxEventID = maxAccountDataID
} }
maxInviteID, err := d.invites.selectMaxInviteID(ctx, txn) maxInviteID, err := d.invites.selectMaxInviteID(ctx, txn)
if err != nil { if err != nil {
return 0, err return sp, err
} }
if maxInviteID > maxID { if maxInviteID > maxEventID {
maxID = maxInviteID maxEventID = maxInviteID
} }
return types.StreamPosition(maxID), nil sp.PDUPosition = maxEventID
sp.TypingPosition = d.typingCache.GetLatestSyncPosition()
return
} }
// IncrementalSync returns all the data needed in order to create an incremental // addPDUDeltaToResponse adds all PDU delta to a sync response.
// sync response for the given user. Events returned will include any client // IDs of all rooms the user joined are returned so EDU delta can be added for them.
// transaction IDs associated with the given device. These transaction IDs come func (d *SyncServerDatasource) addPDUDeltaToResponse(
// from when the device sent the event via an API that included a transaction
// ID.
func (d *SyncServerDatabase) IncrementalSync(
ctx context.Context, ctx context.Context,
device authtypes.Device, device authtypes.Device,
fromPos, toPos types.StreamPosition, fromPos, toPos int64,
numRecentEventsPerRoom int, numRecentEventsPerRoom int,
) (*types.Response, error) { res *types.Response,
) ([]string, error) {
txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot) txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot)
if err != nil { if err != nil {
return nil, err return nil, err
@ -243,8 +253,9 @@ func (d *SyncServerDatabase) IncrementalSync(
return nil, err return nil, err
} }
res := types.NewResponse(toPos) joinedRoomIDs := make([]string, 0, len(deltas))
for _, delta := range deltas { for _, delta := range deltas {
joinedRoomIDs = append(joinedRoomIDs, delta.roomID)
err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res) err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res)
if err != nil { if err != nil {
return nil, err return nil, err
@ -257,52 +268,151 @@ func (d *SyncServerDatabase) IncrementalSync(
} }
succeeded = true succeeded = true
return joinedRoomIDs, nil
}
func (d *SyncServerDatasource) addTypingDeltaToResponse(
ctx context.Context,
since int64,
joinedRoomIDs []string,
res *types.Response,
) error {
var jr types.JoinResponse
var ok bool
var err error
for _, roomID := range joinedRoomIDs {
if typingUsers, updated := d.typingCache.GetTypingUsersIfUpdatedAfter(
roomID, since,
); updated {
ev := gomatrixserverlib.ClientEvent{
Type: gomatrixserverlib.MTyping,
RoomID: roomID,
}
ev.Content, err = json.Marshal(map[string]interface{}{
"user_ids": typingUsers,
})
if err != nil {
return err
}
if jr, ok = res.Rooms.Join[roomID]; !ok {
jr = *types.NewJoinResponse()
}
jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
res.Rooms.Join[roomID] = jr
}
}
return nil
}
// addEDUDeltaToResponse adds updates for each type of EDUs since fromPos if
// the position for that type of EDU in toPos is not 0.
func (d *SyncServerDatasource) addEDUDeltaToResponse(
ctx context.Context,
fromPos, toPos types.SyncPosition,
joinedRoomIDs []string,
res *types.Response,
) (err error) {
if toPos.TypingPosition != 0 {
err = d.addTypingDeltaToResponse(
ctx, fromPos.TypingPosition, joinedRoomIDs, res,
)
}
return
}
// IncrementalSync returns all the data needed in order to create an incremental
// sync response for the given user. Events returned will include any client
// transaction IDs associated with the given device. These transaction IDs come
// from when the device sent the event via an API that included a transaction
// ID.
func (d *SyncServerDatasource) IncrementalSync(
ctx context.Context,
device authtypes.Device,
fromPos, toPos types.SyncPosition,
numRecentEventsPerRoom int,
) (*types.Response, error) {
res := types.NewResponse(toPos)
var joinedRoomIDs []string
var err error
if toPos.PDUPosition != 0 {
joinedRoomIDs, err = d.addPDUDeltaToResponse(
ctx, device, fromPos.PDUPosition, toPos.PDUPosition, numRecentEventsPerRoom, res,
)
} else {
joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(
ctx, nil, device.UserID, "join",
)
}
if err != nil {
return nil, err
}
err = d.addEDUDeltaToResponse(
ctx, fromPos, toPos, joinedRoomIDs, res,
)
if err != nil {
return nil, err
}
return res, nil return res, nil
} }
// CompleteSync a complete /sync API response for the given user. // getResponseWithPDUsForCompleteSync creates a response and add all PDUs needed
func (d *SyncServerDatabase) CompleteSync( // to it. It returns toPos and joinedRoomIDs for use of adding EDUs.
ctx context.Context, userID string, numRecentEventsPerRoom int, func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
) (*types.Response, error) { ctx context.Context,
userID string,
numRecentEventsPerRoom int,
) (
res *types.Response,
toPos types.SyncPosition,
joinedRoomIDs []string,
err error,
) {
// This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have // This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have
// a consistent view of the database throughout. This includes extracting the sync stream position. // a consistent view of the database throughout. This includes extracting the sync stream position.
// This does have the unfortunate side-effect that all the matrixy logic resides in this function, // This does have the unfortunate side-effect that all the matrixy logic resides in this function,
// but it's better to not hide the fact that this is being done in a transaction. // but it's better to not hide the fact that this is being done in a transaction.
txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot) txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot)
if err != nil { if err != nil {
return nil, err return
} }
var succeeded bool var succeeded bool
defer common.EndTransaction(txn, &succeeded) defer common.EndTransaction(txn, &succeeded)
// Get the current stream position which we will base the sync response on. // Get the current sync position which we will base the sync response on.
pos, err := d.syncStreamPositionTx(ctx, txn) toPos, err = d.syncPositionTx(ctx, txn)
if err != nil { if err != nil {
return nil, err return
} }
res = types.NewResponse(toPos)
// Extract room state and recent events for all rooms the user is joined to. // Extract room state and recent events for all rooms the user is joined to.
roomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join") joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join")
if err != nil { if err != nil {
return nil, err return
} }
// Build up a /sync response. Add joined rooms. // Build up a /sync response. Add joined rooms.
res := types.NewResponse(pos) for _, roomID := range joinedRoomIDs {
for _, roomID := range roomIDs {
var stateEvents []gomatrixserverlib.Event var stateEvents []gomatrixserverlib.Event
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID) stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID)
if err != nil { if err != nil {
return nil, err return
} }
// TODO: When filters are added, we may need to call this multiple times to get enough events. // TODO: When filters are added, we may need to call this multiple times to get enough events.
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
var recentStreamEvents []streamEvent var recentStreamEvents []streamEvent
recentStreamEvents, err = d.events.selectRecentEvents( recentStreamEvents, err = d.events.selectRecentEvents(
ctx, txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom, ctx, txn, roomID, 0, toPos.PDUPosition, numRecentEventsPerRoom,
) )
if err != nil { if err != nil {
return nil, err return
} }
// We don't include a device here as we don't need to send down // We don't include a device here as we don't need to send down
@ -311,10 +421,12 @@ func (d *SyncServerDatabase) CompleteSync(
stateEvents = removeDuplicates(stateEvents, recentEvents) stateEvents = removeDuplicates(stateEvents, recentEvents)
jr := types.NewJoinResponse() jr := types.NewJoinResponse()
if prevBatch := recentStreamEvents[0].streamPosition - 1; prevBatch > 0 { if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 {
jr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String() // Use the short form of batch token for prev_batch
jr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10)
} else { } else {
jr.Timeline.PrevBatch = types.StreamPosition(1).String() // Use the short form of batch token for prev_batch
jr.Timeline.PrevBatch = "1"
} }
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = true jr.Timeline.Limited = true
@ -322,12 +434,34 @@ func (d *SyncServerDatabase) CompleteSync(
res.Rooms.Join[roomID] = *jr res.Rooms.Join[roomID] = *jr
} }
if err = d.addInvitesToResponse(ctx, txn, userID, 0, pos, res); err != nil { if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition, res); err != nil {
return nil, err return
} }
succeeded = true succeeded = true
return res, err return res, toPos, joinedRoomIDs, err
}
// CompleteSync a complete /sync API response for the given user.
func (d *SyncServerDatasource) CompleteSync(
ctx context.Context, userID string, numRecentEventsPerRoom int,
) (*types.Response, error) {
res, toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync(
ctx, userID, numRecentEventsPerRoom,
)
if err != nil {
return nil, err
}
// Use a zero value SyncPosition for fromPos so all EDU states are added.
err = d.addEDUDeltaToResponse(
ctx, types.SyncPosition{}, toPos, joinedRoomIDs, res,
)
if err != nil {
return nil, err
}
return res, nil
} }
var txReadOnlySnapshot = sql.TxOptions{ var txReadOnlySnapshot = sql.TxOptions{
@ -345,8 +479,8 @@ var txReadOnlySnapshot = sql.TxOptions{
// Returns a map following the format data[roomID] = []dataTypes // Returns a map following the format data[roomID] = []dataTypes
// If no data is retrieved, returns an empty map // If no data is retrieved, returns an empty map
// If there was an issue with the retrieval, returns an error // If there was an issue with the retrieval, returns an error
func (d *SyncServerDatabase) GetAccountDataInRange( func (d *SyncServerDatasource) GetAccountDataInRange(
ctx context.Context, userID string, oldPos, newPos types.StreamPosition, ctx context.Context, userID string, oldPos, newPos int64,
) (map[string][]string, error) { ) (map[string][]string, error) {
return d.accountData.selectAccountDataInRange(ctx, userID, oldPos, newPos) return d.accountData.selectAccountDataInRange(ctx, userID, oldPos, newPos)
} }
@ -357,26 +491,24 @@ func (d *SyncServerDatabase) GetAccountDataInRange(
// If no data with the given type, user ID and room ID exists in the database, // If no data with the given type, user ID and room ID exists in the database,
// creates a new row, else update the existing one // creates a new row, else update the existing one
// Returns an error if there was an issue with the upsert // Returns an error if there was an issue with the upsert
func (d *SyncServerDatabase) UpsertAccountData( func (d *SyncServerDatasource) UpsertAccountData(
ctx context.Context, userID, roomID, dataType string, ctx context.Context, userID, roomID, dataType string,
) (types.StreamPosition, error) { ) (int64, error) {
pos, err := d.accountData.insertAccountData(ctx, userID, roomID, dataType) return d.accountData.insertAccountData(ctx, userID, roomID, dataType)
return types.StreamPosition(pos), err
} }
// AddInviteEvent stores a new invite event for a user. // AddInviteEvent stores a new invite event for a user.
// If the invite was successfully stored this returns the stream ID it was stored at. // If the invite was successfully stored this returns the stream ID it was stored at.
// Returns an error if there was a problem communicating with the database. // Returns an error if there was a problem communicating with the database.
func (d *SyncServerDatabase) AddInviteEvent( func (d *SyncServerDatasource) AddInviteEvent(
ctx context.Context, inviteEvent gomatrixserverlib.Event, ctx context.Context, inviteEvent gomatrixserverlib.Event,
) (types.StreamPosition, error) { ) (int64, error) {
pos, err := d.invites.insertInviteEvent(ctx, inviteEvent) return d.invites.insertInviteEvent(ctx, inviteEvent)
return types.StreamPosition(pos), err
} }
// RetireInviteEvent removes an old invite event from the database. // RetireInviteEvent removes an old invite event from the database.
// Returns an error if there was a problem communicating with the database. // Returns an error if there was a problem communicating with the database.
func (d *SyncServerDatabase) RetireInviteEvent( func (d *SyncServerDatasource) RetireInviteEvent(
ctx context.Context, inviteEventID string, ctx context.Context, inviteEventID string,
) error { ) error {
// TODO: Record that invite has been retired in a stream so that we can // TODO: Record that invite has been retired in a stream so that we can
@ -385,10 +517,22 @@ func (d *SyncServerDatabase) RetireInviteEvent(
return err return err
} }
func (d *SyncServerDatabase) addInvitesToResponse( func (d *SyncServerDatasource) AddTypingUser(
userID, roomID string, expireTime *time.Time,
) {
d.typingCache.AddTypingUser(userID, roomID, expireTime)
}
func (d *SyncServerDatasource) RemoveTypingUser(
userID, roomID string,
) {
d.typingCache.RemoveUser(userID, roomID)
}
func (d *SyncServerDatasource) addInvitesToResponse(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
userID string, userID string,
fromPos, toPos types.StreamPosition, fromPos, toPos int64,
res *types.Response, res *types.Response,
) error { ) error {
invites, err := d.invites.selectInviteEventsInRange( invites, err := d.invites.selectInviteEventsInRange(
@ -409,11 +553,11 @@ func (d *SyncServerDatabase) addInvitesToResponse(
} }
// addRoomDeltaToResponse adds a room state delta to a sync response // addRoomDeltaToResponse adds a room state delta to a sync response
func (d *SyncServerDatabase) addRoomDeltaToResponse( func (d *SyncServerDatasource) addRoomDeltaToResponse(
ctx context.Context, ctx context.Context,
device *authtypes.Device, device *authtypes.Device,
txn *sql.Tx, txn *sql.Tx,
fromPos, toPos types.StreamPosition, fromPos, toPos int64,
delta stateDelta, delta stateDelta,
numRecentEventsPerRoom int, numRecentEventsPerRoom int,
res *types.Response, res *types.Response,
@ -445,10 +589,12 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
switch delta.membership { switch delta.membership {
case "join": case "join":
jr := types.NewJoinResponse() jr := types.NewJoinResponse()
if prevBatch := recentStreamEvents[0].streamPosition - 1; prevBatch > 0 { if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 {
jr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String() // Use the short form of batch token for prev_batch
jr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10)
} else { } else {
jr.Timeline.PrevBatch = types.StreamPosition(1).String() // Use the short form of batch token for prev_batch
jr.Timeline.PrevBatch = "1"
} }
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
@ -460,10 +606,12 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
// TODO: recentEvents may contain events that this user is not allowed to see because they are // TODO: recentEvents may contain events that this user is not allowed to see because they are
// no longer in the room. // no longer in the room.
lr := types.NewLeaveResponse() lr := types.NewLeaveResponse()
if prevBatch := recentStreamEvents[0].streamPosition - 1; prevBatch > 0 { if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 {
lr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String() // Use the short form of batch token for prev_batch
lr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10)
} else { } else {
lr.Timeline.PrevBatch = types.StreamPosition(1).String() // Use the short form of batch token for prev_batch
lr.Timeline.PrevBatch = "1"
} }
lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
@ -476,7 +624,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
// fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database. // fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database.
// Returns a map of room ID to list of events. // Returns a map of room ID to list of events.
func (d *SyncServerDatabase) fetchStateEvents( func (d *SyncServerDatasource) fetchStateEvents(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
roomIDToEventIDSet map[string]map[string]bool, roomIDToEventIDSet map[string]map[string]bool,
eventIDToEvent map[string]streamEvent, eventIDToEvent map[string]streamEvent,
@ -521,7 +669,7 @@ func (d *SyncServerDatabase) fetchStateEvents(
return stateBetween, nil return stateBetween, nil
} }
func (d *SyncServerDatabase) fetchMissingStateEvents( func (d *SyncServerDatasource) fetchMissingStateEvents(
ctx context.Context, txn *sql.Tx, eventIDs []string, ctx context.Context, txn *sql.Tx, eventIDs []string,
) ([]streamEvent, error) { ) ([]streamEvent, error) {
// Fetch from the events table first so we pick up the stream ID for the // Fetch from the events table first so we pick up the stream ID for the
@ -560,9 +708,9 @@ func (d *SyncServerDatabase) fetchMissingStateEvents(
return events, nil return events, nil
} }
func (d *SyncServerDatabase) getStateDeltas( func (d *SyncServerDatasource) getStateDeltas(
ctx context.Context, device *authtypes.Device, txn *sql.Tx, ctx context.Context, device *authtypes.Device, txn *sql.Tx,
fromPos, toPos types.StreamPosition, userID string, fromPos, toPos int64, userID string,
) ([]stateDelta, error) { ) ([]stateDelta, error) {
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
// - Get membership list changes for this user in this sync response // - Get membership list changes for this user in this sync response
@ -601,7 +749,7 @@ func (d *SyncServerDatabase) getStateDeltas(
} }
s := make([]streamEvent, len(allState)) s := make([]streamEvent, len(allState))
for i := 0; i < len(s); i++ { for i := 0; i < len(s); i++ {
s[i] = streamEvent{Event: allState[i], streamPosition: types.StreamPosition(0)} s[i] = streamEvent{Event: allState[i], streamPosition: 0}
} }
state[roomID] = s state[roomID] = s
continue // we'll add this room in when we do joined rooms continue // we'll add this room in when we do joined rooms

View file

@ -35,8 +35,8 @@ type Notifier struct {
roomIDToJoinedUsers map[string]userIDSet roomIDToJoinedUsers map[string]userIDSet
// Protects currPos and userStreams. // Protects currPos and userStreams.
streamLock *sync.Mutex streamLock *sync.Mutex
// The latest sync stream position // The latest sync position
currPos types.StreamPosition currPos types.SyncPosition
// A map of user_id => UserStream which can be used to wake a given user's /sync request. // A map of user_id => UserStream which can be used to wake a given user's /sync request.
userStreams map[string]*UserStream userStreams map[string]*UserStream
// The last time we cleaned out stale entries from the userStreams map // The last time we cleaned out stale entries from the userStreams map
@ -46,7 +46,7 @@ type Notifier struct {
// NewNotifier creates a new notifier set to the given stream position. // NewNotifier creates a new notifier set to the given stream position.
// In order for this to be of any use, the Notifier needs to be told all rooms and // In order for this to be of any use, the Notifier needs to be told all rooms and
// the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase). // the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase).
func NewNotifier(pos types.StreamPosition) *Notifier { func NewNotifier(pos types.SyncPosition) *Notifier {
return &Notifier{ return &Notifier{
currPos: pos, currPos: pos,
roomIDToJoinedUsers: make(map[string]userIDSet), roomIDToJoinedUsers: make(map[string]userIDSet),
@ -59,8 +59,9 @@ func NewNotifier(pos types.StreamPosition) *Notifier {
// OnNewEvent is called when a new event is received from the room server. Must only be // OnNewEvent is called when a new event is received from the room server. Must only be
// called from a single goroutine, to avoid races between updates which could set the // called from a single goroutine, to avoid races between updates which could set the
// current position in the stream incorrectly. // current position in the stream incorrectly.
// Can be called either with a *gomatrixserverlib.Event, or with an user ID // Can be called with one among: a *gomatrixserverlib.Event, a room ID, or a list of user IDs.
func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos types.StreamPosition) { // If a position in pos is 0, it means no updates available of that type.
func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, roomID string, userIDs []string, pos types.SyncPosition) {
// update the current position then notify relevant /sync streams. // update the current position then notify relevant /sync streams.
// This needs to be done PRIOR to waking up users as they will read this value. // This needs to be done PRIOR to waking up users as they will read this value.
n.streamLock.Lock() n.streamLock.Lock()
@ -101,8 +102,14 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos ty
for _, toNotifyUserID := range userIDs { for _, toNotifyUserID := range userIDs {
n.wakeupUser(toNotifyUserID, pos) n.wakeupUser(toNotifyUserID, pos)
} }
} else if len(userID) > 0 { } else if roomID != "" {
n.wakeupUser(userID, pos) for _, userID := range n.joinedUsers(roomID) {
n.wakeupUser(userID, pos)
}
} else if len(userIDs) > 0 {
for _, userID := range userIDs {
n.wakeupUser(userID, pos)
}
} }
} }
@ -127,7 +134,7 @@ func (n *Notifier) GetListener(req syncRequest) UserStreamListener {
} }
// Load the membership states required to notify users correctly. // Load the membership states required to notify users correctly.
func (n *Notifier) Load(ctx context.Context, db *storage.SyncServerDatabase) error { func (n *Notifier) Load(ctx context.Context, db *storage.SyncServerDatasource) error {
roomToUsers, err := db.AllJoinedUsersInRooms(ctx) roomToUsers, err := db.AllJoinedUsersInRooms(ctx)
if err != nil { if err != nil {
return err return err
@ -137,7 +144,7 @@ func (n *Notifier) Load(ctx context.Context, db *storage.SyncServerDatabase) err
} }
// CurrentPosition returns the current stream position // CurrentPosition returns the current stream position
func (n *Notifier) CurrentPosition() types.StreamPosition { func (n *Notifier) CurrentPosition() types.SyncPosition {
return n.currPos return n.currPos
} }
@ -156,7 +163,7 @@ func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) {
} }
} }
func (n *Notifier) wakeupUser(userID string, newPos types.StreamPosition) { func (n *Notifier) wakeupUser(userID string, newPos types.SyncPosition) {
stream := n.fetchUserStream(userID, false) stream := n.fetchUserStream(userID, false)
if stream == nil { if stream == nil {
return return

View file

@ -16,8 +16,10 @@ package sync
import ( import (
"context" "context"
"errors"
"net/http" "net/http"
"strconv" "strconv"
"strings"
"time" "time"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
@ -36,7 +38,7 @@ type syncRequest struct {
device authtypes.Device device authtypes.Device
limit int limit int
timeout time.Duration timeout time.Duration
since *types.StreamPosition // nil means that no since token was supplied since *types.SyncPosition // nil means that no since token was supplied
wantFullState bool wantFullState bool
log *log.Entry log *log.Entry
} }
@ -74,14 +76,37 @@ func getTimeout(timeoutMS string) time.Duration {
// getSyncStreamPosition tries to parse a 'since' token taken from the API to a // getSyncStreamPosition tries to parse a 'since' token taken from the API to a
// stream position. If the string is empty then (nil, nil) is returned. // stream position. If the string is empty then (nil, nil) is returned.
func getSyncStreamPosition(since string) (*types.StreamPosition, error) { // There are two types of token: The first is a normal length one containing
// all PDU and EDU positions. The second only contains the PDU position; prev_batch
// tokens generated by the server will be in this form.
func getSyncStreamPosition(since string) (*types.SyncPosition, error) {
if since == "" { if since == "" {
return nil, nil return nil, nil
} }
i, err := strconv.Atoi(since)
if err != nil { posStrings := strings.Split(since, "_")
return nil, err if len(posStrings) != 2 && len(posStrings) != 1 {
return nil, errors.New("malformed batch token")
}
positions := make([]int64, len(posStrings))
for i, posString := range posStrings {
pos, err := strconv.ParseInt(posString, 10, 64)
if err != nil {
return nil, err
}
positions[i] = pos
}
if len(positions) == 2 {
// Normal length token
return &types.SyncPosition{
positions[0], positions[1],
}, nil
} else {
// Token with PDU position only
return &types.SyncPosition{
PDUPosition: positions[0],
}, nil
} }
token := types.StreamPosition(i)
return &token, nil
} }

View file

@ -31,13 +31,13 @@ import (
// RequestPool manages HTTP long-poll connections for /sync // RequestPool manages HTTP long-poll connections for /sync
type RequestPool struct { type RequestPool struct {
db *storage.SyncServerDatabase db *storage.SyncServerDatasource
accountDB *accounts.Database accountDB *accounts.Database
notifier *Notifier notifier *Notifier
} }
// NewRequestPool makes a new RequestPool // NewRequestPool makes a new RequestPool
func NewRequestPool(db *storage.SyncServerDatabase, n *Notifier, adb *accounts.Database) *RequestPool { func NewRequestPool(db *storage.SyncServerDatasource, n *Notifier, adb *accounts.Database) *RequestPool {
return &RequestPool{db, adb, n} return &RequestPool{db, adb, n}
} }
@ -128,7 +128,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
} }
} }
func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (res *types.Response, err error) { func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.SyncPosition) (res *types.Response, err error) {
// TODO: handle ignored users // TODO: handle ignored users
if req.since == nil { if req.since == nil {
res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, req.limit) res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, req.limit)
@ -140,12 +140,12 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.Stre
return return
} }
res, err = rp.appendAccountData(res, req.device.UserID, req, currentPos) res, err = rp.appendAccountData(res, req.device.UserID, req, currentPos.PDUPosition)
return return
} }
func (rp *RequestPool) appendAccountData( func (rp *RequestPool) appendAccountData(
data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition, data *types.Response, userID string, req syncRequest, currentPos int64,
) (*types.Response, error) { ) (*types.Response, error) {
// TODO: Account data doesn't have a sync position of its own, meaning that // TODO: Account data doesn't have a sync position of its own, meaning that
// account data might be sent multiple time to the client if multiple account // account data might be sent multiple time to the client if multiple account
@ -179,7 +179,7 @@ func (rp *RequestPool) appendAccountData(
} }
// Sync is not initial, get all account data since the latest sync // Sync is not initial, get all account data since the latest sync
dataTypes, err := rp.db.GetAccountDataInRange(req.ctx, userID, *req.since, currentPos) dataTypes, err := rp.db.GetAccountDataInRange(req.ctx, userID, req.since.PDUPosition, currentPos)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -34,8 +34,8 @@ type UserStream struct {
lock sync.Mutex lock sync.Mutex
// Closed when there is an update. // Closed when there is an update.
signalChannel chan struct{} signalChannel chan struct{}
// The last stream position that there may have been an update for the suser // The last sync position that there may have been an update for the user
pos types.StreamPosition pos types.SyncPosition
// The last time when we had some listeners waiting // The last time when we had some listeners waiting
timeOfLastChannel time.Time timeOfLastChannel time.Time
// The number of listeners waiting // The number of listeners waiting
@ -51,7 +51,7 @@ type UserStreamListener struct {
} }
// NewUserStream creates a new user stream // NewUserStream creates a new user stream
func NewUserStream(userID string, currPos types.StreamPosition) *UserStream { func NewUserStream(userID string, currPos types.SyncPosition) *UserStream {
return &UserStream{ return &UserStream{
UserID: userID, UserID: userID,
timeOfLastChannel: time.Now(), timeOfLastChannel: time.Now(),
@ -85,7 +85,7 @@ func (s *UserStream) GetListener(ctx context.Context) UserStreamListener {
} }
// Broadcast a new stream position for this user. // Broadcast a new stream position for this user.
func (s *UserStream) Broadcast(pos types.StreamPosition) { func (s *UserStream) Broadcast(pos types.SyncPosition) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
@ -120,7 +120,7 @@ func (s *UserStream) TimeOfLastNonEmpty() time.Time {
// GetStreamPosition returns last stream position which the UserStream was // GetStreamPosition returns last stream position which the UserStream was
// notified about // notified about
func (s *UserStreamListener) GetStreamPosition() types.StreamPosition { func (s *UserStreamListener) GetStreamPosition() types.SyncPosition {
s.userStream.lock.Lock() s.userStream.lock.Lock()
defer s.userStream.lock.Unlock() defer s.userStream.lock.Unlock()
@ -132,11 +132,11 @@ func (s *UserStreamListener) GetStreamPosition() types.StreamPosition {
// sincePos specifies from which point we want to be notified about. If there // sincePos specifies from which point we want to be notified about. If there
// has already been an update after sincePos we'll return a closed channel // has already been an update after sincePos we'll return a closed channel
// immediately. // immediately.
func (s *UserStreamListener) GetNotifyChannel(sincePos types.StreamPosition) <-chan struct{} { func (s *UserStreamListener) GetNotifyChannel(sincePos types.SyncPosition) <-chan struct{} {
s.userStream.lock.Lock() s.userStream.lock.Lock()
defer s.userStream.lock.Unlock() defer s.userStream.lock.Unlock()
if sincePos < s.userStream.pos { if s.userStream.pos.IsAfter(sincePos) {
// If the listener is behind, i.e. missed a potential update, then we // If the listener is behind, i.e. missed a potential update, then we
// want them to wake up immediately. We do this by returning a new // want them to wake up immediately. We do this by returning a new
// closed stream, which returns immediately when selected. // closed stream, which returns immediately when selected.

View file

@ -28,7 +28,6 @@ import (
"github.com/matrix-org/dendrite/syncapi/routing" "github.com/matrix-org/dendrite/syncapi/routing"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
) )
// SetupSyncAPIComponent sets up and registers HTTP handlers for the SyncAPI // SetupSyncAPIComponent sets up and registers HTTP handlers for the SyncAPI
@ -39,17 +38,17 @@ func SetupSyncAPIComponent(
accountsDB *accounts.Database, accountsDB *accounts.Database,
queryAPI api.RoomserverQueryAPI, queryAPI api.RoomserverQueryAPI,
) { ) {
syncDB, err := storage.NewSyncServerDatabase(string(base.Cfg.Database.SyncAPI)) syncDB, err := storage.NewSyncServerDatasource(string(base.Cfg.Database.SyncAPI))
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to connect to sync db") logrus.WithError(err).Panicf("failed to connect to sync db")
} }
pos, err := syncDB.SyncStreamPosition(context.Background()) pos, err := syncDB.SyncPosition(context.Background())
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to get stream position") logrus.WithError(err).Panicf("failed to get stream position")
} }
notifier := sync.NewNotifier(types.StreamPosition(pos)) notifier := sync.NewNotifier(pos)
err = notifier.Load(context.Background(), syncDB) err = notifier.Load(context.Background(), syncDB)
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to start notifier") logrus.WithError(err).Panicf("failed to start notifier")
@ -71,5 +70,12 @@ func SetupSyncAPIComponent(
logrus.WithError(err).Panicf("failed to start client data consumer") logrus.WithError(err).Panicf("failed to start client data consumer")
} }
typingConsumer := consumers.NewOutputTypingEventConsumer(
base.Cfg, base.KafkaConsumer, notifier, syncDB,
)
if err = typingConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start typing server consumer")
}
routing.Setup(base.APIMux, requestPool, syncDB, deviceDB) routing.Setup(base.APIMux, requestPool, syncDB, deviceDB)
} }

View file

@ -21,12 +21,24 @@ import (
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
// StreamPosition represents the offset in the sync stream a client is at. // SyncPosition contains the PDU and EDU stream sync positions for a client.
type StreamPosition int64 type SyncPosition struct {
// PDUPosition is the stream position for PDUs the client is at.
PDUPosition int64
// TypingPosition is the position for typing notifications the client is at.
TypingPosition int64
}
// String implements the Stringer interface. // String implements the Stringer interface.
func (sp StreamPosition) String() string { func (sp SyncPosition) String() string {
return strconv.FormatInt(int64(sp), 10) return strconv.FormatInt(sp.PDUPosition, 10) + "_" +
strconv.FormatInt(sp.TypingPosition, 10)
}
// IsAfter returns whether sp refers to states newer than other
func (sp SyncPosition) IsAfter(other SyncPosition) bool {
return sp.PDUPosition > other.PDUPosition ||
sp.TypingPosition > other.TypingPosition
} }
// PrevEventRef represents a reference to a previous event in a state event upgrade // PrevEventRef represents a reference to a previous event in a state event upgrade
@ -53,11 +65,10 @@ type Response struct {
} }
// NewResponse creates an empty response with initialised maps. // NewResponse creates an empty response with initialised maps.
func NewResponse(pos StreamPosition) *Response { func NewResponse(pos SyncPosition) *Response {
res := Response{} res := Response{
// Make sure we send the next_batch as a string. We don't want to confuse clients by sending this NextBatch: pos.String(),
// as an integer even though (at the moment) it is. }
res.NextBatch = pos.String()
// Pre-initialise the maps. Synapse will return {} even if there are no rooms under a specific section, // Pre-initialise the maps. Synapse will return {} even if there are no rooms under a specific section,
// so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors. // so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors.
res.Rooms.Join = make(map[string]JoinResponse) res.Rooms.Join = make(map[string]JoinResponse)

View file

@ -12,12 +12,17 @@
package api package api
import "time"
// OutputTypingEvent is an entry in typing server output kafka log. // OutputTypingEvent is an entry in typing server output kafka log.
// This contains the event with extra fields used to create 'm.typing' event // This contains the event with extra fields used to create 'm.typing' event
// in clientapi & federation. // in clientapi & federation.
type OutputTypingEvent struct { type OutputTypingEvent struct {
// The Event for the typing edu event. // The Event for the typing edu event.
Event TypingEvent `json:"event"` Event TypingEvent `json:"event"`
// ExpireTime is the interval after which the user should no longer be
// considered typing. Only available if Event.Typing is true.
ExpireTime *time.Time
// Users typing in the room when the event was generated. // Users typing in the room when the event was generated.
TypingUsers []string `json:"typing_users"` TypingUsers []string `json:"typing_users"`
} }

View file

@ -22,25 +22,53 @@ const defaultTypingTimeout = 10 * time.Second
// userSet is a map of user IDs to a timer, timer fires at expiry. // userSet is a map of user IDs to a timer, timer fires at expiry.
type userSet map[string]*time.Timer type userSet map[string]*time.Timer
type roomData struct {
syncPosition int64
userSet userSet
}
// TypingCache maintains a list of users typing in each room. // TypingCache maintains a list of users typing in each room.
type TypingCache struct { type TypingCache struct {
sync.RWMutex sync.RWMutex
data map[string]userSet latestSyncPosition int64
data map[string]*roomData
}
// Create a roomData with its sync position set to the latest sync position.
// Must only be called after locking the cache.
func (t *TypingCache) newRoomData() *roomData {
return &roomData{
syncPosition: t.latestSyncPosition,
userSet: make(userSet),
}
} }
// NewTypingCache returns a new TypingCache initialised for use. // NewTypingCache returns a new TypingCache initialised for use.
func NewTypingCache() *TypingCache { func NewTypingCache() *TypingCache {
return &TypingCache{data: make(map[string]userSet)} return &TypingCache{data: make(map[string]*roomData)}
} }
// GetTypingUsers returns the list of users typing in a room. // GetTypingUsers returns the list of users typing in a room.
func (t *TypingCache) GetTypingUsers(roomID string) (users []string) { func (t *TypingCache) GetTypingUsers(roomID string) []string {
users, _ := t.GetTypingUsersIfUpdatedAfter(roomID, 0)
// 0 should work above because the first position used will be 1.
return users
}
// GetTypingUsersIfUpdatedAfter returns all users typing in this room with
// updated == true if the typing sync position of the room is after the given
// position. Otherwise, returns an empty slice with updated == false.
func (t *TypingCache) GetTypingUsersIfUpdatedAfter(
roomID string, position int64,
) (users []string, updated bool) {
t.RLock() t.RLock()
usersMap, ok := t.data[roomID] roomData, ok := t.data[roomID]
userSet := roomData.userSet
t.RUnlock() t.RUnlock()
if ok { if ok && roomData.syncPosition > position {
users = make([]string, 0, len(usersMap)) updated = true
for userID := range usersMap { users = make([]string, 0, len(userSet))
for userID := range userSet {
users = append(users, userID) users = append(users, userID)
} }
} }
@ -64,12 +92,16 @@ func (t *TypingCache) addUser(userID, roomID string, expiryTimer *time.Timer) {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
t.latestSyncPosition++
if t.data[roomID] == nil { if t.data[roomID] == nil {
t.data[roomID] = make(userSet) t.data[roomID] = t.newRoomData()
} else {
t.data[roomID].syncPosition = t.latestSyncPosition
} }
// Stop the timer to cancel the call to timeoutCallback // Stop the timer to cancel the call to timeoutCallback
if timer, ok := t.data[roomID][userID]; ok { if timer, ok := t.data[roomID].userSet[userID]; ok {
// It may happen that at this stage timer fires but now we have a lock on t. // It may happen that at this stage timer fires but now we have a lock on t.
// Hence the execution of timeoutCallback will happen after we unlock. // Hence the execution of timeoutCallback will happen after we unlock.
// So we may lose a typing state, though this event is highly unlikely. // So we may lose a typing state, though this event is highly unlikely.
@ -78,7 +110,7 @@ func (t *TypingCache) addUser(userID, roomID string, expiryTimer *time.Timer) {
timer.Stop() timer.Stop()
} }
t.data[roomID][userID] = expiryTimer t.data[roomID].userSet[userID] = expiryTimer
} }
// Returns a function which is called after timeout happens. // Returns a function which is called after timeout happens.
@ -94,10 +126,27 @@ func (t *TypingCache) RemoveUser(userID, roomID string) {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
if timer, ok := t.data[roomID][userID]; ok { roomData, ok := t.data[roomID]
timer.Stop() if !ok {
delete(t.data[roomID], userID) return
} }
timer, ok := roomData.userSet[userID]
if !ok {
return
}
timer.Stop()
delete(roomData.userSet, userID)
t.latestSyncPosition++
t.data[roomID].syncPosition = t.latestSyncPosition
}
func (t *TypingCache) GetLatestSyncPosition() int64 {
t.Lock()
defer t.Unlock()
return t.latestSyncPosition
} }
func getExpireTime(expire *time.Time) time.Time { func getExpireTime(expire *time.Time) time.Time {

View file

@ -68,6 +68,13 @@ func (t *TypingServerInputAPI) sendEvent(ite *api.InputTypingEvent) error {
TypingUsers: userIDs, TypingUsers: userIDs,
} }
if ev.Typing {
expireTime := ite.OriginServerTS.Time().Add(
time.Duration(ite.Timeout) * time.Millisecond,
)
ote.ExpireTime = &expireTime
}
eventJSON, err := json.Marshal(ote) eventJSON, err := json.Marshal(ote)
if err != nil { if err != nil {
return err return err