Try to ask other servers in the room for missing events if the origin won't provide them

This commit is contained in:
Neil Alexander 2020-09-28 12:51:39 +01:00
parent ce318f53bc
commit cf76cfe4eb
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
3 changed files with 45 additions and 20 deletions

View file

@ -637,15 +637,43 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e gomatrixserverlib.Event
if minDepth < 0 { if minDepth < 0 {
minDepth = 0 minDepth = 0
} }
missingResp, err := t.federation.LookupMissingEvents(ctx, t.Origin, e.RoomID(), gomatrixserverlib.MissingEvents{
Limit: 20, servers := []gomatrixserverlib.ServerName{t.Origin}
// synapse uses the min depth they've ever seen in that room serverReq := &api.QueryServerJoinedToRoomRequest{
MinDepth: minDepth, RoomID: e.RoomID(),
// The latest event IDs that the sender already has. These are skipped when retrieving the previous events of latest_events. }
EarliestEvents: latestEvents, serverRes := &api.QueryServerJoinedToRoomResponse{}
// The event IDs to retrieve the previous events for. if err = t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil {
LatestEvents: []string{e.EventID()}, servers = append(servers, serverRes.ServerNames...)
}, roomVersion) }
var missingResp *gomatrixserverlib.RespMissingEvents
for _, server := range servers {
var m gomatrixserverlib.RespMissingEvents
if m, err = t.federation.LookupMissingEvents(ctx, server, e.RoomID(), gomatrixserverlib.MissingEvents{
Limit: 20,
// synapse uses the min depth they've ever seen in that room
MinDepth: minDepth,
// The latest event IDs that the sender already has. These are skipped when retrieving the previous events of latest_events.
EarliestEvents: latestEvents,
// The event IDs to retrieve the previous events for.
LatestEvents: []string{e.EventID()},
}, roomVersion); err == nil {
missingResp = &m
break
}
}
if missingResp == nil {
logger.WithError(err).Errorf(
"%s pushed us an event but %d server(s) couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
len(servers), t.Origin,
)
return nil, missingPrevEventsError{
eventID: e.EventID(),
err: err,
}
}
// security: how we handle failures depends on whether or not this event will become the new forward extremity for the room. // security: how we handle failures depends on whether or not this event will become the new forward extremity for the room.
// There's 2 scenarios to consider: // There's 2 scenarios to consider:
@ -658,16 +686,6 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e gomatrixserverlib.Event
// https://github.com/matrix-org/synapse/pull/3456 // https://github.com/matrix-org/synapse/pull/3456
// https://github.com/matrix-org/synapse/blob/229eb81498b0fe1da81e9b5b333a0285acde9446/synapse/handlers/federation.py#L335 // https://github.com/matrix-org/synapse/blob/229eb81498b0fe1da81e9b5b333a0285acde9446/synapse/handlers/federation.py#L335
// For now, we do not allow Case B, so reject the event. // For now, we do not allow Case B, so reject the event.
if err != nil {
logger.WithError(err).Errorf(
"%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
t.Origin,
)
return nil, missingPrevEventsError{
eventID: e.EventID(),
err: err,
}
}
logger.Infof("get_missing_events returned %d events", len(missingResp.Events)) logger.Infof("get_missing_events returned %d events", len(missingResp.Events))
// topologically sort and sanity check that we are making forward progress // topologically sort and sanity check that we are making forward progress

View file

@ -154,6 +154,8 @@ type QueryServerJoinedToRoomResponse struct {
RoomExists bool `json:"room_exists"` RoomExists bool `json:"room_exists"`
// True if we still believe that we are participating in the room // True if we still believe that we are participating in the room
IsInRoom bool `json:"is_in_room"` IsInRoom bool `json:"is_in_room"`
// List of servers that are also in the room
ServerNames []gomatrixserverlib.ServerName `json:"server_names"`
} }
// QueryServerAllowedToSeeEventRequest is a request to QueryServerAllowedToSeeEvent // QueryServerAllowedToSeeEventRequest is a request to QueryServerAllowedToSeeEvent

View file

@ -255,19 +255,24 @@ func (r *Queryer) QueryServerJoinedToRoom(
return fmt.Errorf("r.DB.Events: %w", err) return fmt.Errorf("r.DB.Events: %w", err)
} }
servers := map[gomatrixserverlib.ServerName]struct{}{}
for _, e := range events { for _, e := range events {
if e.Type() == gomatrixserverlib.MRoomMember && e.StateKey() != nil { if e.Type() == gomatrixserverlib.MRoomMember && e.StateKey() != nil {
_, serverName, err := gomatrixserverlib.SplitID('@', *e.StateKey()) _, serverName, err := gomatrixserverlib.SplitID('@', *e.StateKey())
if err != nil { if err != nil {
continue continue
} }
servers[serverName] = struct{}{}
if serverName == request.ServerName { if serverName == request.ServerName {
response.IsInRoom = true response.IsInRoom = true
break
} }
} }
} }
for server := range servers {
response.ServerNames = append(response.ServerNames, server)
}
return nil return nil
} }