diff --git a/syncapi/consumers/typingserver.go b/syncapi/consumers/typingserver.go index d65301cb2..66124db9e 100644 --- a/syncapi/consumers/typingserver.go +++ b/syncapi/consumers/typingserver.go @@ -72,13 +72,14 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error "room_id": output.Event.RoomID, }).Info("received data from typing server") + var typingPos int64 typingEvent := output.Event if typingEvent.Typing { - s.db.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime) + typingPos = s.db.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime) } else { - s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID) + typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID) } - s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.SyncPosition{TypingPosition: 1}) + s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.SyncPosition{TypingPosition: typingPos}) return nil } diff --git a/syncapi/storage/syncserver.go b/syncapi/storage/syncserver.go index 13d718df7..ac77efebc 100644 --- a/syncapi/storage/syncserver.go +++ b/syncapi/storage/syncserver.go @@ -514,16 +514,20 @@ func (d *SyncServerDatasource) RetireInviteEvent( return err } +// AddTypingUser adds a typing user to the typing cache. +// Returns the latest sync position for typing notifications after update. func (d *SyncServerDatasource) AddTypingUser( userID, roomID string, expireTime *time.Time, -) { - d.typingCache.AddTypingUser(userID, roomID, expireTime) +) int64 { + return d.typingCache.AddTypingUser(userID, roomID, expireTime) } +// RemoveTypingUser removes a typing user from the typing cache. +// Returns the latest sync position for typing notifications after update. func (d *SyncServerDatasource) RemoveTypingUser( userID, roomID string, -) { - d.typingCache.RemoveUser(userID, roomID) +) int64 { + return d.typingCache.RemoveUser(userID, roomID) } func (d *SyncServerDatasource) addInvitesToResponse( diff --git a/typingserver/cache/cache.go b/typingserver/cache/cache.go index 04de0e822..356228a0d 100644 --- a/typingserver/cache/cache.go +++ b/typingserver/cache/cache.go @@ -80,16 +80,23 @@ func (t *TypingCache) GetTypingUsersIfUpdatedAfter( // AddTypingUser sets an user as typing in a room. // expire is the time when the user typing should time out. // if expire is nil, defaultTypingTimeout is assumed. -func (t *TypingCache) AddTypingUser(userID, roomID string, expire *time.Time) { +// Returns the latest sync position for typing after update. +func (t *TypingCache) AddTypingUser( + userID, roomID string, expire *time.Time, +) int64 { expireTime := getExpireTime(expire) if until := time.Until(expireTime); until > 0 { timer := time.AfterFunc(until, t.timeoutCallback(userID, roomID)) - t.addUser(userID, roomID, timer) + return t.addUser(userID, roomID, timer) } + return t.GetLatestSyncPosition() } // addUser with mutex lock & replace the previous timer. -func (t *TypingCache) addUser(userID, roomID string, expiryTimer *time.Timer) { +// Returns the latest typing sync position after update. +func (t *TypingCache) addUser( + userID, roomID string, expiryTimer *time.Timer, +) int64 { t.Lock() defer t.Unlock() @@ -112,29 +119,33 @@ func (t *TypingCache) addUser(userID, roomID string, expiryTimer *time.Timer) { } t.data[roomID].userSet[userID] = expiryTimer + + return t.latestSyncPosition } // Returns a function which is called after timeout happens. // This removes the user. func (t *TypingCache) timeoutCallback(userID, roomID string) func() { return func() { + // TODO: Notify syncing users about removal t.RemoveUser(userID, roomID) } } // RemoveUser with mutex lock & stop the timer. -func (t *TypingCache) RemoveUser(userID, roomID string) { +// Returns the latest sync position for typing after update. +func (t *TypingCache) RemoveUser(userID, roomID string) int64 { t.Lock() defer t.Unlock() roomData, ok := t.data[roomID] if !ok { - return + return t.latestSyncPosition } timer, ok := roomData.userSet[userID] if !ok { - return + return t.latestSyncPosition } timer.Stop() @@ -142,6 +153,8 @@ func (t *TypingCache) RemoveUser(userID, roomID string) { t.latestSyncPosition++ t.data[roomID].syncPosition = t.latestSyncPosition + + return t.latestSyncPosition } func (t *TypingCache) GetLatestSyncPosition() int64 {