Typing event consumer now supplies real, not bogus position to notifier

Signed-off-by: Alex Chen <minecnly@gmail.com>
This commit is contained in:
Cnly 2019-06-26 22:46:12 +08:00
parent ad4f69a81c
commit d3ee72c59b
3 changed files with 31 additions and 13 deletions

View file

@ -72,13 +72,14 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
"room_id": output.Event.RoomID, "room_id": output.Event.RoomID,
}).Info("received data from typing server") }).Info("received data from typing server")
var typingPos int64
typingEvent := output.Event typingEvent := output.Event
if typingEvent.Typing { if typingEvent.Typing {
s.db.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime) typingPos = s.db.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime)
} else { } else {
s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID) typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID)
} }
s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.SyncPosition{TypingPosition: 1}) s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.SyncPosition{TypingPosition: typingPos})
return nil return nil
} }

View file

@ -514,16 +514,20 @@ func (d *SyncServerDatasource) RetireInviteEvent(
return err return err
} }
// AddTypingUser adds a typing user to the typing cache.
// Returns the latest sync position for typing notifications after update.
func (d *SyncServerDatasource) AddTypingUser( func (d *SyncServerDatasource) AddTypingUser(
userID, roomID string, expireTime *time.Time, userID, roomID string, expireTime *time.Time,
) { ) int64 {
d.typingCache.AddTypingUser(userID, roomID, expireTime) return d.typingCache.AddTypingUser(userID, roomID, expireTime)
} }
// RemoveTypingUser removes a typing user from the typing cache.
// Returns the latest sync position for typing notifications after update.
func (d *SyncServerDatasource) RemoveTypingUser( func (d *SyncServerDatasource) RemoveTypingUser(
userID, roomID string, userID, roomID string,
) { ) int64 {
d.typingCache.RemoveUser(userID, roomID) return d.typingCache.RemoveUser(userID, roomID)
} }
func (d *SyncServerDatasource) addInvitesToResponse( func (d *SyncServerDatasource) addInvitesToResponse(

View file

@ -80,16 +80,23 @@ func (t *TypingCache) GetTypingUsersIfUpdatedAfter(
// AddTypingUser sets an user as typing in a room. // AddTypingUser sets an user as typing in a room.
// expire is the time when the user typing should time out. // expire is the time when the user typing should time out.
// if expire is nil, defaultTypingTimeout is assumed. // if expire is nil, defaultTypingTimeout is assumed.
func (t *TypingCache) AddTypingUser(userID, roomID string, expire *time.Time) { // Returns the latest sync position for typing after update.
func (t *TypingCache) AddTypingUser(
userID, roomID string, expire *time.Time,
) int64 {
expireTime := getExpireTime(expire) expireTime := getExpireTime(expire)
if until := time.Until(expireTime); until > 0 { if until := time.Until(expireTime); until > 0 {
timer := time.AfterFunc(until, t.timeoutCallback(userID, roomID)) timer := time.AfterFunc(until, t.timeoutCallback(userID, roomID))
t.addUser(userID, roomID, timer) return t.addUser(userID, roomID, timer)
} }
return t.GetLatestSyncPosition()
} }
// addUser with mutex lock & replace the previous timer. // addUser with mutex lock & replace the previous timer.
func (t *TypingCache) addUser(userID, roomID string, expiryTimer *time.Timer) { // Returns the latest typing sync position after update.
func (t *TypingCache) addUser(
userID, roomID string, expiryTimer *time.Timer,
) int64 {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
@ -112,29 +119,33 @@ func (t *TypingCache) addUser(userID, roomID string, expiryTimer *time.Timer) {
} }
t.data[roomID].userSet[userID] = expiryTimer t.data[roomID].userSet[userID] = expiryTimer
return t.latestSyncPosition
} }
// Returns a function which is called after timeout happens. // Returns a function which is called after timeout happens.
// This removes the user. // This removes the user.
func (t *TypingCache) timeoutCallback(userID, roomID string) func() { func (t *TypingCache) timeoutCallback(userID, roomID string) func() {
return func() { return func() {
// TODO: Notify syncing users about removal
t.RemoveUser(userID, roomID) t.RemoveUser(userID, roomID)
} }
} }
// RemoveUser with mutex lock & stop the timer. // RemoveUser with mutex lock & stop the timer.
func (t *TypingCache) RemoveUser(userID, roomID string) { // Returns the latest sync position for typing after update.
func (t *TypingCache) RemoveUser(userID, roomID string) int64 {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
roomData, ok := t.data[roomID] roomData, ok := t.data[roomID]
if !ok { if !ok {
return return t.latestSyncPosition
} }
timer, ok := roomData.userSet[userID] timer, ok := roomData.userSet[userID]
if !ok { if !ok {
return return t.latestSyncPosition
} }
timer.Stop() timer.Stop()
@ -142,6 +153,8 @@ func (t *TypingCache) RemoveUser(userID, roomID string) {
t.latestSyncPosition++ t.latestSyncPosition++
t.data[roomID].syncPosition = t.latestSyncPosition t.data[roomID].syncPosition = t.latestSyncPosition
return t.latestSyncPosition
} }
func (t *TypingCache) GetLatestSyncPosition() int64 { func (t *TypingCache) GetLatestSyncPosition() int64 {