Remove users from typing list when typing status times out

Signed-off-by: Alex Chen <minecnly@gmail.com>
This commit is contained in:
Cnly 2019-06-27 16:11:29 +08:00
parent d79a6a21c7
commit 19f1a6fe0d
3 changed files with 28 additions and 10 deletions

View file

@ -45,11 +45,13 @@ func NewOutputTypingEventConsumer(
Consumer: kafkaConsumer, Consumer: kafkaConsumer,
PartitionStore: store, PartitionStore: store,
} }
s := &OutputTypingEventConsumer{ s := &OutputTypingEventConsumer{
typingConsumer: &consumer, typingConsumer: &consumer,
db: store, db: store,
notifier: n, notifier: n,
} }
consumer.ProcessMessage = s.onMessage consumer.ProcessMessage = s.onMessage
return s return s
@ -57,6 +59,10 @@ func NewOutputTypingEventConsumer(
// Start consuming from typing api // Start consuming from typing api
func (s *OutputTypingEventConsumer) Start() error { func (s *OutputTypingEventConsumer) Start() error {
s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
s.notifier.OnNewEvent(nil, roomID, nil, types.SyncPosition{TypingPosition: latestSyncPosition})
})
return s.typingConsumer.Start() return s.typingConsumer.Start()
} }

View file

@ -515,6 +515,10 @@ func (d *SyncServerDatasource) RetireInviteEvent(
return err return err
} }
func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) {
d.typingCache.SetTimeoutCallback(fn)
}
// AddTypingUser adds a typing user to the typing cache. // AddTypingUser adds a typing user to the typing cache.
// Returns the latest sync position for typing notifications after update. // Returns the latest sync position for typing notifications after update.
func (d *SyncServerDatasource) AddTypingUser( func (d *SyncServerDatasource) AddTypingUser(

View file

@ -22,6 +22,11 @@ const defaultTypingTimeout = 10 * time.Second
// userSet is a map of user IDs to a timer, timer fires at expiry. // userSet is a map of user IDs to a timer, timer fires at expiry.
type userSet map[string]*time.Timer type userSet map[string]*time.Timer
// TimeoutCallbackFn is a function called right after removal of a user
// from the typing user list due to timeout.
// latestSyncPosition is the typing sync position after the removal.
type TimeoutCallbackFn func(userID, roomID string, latestSyncPosition int64)
type roomData struct { type roomData struct {
syncPosition int64 syncPosition int64
userSet userSet userSet userSet
@ -32,6 +37,7 @@ type TypingCache struct {
sync.RWMutex sync.RWMutex
latestSyncPosition int64 latestSyncPosition int64
data map[string]*roomData data map[string]*roomData
timeoutCallback TimeoutCallbackFn
} }
// Create a roomData with its sync position set to the latest sync position. // Create a roomData with its sync position set to the latest sync position.
@ -48,6 +54,12 @@ func NewTypingCache() *TypingCache {
return &TypingCache{data: make(map[string]*roomData)} return &TypingCache{data: make(map[string]*roomData)}
} }
// SetTimeoutCallback sets a callback function that is called right after
// a user is removed from the typing user list due to timeout.
func (t *TypingCache) SetTimeoutCallback(fn TimeoutCallbackFn) {
t.timeoutCallback = fn
}
// GetTypingUsers returns the list of users typing in a room. // GetTypingUsers returns the list of users typing in a room.
func (t *TypingCache) GetTypingUsers(roomID string) []string { func (t *TypingCache) GetTypingUsers(roomID string) []string {
users, _ := t.GetTypingUsersIfUpdatedAfter(roomID, 0) users, _ := t.GetTypingUsersIfUpdatedAfter(roomID, 0)
@ -86,7 +98,12 @@ func (t *TypingCache) AddTypingUser(
) int64 { ) int64 {
expireTime := getExpireTime(expire) expireTime := getExpireTime(expire)
if until := time.Until(expireTime); until > 0 { if until := time.Until(expireTime); until > 0 {
timer := time.AfterFunc(until, t.timeoutCallback(userID, roomID)) timer := time.AfterFunc(until, func() {
latestSyncPosition := t.RemoveUser(userID, roomID)
if t.timeoutCallback != nil {
t.timeoutCallback(userID, roomID, latestSyncPosition)
}
})
return t.addUser(userID, roomID, timer) return t.addUser(userID, roomID, timer)
} }
return t.GetLatestSyncPosition() return t.GetLatestSyncPosition()
@ -123,15 +140,6 @@ func (t *TypingCache) addUser(
return t.latestSyncPosition return t.latestSyncPosition
} }
// Returns a function which is called after timeout happens.
// This removes the user.
func (t *TypingCache) timeoutCallback(userID, roomID string) func() {
return func() {
// TODO: Notify syncing users about removal
t.RemoveUser(userID, roomID)
}
}
// RemoveUser with mutex lock & stop the timer. // RemoveUser with mutex lock & stop the timer.
// Returns the latest sync position for typing after update. // Returns the latest sync position for typing after update.
func (t *TypingCache) RemoveUser(userID, roomID string) int64 { func (t *TypingCache) RemoveUser(userID, roomID string) int64 {