From 19f1a6fe0d543ab5f8b95de3883591235dfdeb8f Mon Sep 17 00:00:00 2001 From: Cnly Date: Thu, 27 Jun 2019 16:11:29 +0800 Subject: [PATCH] Remove users from typing list when typing status times out Signed-off-by: Alex Chen --- syncapi/consumers/typingserver.go | 6 ++++++ syncapi/storage/syncserver.go | 4 ++++ typingserver/cache/cache.go | 28 ++++++++++++++++++---------- 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/syncapi/consumers/typingserver.go b/syncapi/consumers/typingserver.go index 5efb14faf..601ebaabd 100644 --- a/syncapi/consumers/typingserver.go +++ b/syncapi/consumers/typingserver.go @@ -45,11 +45,13 @@ func NewOutputTypingEventConsumer( Consumer: kafkaConsumer, PartitionStore: store, } + s := &OutputTypingEventConsumer{ typingConsumer: &consumer, db: store, notifier: n, } + consumer.ProcessMessage = s.onMessage return s @@ -57,6 +59,10 @@ func NewOutputTypingEventConsumer( // Start consuming from typing api func (s *OutputTypingEventConsumer) Start() error { + s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) { + s.notifier.OnNewEvent(nil, roomID, nil, types.SyncPosition{TypingPosition: latestSyncPosition}) + }) + return s.typingConsumer.Start() } diff --git a/syncapi/storage/syncserver.go b/syncapi/storage/syncserver.go index 4b85cafdc..e76a38e0d 100644 --- a/syncapi/storage/syncserver.go +++ b/syncapi/storage/syncserver.go @@ -515,6 +515,10 @@ func (d *SyncServerDatasource) RetireInviteEvent( return err } +func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) { + d.typingCache.SetTimeoutCallback(fn) +} + // AddTypingUser adds a typing user to the typing cache. // Returns the latest sync position for typing notifications after update. func (d *SyncServerDatasource) AddTypingUser( diff --git a/typingserver/cache/cache.go b/typingserver/cache/cache.go index 356228a0d..2571e7427 100644 --- a/typingserver/cache/cache.go +++ b/typingserver/cache/cache.go @@ -22,6 +22,11 @@ const defaultTypingTimeout = 10 * time.Second // userSet is a map of user IDs to a timer, timer fires at expiry. type userSet map[string]*time.Timer +// TimeoutCallbackFn is a function called right after removal of a user +// from the typing user list due to timeout. +// latestSyncPosition is the typing sync position after the removal. +type TimeoutCallbackFn func(userID, roomID string, latestSyncPosition int64) + type roomData struct { syncPosition int64 userSet userSet @@ -32,6 +37,7 @@ type TypingCache struct { sync.RWMutex latestSyncPosition int64 data map[string]*roomData + timeoutCallback TimeoutCallbackFn } // Create a roomData with its sync position set to the latest sync position. @@ -48,6 +54,12 @@ func NewTypingCache() *TypingCache { return &TypingCache{data: make(map[string]*roomData)} } +// SetTimeoutCallback sets a callback function that is called right after +// a user is removed from the typing user list due to timeout. +func (t *TypingCache) SetTimeoutCallback(fn TimeoutCallbackFn) { + t.timeoutCallback = fn +} + // GetTypingUsers returns the list of users typing in a room. func (t *TypingCache) GetTypingUsers(roomID string) []string { users, _ := t.GetTypingUsersIfUpdatedAfter(roomID, 0) @@ -86,7 +98,12 @@ func (t *TypingCache) AddTypingUser( ) int64 { expireTime := getExpireTime(expire) if until := time.Until(expireTime); until > 0 { - timer := time.AfterFunc(until, t.timeoutCallback(userID, roomID)) + timer := time.AfterFunc(until, func() { + latestSyncPosition := t.RemoveUser(userID, roomID) + if t.timeoutCallback != nil { + t.timeoutCallback(userID, roomID, latestSyncPosition) + } + }) return t.addUser(userID, roomID, timer) } return t.GetLatestSyncPosition() @@ -123,15 +140,6 @@ func (t *TypingCache) addUser( return t.latestSyncPosition } -// Returns a function which is called after timeout happens. -// This removes the user. -func (t *TypingCache) timeoutCallback(userID, roomID string) func() { - return func() { - // TODO: Notify syncing users about removal - t.RemoveUser(userID, roomID) - } -} - // RemoveUser with mutex lock & stop the timer. // Returns the latest sync position for typing after update. func (t *TypingCache) RemoveUser(userID, roomID string) int64 {