diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 932bda164..cb9789bb8 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -765,6 +765,7 @@ func (d *Database) GetStateDeltas( } } + var membershipPositions = make(map[string]types.StreamPosition) // handle newly joined rooms and non-joined rooms for roomID, stateStreamEvents := range state { for _, ev := range stateStreamEvents { @@ -784,6 +785,7 @@ func (d *Database) GetStateDeltas( } return nil, nil, err } + membershipPositions[roomID] = ev.StreamPosition state[roomID] = s continue // we'll add this room in when we do joined rooms } @@ -802,9 +804,10 @@ func (d *Database) GetStateDeltas( // Add in currently joined rooms for _, joinedRoomID := range joinedRoomIDs { deltas = append(deltas, types.StateDelta{ - Membership: gomatrixserverlib.Join, - StateEvents: d.StreamEventsToEvents(device, state[joinedRoomID]), - RoomID: joinedRoomID, + Membership: gomatrixserverlib.Join, + MembershipPos: membershipPositions[joinedRoomID], + StateEvents: d.StreamEventsToEvents(device, state[joinedRoomID]), + RoomID: joinedRoomID, }) } diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 00b3dfe3b..2d193f4f2 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -229,6 +229,11 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( // This is all "okay" assuming history_visibility == "shared" which it is by default. r.To = delta.MembershipPos } + // if we joined in between sync calls, send reset From to simulate a complete sync + if delta.Membership == gomatrixserverlib.Join && delta.MembershipPos <= r.To && delta.MembershipPos > r.From { + r.From = 0 + } + recentStreamEvents, limited, err := p.DB.RecentEvents( ctx, delta.RoomID, r, eventFilter, true, true,