diff --git a/eduserver/cache/cache.go b/eduserver/cache/cache.go index f637d7c97..974808df8 100644 --- a/eduserver/cache/cache.go +++ b/eduserver/cache/cache.go @@ -151,6 +151,8 @@ func (t *EDUCache) RemoveUser(userID, roomID string) int64 { t.Lock() defer t.Unlock() + t.latestSyncPosition++ + roomData, ok := t.data[roomID] if !ok { return t.latestSyncPosition @@ -164,7 +166,6 @@ func (t *EDUCache) RemoveUser(userID, roomID string) int64 { timer.Stop() delete(roomData.userSet, userID) - t.latestSyncPosition++ t.data[roomID].syncPosition = t.latestSyncPosition return t.latestSyncPosition diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go index 68b388acb..f52ea4360 100644 --- a/syncapi/consumers/eduserver_receipts.go +++ b/syncapi/consumers/eduserver_receipts.go @@ -86,7 +86,7 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro return err } - s.streams.TypingStreamProvider.Advance(streamPos) + s.streams.ReceiptStreamProvider.Advance(streamPos) return nil } diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index 853d3845e..234ec8d21 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -16,6 +16,7 @@ package consumers import ( "encoding/json" + "fmt" "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/eduserver/api" @@ -65,11 +66,9 @@ func NewOutputTypingEventConsumer( // Start consuming from EDU api func (s *OutputTypingEventConsumer) Start() error { - /* - s.eduCache.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) { - s.eduCache.TypingStream().Advance(types.StreamPosition(latestSyncPosition)) - }) - */ + s.eduCache.SetTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) { + s.streams.TypingStreamProvider.Advance(types.StreamPosition(latestSyncPosition)) + }) return s.typingConsumer.Start() } @@ -99,7 +98,9 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error ) } + fmt.Println("Advancing typing position to", typingPos) s.streams.TypingStreamProvider.Advance(typingPos) + fmt.Println("Advanced typing position to", typingPos) return nil } diff --git a/syncapi/streams/template_pstream.go b/syncapi/streams/template_pstream.go index 56f374a1d..1c249b767 100644 --- a/syncapi/streams/template_pstream.go +++ b/syncapi/streams/template_pstream.go @@ -31,26 +31,25 @@ func (p *PartitionedStreamProvider) Advance( latest types.LogPosition, ) { p.latestMutex.Lock() - defer p.latestMutex.Unlock() - if latest.IsAfter(&p.latest) { p.latest = latest + } + p.latestMutex.Unlock() - p.subscriptionsMutex.Lock() - for id, s := range p.subscriptions { - select { - case <-s.ctx.Done(): + p.subscriptionsMutex.Lock() + defer p.subscriptionsMutex.Unlock() + + for id, s := range p.subscriptions { + select { + case <-s.ctx.Done(): + close(s.ch) + delete(p.subscriptions, id) + default: + if latest.IsAfter(&s.from) { close(s.ch) delete(p.subscriptions, id) - continue - default: - if latest.IsAfter(&s.from) { - close(s.ch) - delete(p.subscriptions, id) - } } } - p.subscriptionsMutex.Unlock() } } diff --git a/syncapi/streams/template_stream.go b/syncapi/streams/template_stream.go index 9ce3ef852..84a59d315 100644 --- a/syncapi/streams/template_stream.go +++ b/syncapi/streams/template_stream.go @@ -31,26 +31,25 @@ func (p *StreamProvider) Advance( latest types.StreamPosition, ) { p.latestMutex.Lock() - defer p.latestMutex.Unlock() - if latest > p.latest { p.latest = latest + } + p.latestMutex.Unlock() - p.subscriptionsMutex.Lock() - for id, s := range p.subscriptions { - select { - case <-s.ctx.Done(): + p.subscriptionsMutex.Lock() + defer p.subscriptionsMutex.Unlock() + + for id, s := range p.subscriptions { + select { + case <-s.ctx.Done(): + close(s.ch) + delete(p.subscriptions, id) + default: + if latest > s.from { close(s.ch) delete(p.subscriptions, id) - continue - default: - if latest > s.from { - close(s.ch) - delete(p.subscriptions, id) - } } } - p.subscriptionsMutex.Unlock() } } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 7096347b5..95f9fc157 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -221,38 +221,31 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. syncReq.Response.NextBatch = types.StreamingToken{ PDUPosition: rp.streams.PDUStreamProvider.IncrementalSync( syncReq.Context, syncReq, - syncReq.Since.PDUPosition, // from - rp.streams.PDUStreamProvider.LatestPosition(syncReq.Context), // to + syncReq.Since.PDUPosition, rp.streams.PDUStreamProvider.LatestPosition(syncReq.Context), ), TypingPosition: rp.streams.TypingStreamProvider.IncrementalSync( syncReq.Context, syncReq, - syncReq.Since.TypingPosition, // from - rp.streams.TypingStreamProvider.LatestPosition(syncReq.Context), // to + syncReq.Since.TypingPosition, rp.streams.TypingStreamProvider.LatestPosition(syncReq.Context), ), ReceiptPosition: rp.streams.ReceiptStreamProvider.IncrementalSync( syncReq.Context, syncReq, - syncReq.Since.ReceiptPosition, // from - rp.streams.ReceiptStreamProvider.LatestPosition(syncReq.Context), // to + syncReq.Since.ReceiptPosition, rp.streams.ReceiptStreamProvider.LatestPosition(syncReq.Context), ), InvitePosition: rp.streams.InviteStreamProvider.IncrementalSync( syncReq.Context, syncReq, - syncReq.Since.InvitePosition, // from - rp.streams.InviteStreamProvider.LatestPosition(syncReq.Context), // to + syncReq.Since.InvitePosition, rp.streams.InviteStreamProvider.LatestPosition(syncReq.Context), ), SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.IncrementalSync( syncReq.Context, syncReq, - syncReq.Since.SendToDevicePosition, // from - rp.streams.SendToDeviceStreamProvider.LatestPosition(syncReq.Context), // to + syncReq.Since.SendToDevicePosition, rp.streams.SendToDeviceStreamProvider.LatestPosition(syncReq.Context), ), AccountDataPosition: rp.streams.AccountDataStreamProvider.IncrementalSync( syncReq.Context, syncReq, - syncReq.Since.AccountDataPosition, // from - rp.streams.AccountDataStreamProvider.LatestPosition(syncReq.Context), // to + syncReq.Since.AccountDataPosition, rp.streams.AccountDataStreamProvider.LatestPosition(syncReq.Context), ), DeviceListPosition: rp.streams.DeviceListStreamProvider.IncrementalSync( syncReq.Context, syncReq, - syncReq.Since.DeviceListPosition, // from - rp.streams.DeviceListStreamProvider.LatestPosition(syncReq.Context), // to + syncReq.Since.DeviceListPosition, rp.streams.DeviceListStreamProvider.LatestPosition(syncReq.Context), ), } }