Merge branch 'master' into neilalexander/sendtodevice

This commit is contained in:
Neil Alexander 2021-01-13 14:33:06 +00:00 committed by GitHub
commit 19ef429b55
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 66 additions and 60 deletions

View file

@ -273,6 +273,14 @@ func (r *messagesReq) retrieveEvents() (
return []gomatrixserverlib.ClientEvent{}, *r.from, *r.to, nil return []gomatrixserverlib.ClientEvent{}, *r.from, *r.to, nil
} }
// Get the position of the first and the last event in the room's topology.
// This position is currently determined by the event's depth, so we could
// also use it instead of retrieving from the database. However, if we ever
// change the way topological positions are defined (as depth isn't the most
// reliable way to define it), it would be easier and less troublesome to
// only have to change it in one place, i.e. the database.
start, end, err = r.getStartEnd(events)
// Sort the events to ensure we send them in the right order. // Sort the events to ensure we send them in the right order.
if r.backwardOrdering { if r.backwardOrdering {
// This reverses the array from old->new to new->old // This reverses the array from old->new to new->old
@ -292,14 +300,6 @@ func (r *messagesReq) retrieveEvents() (
// Convert all of the events into client events. // Convert all of the events into client events.
clientEvents = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatAll) clientEvents = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatAll)
// Get the position of the first and the last event in the room's topology.
// This position is currently determined by the event's depth, so we could
// also use it instead of retrieving from the database. However, if we ever
// change the way topological positions are defined (as depth isn't the most
// reliable way to define it), it would be easier and less troublesome to
// only have to change it in one place, i.e. the database.
start, end, err = r.getStartEnd(events)
return clientEvents, start, end, err return clientEvents, start, end, err
} }
@ -363,7 +363,7 @@ func (r *messagesReq) filterHistoryVisible(events []*gomatrixserverlib.HeaderedE
return events // apply no filtering as it defaults to Shared. return events // apply no filtering as it defaults to Shared.
} }
hisVis, _ := hisVisEvent.HistoryVisibility() hisVis, _ := hisVisEvent.HistoryVisibility()
if hisVis == "shared" { if hisVis == "shared" || hisVis == "world_readable" {
return events // apply no filtering return events // apply no filtering
} }
if membershipEvent == nil { if membershipEvent == nil {
@ -388,26 +388,16 @@ func (r *messagesReq) filterHistoryVisible(events []*gomatrixserverlib.HeaderedE
} }
func (r *messagesReq) getStartEnd(events []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) { func (r *messagesReq) getStartEnd(events []*gomatrixserverlib.HeaderedEvent) (start, end types.TopologyToken, err error) {
start, err = r.db.EventPositionInTopology( if r.backwardOrdering {
r.ctx, events[0].EventID(), start = *r.from
) if events[len(events)-1].Type() == gomatrixserverlib.MRoomCreate {
if err != nil { // NOTSPEC: We've hit the beginning of the room so there's really nowhere
err = fmt.Errorf("EventPositionInTopology: for start event %s: %w", events[0].EventID(), err) // else to go. This seems to fix Riot iOS from looping on /messages endlessly.
return end = types.TopologyToken{}
} } else {
if r.backwardOrdering && events[len(events)-1].Type() == gomatrixserverlib.MRoomCreate { end, err = r.db.EventPositionInTopology(
// We've hit the beginning of the room so there's really nowhere else r.ctx, events[0].EventID(),
// to go. This seems to fix Riot iOS from looping on /messages endlessly. )
end = types.TopologyToken{}
} else {
end, err = r.db.EventPositionInTopology(
r.ctx, events[len(events)-1].EventID(),
)
if err != nil {
err = fmt.Errorf("EventPositionInTopology: for end event %s: %w", events[len(events)-1].EventID(), err)
return
}
if r.backwardOrdering {
// A stream/topological position is a cursor located between two events. // A stream/topological position is a cursor located between two events.
// While they are identified in the code by the event on their right (if // While they are identified in the code by the event on their right (if
// we consider a left to right chronological order), tokens need to refer // we consider a left to right chronological order), tokens need to refer
@ -415,6 +405,15 @@ func (r *messagesReq) getStartEnd(events []*gomatrixserverlib.HeaderedEvent) (st
// end position we send in the response if we're going backward. // end position we send in the response if we're going backward.
end.Decrement() end.Decrement()
} }
} else {
start = *r.from
end, err = r.db.EventPositionInTopology(
r.ctx, events[len(events)-1].EventID(),
)
}
if err != nil {
err = fmt.Errorf("EventPositionInTopology: for end event %s: %w", events[len(events)-1].EventID(), err)
return
} }
return return
} }

View file

@ -82,11 +82,6 @@ func (p *AccountDataStreamProvider) IncrementalSync(
return from return from
} }
if len(dataTypes) == 0 {
// TODO: this fixes the sytest but is it the right thing to do?
dataTypes[""] = []string{"m.push_rules"}
}
// Iterate over the rooms // Iterate over the rooms
for roomID, dataTypes := range dataTypes { for roomID, dataTypes := range dataTypes {
// Request the missing data from the database // Request the missing data from the database
@ -114,7 +109,10 @@ func (p *AccountDataStreamProvider) IncrementalSync(
} }
} else { } else {
if roomData, ok := dataRes.RoomAccountData[roomID][dataType]; ok { if roomData, ok := dataRes.RoomAccountData[roomID][dataType]; ok {
joinData := req.Response.Rooms.Join[roomID] joinData := *types.NewJoinResponse()
if existing, ok := req.Response.Rooms.Join[roomID]; ok {
joinData = existing
}
joinData.AccountData.Events = append( joinData.AccountData.Events = append(
joinData.AccountData.Events, joinData.AccountData.Events,
gomatrixserverlib.ClientEvent{ gomatrixserverlib.ClientEvent{

View file

@ -173,22 +173,23 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
switch delta.Membership { switch delta.Membership {
case gomatrixserverlib.Join: case gomatrixserverlib.Join:
jr := types.NewJoinResponse() jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = &prevBatch jr.Timeline.PrevBatch = &prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.RoomID] = *jr res.Rooms.Join[delta.RoomID] = *jr
case gomatrixserverlib.Peek: case gomatrixserverlib.Peek:
jr := types.NewJoinResponse() jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = &prevBatch jr.Timeline.PrevBatch = &prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Peek[delta.RoomID] = *jr res.Rooms.Peek[delta.RoomID] = *jr
case gomatrixserverlib.Leave: case gomatrixserverlib.Leave:
fallthrough // transitions to leave are the same as ban fallthrough // transitions to leave are the same as ban
case gomatrixserverlib.Ban: case gomatrixserverlib.Ban:
// TODO: recentEvents may contain events that this user is not allowed to see because they are // TODO: recentEvents may contain events that this user is not allowed to see because they are
// no longer in the room. // no longer in the room.

View file

@ -59,7 +59,10 @@ func (p *ReceiptStreamProvider) IncrementalSync(
} }
for roomID, receipts := range receiptsByRoom { for roomID, receipts := range receiptsByRoom {
jr := req.Response.Rooms.Join[roomID] jr := *types.NewJoinResponse()
if existing, ok := req.Response.Rooms.Join[roomID]; ok {
jr = existing
}
var ok bool var ok bool
ev := gomatrixserverlib.ClientEvent{ ev := gomatrixserverlib.ClientEvent{

View file

@ -32,7 +32,10 @@ func (p *TypingStreamProvider) IncrementalSync(
continue continue
} }
jr := req.Response.Rooms.Join[roomID] jr := *types.NewJoinResponse()
if existing, ok := req.Response.Rooms.Join[roomID]; ok {
jr = existing
}
if users, updated := p.EDUCache.GetTypingUsersIfUpdatedAfter( if users, updated := p.EDUCache.GetTypingUsersIfUpdatedAfter(
roomID, int64(from), roomID, int64(from),

View file

@ -360,11 +360,11 @@ type PrevEventRef struct {
type Response struct { type Response struct {
NextBatch StreamingToken `json:"next_batch"` NextBatch StreamingToken `json:"next_batch"`
AccountData struct { AccountData struct {
Events []gomatrixserverlib.ClientEvent `json:"events"` Events []gomatrixserverlib.ClientEvent `json:"events,omitempty"`
} `json:"account_data,omitempty"` } `json:"account_data"`
Presence struct { Presence struct {
Events []gomatrixserverlib.ClientEvent `json:"events"` Events []gomatrixserverlib.ClientEvent `json:"events,omitempty"`
} `json:"presence,omitempty"` } `json:"presence"`
Rooms struct { Rooms struct {
Join map[string]JoinResponse `json:"join"` Join map[string]JoinResponse `json:"join"`
Peek map[string]JoinResponse `json:"peek"` Peek map[string]JoinResponse `json:"peek"`
@ -372,13 +372,13 @@ type Response struct {
Leave map[string]LeaveResponse `json:"leave"` Leave map[string]LeaveResponse `json:"leave"`
} `json:"rooms"` } `json:"rooms"`
ToDevice struct { ToDevice struct {
Events []gomatrixserverlib.SendToDeviceEvent `json:"events"` Events []gomatrixserverlib.SendToDeviceEvent `json:"events,omitempty"`
} `json:"to_device"` } `json:"to_device"`
DeviceLists struct { DeviceLists struct {
Changed []string `json:"changed,omitempty"` Changed []string `json:"changed,omitempty"`
Left []string `json:"left,omitempty"` Left []string `json:"left,omitempty"`
} `json:"device_lists,omitempty"` } `json:"device_lists"`
DeviceListsOTKCount map[string]int `json:"device_one_time_keys_count"` DeviceListsOTKCount map[string]int `json:"device_one_time_keys_count,omitempty"`
} }
// NewResponse creates an empty response with initialised maps. // NewResponse creates an empty response with initialised maps.
@ -386,19 +386,19 @@ func NewResponse() *Response {
res := Response{} res := Response{}
// Pre-initialise the maps. Synapse will return {} even if there are no rooms under a specific section, // Pre-initialise the maps. Synapse will return {} even if there are no rooms under a specific section,
// so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors. // so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors.
res.Rooms.Join = make(map[string]JoinResponse) res.Rooms.Join = map[string]JoinResponse{}
res.Rooms.Peek = make(map[string]JoinResponse) res.Rooms.Peek = map[string]JoinResponse{}
res.Rooms.Invite = make(map[string]InviteResponse) res.Rooms.Invite = map[string]InviteResponse{}
res.Rooms.Leave = make(map[string]LeaveResponse) res.Rooms.Leave = map[string]LeaveResponse{}
// Also pre-intialise empty slices or else we'll insert 'null' instead of '[]' for the value. // Also pre-intialise empty slices or else we'll insert 'null' instead of '[]' for the value.
// TODO: We really shouldn't have to do all this to coerce encoding/json to Do The Right Thing. We should // TODO: We really shouldn't have to do all this to coerce encoding/json to Do The Right Thing. We should
// really be using our own Marshal/Unmarshal implementations otherwise this may prove to be a CPU bottleneck. // really be using our own Marshal/Unmarshal implementations otherwise this may prove to be a CPU bottleneck.
// This also applies to NewJoinResponse, NewInviteResponse and NewLeaveResponse. // This also applies to NewJoinResponse, NewInviteResponse and NewLeaveResponse.
res.AccountData.Events = make([]gomatrixserverlib.ClientEvent, 0) res.AccountData.Events = []gomatrixserverlib.ClientEvent{}
res.Presence.Events = make([]gomatrixserverlib.ClientEvent, 0) res.Presence.Events = []gomatrixserverlib.ClientEvent{}
res.ToDevice.Events = make([]gomatrixserverlib.SendToDeviceEvent, 0) res.ToDevice.Events = []gomatrixserverlib.SendToDeviceEvent{}
res.DeviceListsOTKCount = make(map[string]int) res.DeviceListsOTKCount = map[string]int{}
return &res return &res
} }
@ -435,10 +435,10 @@ type JoinResponse struct {
// NewJoinResponse creates an empty response with initialised arrays. // NewJoinResponse creates an empty response with initialised arrays.
func NewJoinResponse() *JoinResponse { func NewJoinResponse() *JoinResponse {
res := JoinResponse{} res := JoinResponse{}
res.State.Events = make([]gomatrixserverlib.ClientEvent, 0) res.State.Events = []gomatrixserverlib.ClientEvent{}
res.Timeline.Events = make([]gomatrixserverlib.ClientEvent, 0) res.Timeline.Events = []gomatrixserverlib.ClientEvent{}
res.Ephemeral.Events = make([]gomatrixserverlib.ClientEvent, 0) res.Ephemeral.Events = []gomatrixserverlib.ClientEvent{}
res.AccountData.Events = make([]gomatrixserverlib.ClientEvent, 0) res.AccountData.Events = []gomatrixserverlib.ClientEvent{}
return &res return &res
} }
@ -487,8 +487,8 @@ type LeaveResponse struct {
// NewLeaveResponse creates an empty response with initialised arrays. // NewLeaveResponse creates an empty response with initialised arrays.
func NewLeaveResponse() *LeaveResponse { func NewLeaveResponse() *LeaveResponse {
res := LeaveResponse{} res := LeaveResponse{}
res.State.Events = make([]gomatrixserverlib.ClientEvent, 0) res.State.Events = []gomatrixserverlib.ClientEvent{}
res.Timeline.Events = make([]gomatrixserverlib.ClientEvent, 0) res.Timeline.Events = []gomatrixserverlib.ClientEvent{}
return &res return &res
} }

View file

@ -501,3 +501,5 @@ Can forget room you've been kicked from
/joined_members return joined members /joined_members return joined members
A next_batch token can be used in the v1 messages API A next_batch token can be used in the v1 messages API
Users receive device_list updates for their own devices Users receive device_list updates for their own devices
m.room.history_visibility == "world_readable" allows/forbids appropriately for Guest users
m.room.history_visibility == "world_readable" allows/forbids appropriately for Real users