diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index 28574b502..74b2a1905 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -92,6 +92,7 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID) } - s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos}) + s.db.TypingStream().StreamAdvance(typingPos) + return nil } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 399f67ba8..b3b82b1bc 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -180,7 +180,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( return err } - s.notifier.OnNewEvent(ev, "", nil, types.StreamingToken{PDUPosition: pduPos}) + s.db.PDUStream().StreamAdvance(pduPos) return nil } @@ -219,7 +219,7 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent( return err } - s.notifier.OnNewEvent(ev, "", nil, types.StreamingToken{PDUPosition: pduPos}) + s.db.PDUStream().StreamAdvance(pduPos) return nil } diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 14389ebbf..224517e00 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -92,7 +92,7 @@ func OnIncomingMessagesRequest( if emptyFromSupplied { // NOTSPEC: We will pretend they used the latest sync token if no ?from= was provided. // We do this to allow clients to get messages without having to call `/sync` e.g Cerulean - currPos := srp.Notifier.CurrentPosition() + currPos := types.TopologyToken{} // .Notifier.CurrentPosition() fromQuery = currPos.String() } diff --git a/syncapi/storage/shared/stream_pdu.go b/syncapi/storage/shared/stream_pdu.go index 664c85fae..ed6631563 100644 --- a/syncapi/storage/shared/stream_pdu.go +++ b/syncapi/storage/shared/stream_pdu.go @@ -5,7 +5,6 @@ import ( "sync" "github.com/matrix-org/dendrite/syncapi/types" - userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" ) @@ -42,10 +41,8 @@ func (p *PDUStreamProvider) StreamAdvance( func (p *PDUStreamProvider) StreamRange( ctx context.Context, - res *types.Response, - device *userapi.Device, + req *types.StreamRangeRequest, from, to types.StreamingToken, - filter gomatrixserverlib.EventFilter, ) (newPos types.StreamingToken) { r := types.Range{ From: from.PDUPosition, @@ -59,32 +56,37 @@ func (p *PDUStreamProvider) StreamRange( var err error var events []types.StreamEvent var stateDeltas []stateDelta + var joinedRooms []string // TODO: use filter provided in request stateFilter := gomatrixserverlib.DefaultStateFilter() if from.IsEmpty() { - if stateDeltas, _, err = p.DB.getStateDeltas(ctx, device, nil, r, device.UserID, &stateFilter); err != nil { + if stateDeltas, joinedRooms, err = p.DB.getStateDeltas(ctx, req.Device, nil, r, req.Device.UserID, &stateFilter); err != nil { return } } else { - if stateDeltas, _, err = p.DB.getStateDeltasForFullStateSync(ctx, device, nil, r, device.UserID, &stateFilter); err != nil { + if stateDeltas, joinedRooms, err = p.DB.getStateDeltasForFullStateSync(ctx, req.Device, nil, r, req.Device.UserID, &stateFilter); err != nil { return } } + for _, roomID := range joinedRooms { + req.Rooms[roomID] = "join" + } + for _, stateDelta := range stateDeltas { roomID := stateDelta.roomID room := types.JoinResponse{} if r.Backwards { // When using backward ordering, we want the most recent events first. - if events, _, err = p.DB.OutputEvents.SelectRecentEvents(ctx, nil, roomID, r, filter.Limit, false, false); err != nil { + if events, _, err = p.DB.OutputEvents.SelectRecentEvents(ctx, nil, roomID, r, req.Filter.Limit, false, false); err != nil { return } } else { // When using forward ordering, we want the least recent events first. - if events, err = p.DB.OutputEvents.SelectEarlyEvents(ctx, nil, roomID, r, filter.Limit); err != nil { + if events, err = p.DB.OutputEvents.SelectEarlyEvents(ctx, nil, roomID, r, req.Filter.Limit); err != nil { return } } @@ -110,7 +112,7 @@ func (p *PDUStreamProvider) StreamRange( // TODO: fill in prev_batch - res.Rooms.Join[roomID] = room + req.Response.Rooms.Join[roomID] = room } return newPos diff --git a/syncapi/storage/shared/stream_typing.go b/syncapi/storage/shared/stream_typing.go index 1e2f8e094..5c28712ce 100644 --- a/syncapi/storage/shared/stream_typing.go +++ b/syncapi/storage/shared/stream_typing.go @@ -2,10 +2,11 @@ package shared import ( "context" + "encoding/json" + "fmt" "sync" "github.com/matrix-org/dendrite/syncapi/types" - userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" ) @@ -33,12 +34,40 @@ func (p *TypingStreamProvider) StreamAdvance( func (p *TypingStreamProvider) StreamRange( ctx context.Context, - res *types.Response, - device *userapi.Device, + req *types.StreamRangeRequest, from, to types.StreamingToken, - filter gomatrixserverlib.EventFilter, ) types.StreamingToken { - return types.StreamingToken{} + var err error + for roomID := range req.Rooms { + // This may have already been set by a previous stream, so + // reuse it if it exists. + jr := req.Response.Rooms.Join[roomID] + + if users, updated := p.DB.EDUCache.GetTypingUsersIfUpdatedAfter( + roomID, int64(from.TypingPosition), + ); updated { + ev := gomatrixserverlib.ClientEvent{ + Type: gomatrixserverlib.MTyping, + } + ev.Content, err = json.Marshal(map[string]interface{}{ + "user_ids": users, + }) + if err != nil { + return types.StreamingToken{} + } + + fmt.Println("Typing", roomID, "users", users) + + jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev) + req.Response.Rooms.Join[roomID] = jr + } else { + fmt.Println("Typing", roomID, "not updated") + } + } + + return types.StreamingToken{ + TypingPosition: types.StreamPosition(p.DB.EDUCache.GetLatestSyncPosition()), + } } func (p *TypingStreamProvider) StreamNotifyAfter( diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index c7d4035f3..89a078099 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -355,8 +355,6 @@ func (d *Database) WriteEvent( } pduPosition = pos - d.PDUStreamProvider.StreamAdvance(pos) - if err = d.Topology.InsertEventInTopology(ctx, txn, ev, pos); err != nil { return fmt.Errorf("d.Topology.InsertEventInTopology: %w", err) } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 7357cae33..032031193 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -172,9 +172,6 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. rp.updateLastSeen(req, device) - syncData = types.NewResponse() - filter := gomatrixserverlib.DefaultEventFilter() - waitingSyncRequests.Inc() defer waitingSyncRequests.Dec() @@ -206,16 +203,23 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. // latest.ApplyUpdates(rp.inviteStream.StreamLatestPosition(syncReq.ctx)) // latest.ApplyUpdates(rp.deviceListStream.StreamLatestPosition(syncReq.ctx)) - syncData.NextBatch.ApplyUpdates(rp.pduStream.StreamRange(syncReq.ctx, syncData, device, syncReq.since, latest, filter)) - syncData.NextBatch.ApplyUpdates(rp.typingStream.StreamRange(syncReq.ctx, syncData, device, syncReq.since, latest, filter)) - // syncData.NextBatch.ApplyUpdates(rp.receiptStream.StreamRange(syncReq.ctx, syncData, device, syncReq.since, latest, filter)) - // syncData.NextBatch.ApplyUpdates(rp.sendToDeviceStream.StreamRange(syncReq.ctx, syncData, device, syncReq.since, latest, filter)) - // syncData.NextBatch.ApplyUpdates(rp.inviteStream.StreamRange(syncReq.ctx, syncData, device, syncReq.since, latest, filter)) - // syncData.NextBatch.ApplyUpdates(rp.inviteStream.StreamRange(syncReq.ctx, syncData, device, syncReq.since, latest, filter)) + sr := &types.StreamRangeRequest{ + Device: device, // + Response: types.NewResponse(), // Populated by all streams + Filter: gomatrixserverlib.DefaultEventFilter(), // + Rooms: make(map[string]string), // Populated by the PDU stream + } + + sr.Response.NextBatch.ApplyUpdates(rp.pduStream.StreamRange(syncReq.ctx, sr, syncReq.since, latest)) + sr.Response.NextBatch.ApplyUpdates(rp.typingStream.StreamRange(syncReq.ctx, sr, syncReq.since, latest)) + // sr.Response.NextBatch.ApplyUpdates(rp.receiptStream.StreamRange(syncReq.ctx, sr, syncReq.since, latest)) + // sr.Response.NextBatch.ApplyUpdates(rp.sendToDeviceStream.StreamRange(syncReq.ctx, sr, syncReq.since, latest)) + // sr.Response.NextBatch.ApplyUpdates(rp.inviteStream.StreamRange(syncReq.ctx, sr, syncReq.since, latest)) + // sr.Response.NextBatch.ApplyUpdates(rp.inviteStream.StreamRange(syncReq.ctx, sr, syncReq.since, latest)) return util.JSONResponse{ Code: http.StatusOK, - JSON: syncData, + JSON: sr.Response, } } diff --git a/syncapi/types/provider.go b/syncapi/types/provider.go index 0c6c2ea65..721efc5db 100644 --- a/syncapi/types/provider.go +++ b/syncapi/types/provider.go @@ -7,6 +7,13 @@ import ( "github.com/matrix-org/gomatrixserverlib" ) +type StreamRangeRequest struct { + Device *userapi.Device + Response *Response + Filter gomatrixserverlib.EventFilter + Rooms map[string]string +} + type StreamProvider interface { StreamSetup() @@ -17,7 +24,7 @@ type StreamProvider interface { // StreamRange will update the response to include all updates between // the from and to sync positions. It will always return immediately, // making no changes if the range contains no updates. - StreamRange(ctx context.Context, res *Response, device *userapi.Device, from, to StreamingToken, filter gomatrixserverlib.EventFilter) StreamingToken + StreamRange(ctx context.Context, req *StreamRangeRequest, from, to StreamingToken) StreamingToken // StreamNotifyAfter returns a channel which will be closed once the // stream advances past the "from" position.