Some updates based on review comments from @babolivier

This commit is contained in:
Neil Alexander 2020-01-22 10:34:01 +00:00
parent 1366f25de2
commit f09c1d8b58
6 changed files with 105 additions and 91 deletions

View file

@ -352,11 +352,11 @@ const RoomserverQueryMissingEventsPath = "/api/roomserver/queryMissingEvents"
// RoomserverQueryStateAndAuthChainPath is the HTTP path for the QueryStateAndAuthChain API // RoomserverQueryStateAndAuthChainPath is the HTTP path for the QueryStateAndAuthChain API
const RoomserverQueryStateAndAuthChainPath = "/api/roomserver/queryStateAndAuthChain" const RoomserverQueryStateAndAuthChainPath = "/api/roomserver/queryStateAndAuthChain"
// RoomserverQueryBackfillPath is the HTTP path for the QueryMissingEvents API // RoomserverQueryBackfillPath is the HTTP path for the QueryBackfillPath API
const RoomserverQueryBackfillPath = "/api/roomserver/queryBackfill" const RoomserverQueryBackfillPath = "/api/roomserver/queryBackfill"
// RoomserverQueryServersInRoomAtEvent is the HTTP path for the QueryServersInRoomAtEvent API // RoomserverQueryServersInRoomAtEventPath is the HTTP path for the QueryServersInRoomAtEvent API
const RoomserverQueryServersInRoomAtEvent = "/api/roomserver/queryServersInRoomAtEvents" const RoomserverQueryServersInRoomAtEventPath = "/api/roomserver/queryServersInRoomAtEvents"
// NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API. // NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API.
// If httpClient is nil then it uses the http.DefaultClient // If httpClient is nil then it uses the http.DefaultClient
@ -511,6 +511,6 @@ func (h *httpRoomserverQueryAPI) QueryServersInRoomAtEvent(
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryBackfill") span, ctx := opentracing.StartSpanFromContext(ctx, "QueryBackfill")
defer span.Finish() defer span.Finish()
apiURL := h.roomserverURL + RoomserverQueryServersInRoomAtEvent apiURL := h.roomserverURL + RoomserverQueryServersInRoomAtEventPath
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
} }

View file

@ -839,7 +839,7 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) {
}), }),
) )
servMux.Handle( servMux.Handle(
api.RoomserverQueryServersInRoomAtEvent, api.RoomserverQueryServersInRoomAtEventPath,
common.MakeInternalAPI("QueryServersInRoomAtEvent", func(req *http.Request) util.JSONResponse { common.MakeInternalAPI("QueryServersInRoomAtEvent", func(req *http.Request) util.JSONResponse {
var request api.QueryServersInRoomAtEventRequest var request api.QueryServersInRoomAtEventRequest
var response api.QueryServersInRoomAtEventResponse var response api.QueryServersInRoomAtEventResponse

View file

@ -55,7 +55,7 @@ const defaultMessagesLimit = 10
// OnIncomingMessagesRequest implements the /messages endpoint from the // OnIncomingMessagesRequest implements the /messages endpoint from the
// client-server API. // client-server API.
// See: https://matrix.org/docs/spec/client_server/r0.4.0.html#get-matrix-client-r0-rooms-roomid-messages // See: https://matrix.org/docs/spec/client_server/latest.html#get-matrix-client-r0-rooms-roomid-messages
func OnIncomingMessagesRequest( func OnIncomingMessagesRequest(
req *http.Request, db storage.Database, roomID string, req *http.Request, db storage.Database, roomID string,
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
@ -303,10 +303,12 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
if r.wasToProvided { if r.wasToProvided {
// The condition in the SQL query is a strict "greater than" so // The condition in the SQL query is a strict "greater than" so
// we need to check against to-1. // we need to check against to-1.
isSetLargeEnough = (r.to.PDUPosition-1 == types.StreamPosition(streamEvents[len(streamEvents)-1].StreamPosition)) streamPos := types.StreamPosition(streamEvents[len(streamEvents)-1].StreamPosition)
isSetLargeEnough = (r.to.PDUPosition-1 == streamPos)
} }
} else { } else {
isSetLargeEnough = (r.from.PDUPosition-1 == types.StreamPosition(streamEvents[0].StreamPosition)) streamPos := types.StreamPosition(streamEvents[0].StreamPosition)
isSetLargeEnough = (r.from.PDUPosition-1 == streamPos)
} }
} }
@ -381,7 +383,7 @@ func (r *messagesReq) containsBackwardExtremity(events []types.StreamEvent) (boo
// backfill performs a backfill request over the federation on another // backfill performs a backfill request over the federation on another
// homeserver in the room. // homeserver in the room.
// See: https://matrix.org/docs/spec/server_server/unstable.html#get-matrix-federation-v1-backfill-roomid // See: https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid
// It also stores the PDUs retrieved from the remote homeserver's response to // It also stores the PDUs retrieved from the remote homeserver's response to
// the database. // the database.
// Returns with an empty string if the remote homeserver didn't return with any // Returns with an empty string if the remote homeserver didn't return with any

View file

@ -37,33 +37,35 @@ CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id;
-- Stores output room events received from the roomserver. -- Stores output room events received from the roomserver.
CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
-- An incrementing ID which denotes the position in the log that this event resides at. -- An incrementing ID which denotes the position in the log that this event resides at.
-- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments. -- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments.
-- This isn't a problem for us since we just want to order by this field. -- This isn't a problem for us since we just want to order by this field.
id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
-- The event ID for the event -- The event ID for the event
event_id TEXT NOT NULL, event_id TEXT NOT NULL,
-- The 'room_id' key for the event. -- The 'room_id' key for the event.
room_id TEXT NOT NULL, room_id TEXT NOT NULL,
-- The JSON for the event. Stored as TEXT because this should be valid UTF-8. -- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
event_json TEXT NOT NULL, event_json TEXT NOT NULL,
-- The event type e.g 'm.room.member'. -- The event type e.g 'm.room.member'.
type TEXT NOT NULL, type TEXT NOT NULL,
-- The 'sender' property of the event. -- The 'sender' property of the event.
sender TEXT NOT NULL, sender TEXT NOT NULL,
-- true if the event content contains a url key. -- true if the event content contains a url key.
contains_url BOOL NOT NULL, contains_url BOOL NOT NULL,
-- A list of event IDs which represent a delta of added/removed room state. This can be NULL -- A list of event IDs which represent a delta of added/removed room state. This can be NULL
-- if there is no delta. -- if there is no delta.
add_state_ids TEXT[], add_state_ids TEXT[],
remove_state_ids TEXT[], remove_state_ids TEXT[],
session_id BIGINT, -- The client session that sent the event, if any -- The client session that sent the event, if any
transaction_id TEXT, -- The transaction id used to send the event, if any session_id BIGINT,
-- The transaction id used to send the event, if any
transaction_id TEXT,
-- Should the event be excluded from responses to /sync requests. Useful for -- Should the event be excluded from responses to /sync requests. Useful for
-- events retrieved through backfilling that have a position in the stream -- events retrieved through backfilling that have a position in the stream
-- that relates to the moment these were retrieved rather than the moment these -- that relates to the moment these were retrieved rather than the moment these
-- were emitted. -- were emitted.
exclude_from_sync BOOL DEFAULT FALSE exclude_from_sync BOOL DEFAULT FALSE
); );
-- for event selection -- for event selection
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events(event_id); CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events(event_id);

View file

@ -113,6 +113,45 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([
return d.StreamEventsToEvents(nil, streamEvents), nil return d.StreamEventsToEvents(nil, streamEvents), nil
} }
func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, ev *gomatrixserverlib.Event) error {
// If the event is already known as a backward extremity, don't consider
// it as such anymore now that we have it.
isBackwardExtremity, err := d.backwardExtremities.isBackwardExtremity(ctx, ev.RoomID(), ev.EventID())
if err != nil {
return err
}
if isBackwardExtremity {
if err = d.backwardExtremities.deleteBackwardExtremity(ctx, ev.RoomID(), ev.EventID()); err != nil {
return err
}
}
// Check if we have all of the event's previous events. If an event is
// missing, add it to the room's backward extremities.
prevEvents, err := d.events.selectEvents(ctx, nil, ev.PrevEventIDs())
if err != nil {
return err
}
var found bool
for _, eID := range ev.PrevEventIDs() {
found = false
for _, prevEv := range prevEvents {
if eID == prevEv.EventID() {
found = true
}
}
// If the event is missing, consider it a backward extremity.
if !found {
if err = d.backwardExtremities.insertsBackwardExtremity(ctx, ev.RoomID(), ev.EventID()); err != nil {
return err
}
}
}
return nil
}
// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races // WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
// when generating the sync stream position for this event. Returns the sync stream position for the inserted event. // when generating the sync stream position for this event. Returns the sync stream position for the inserted event.
// Returns an error if there was a problem inserting this event. // Returns an error if there was a problem inserting this event.
@ -134,45 +173,14 @@ func (d *SyncServerDatasource) WriteEvent(
} }
pduPosition = pos pduPosition = pos
if err := d.handleBackwardExtremities(ctx, ev); err != nil {
return err
}
if err = d.topology.insertEventInTopology(ctx, ev); err != nil { if err = d.topology.insertEventInTopology(ctx, ev); err != nil {
return err return err
} }
// If the event is already known as a backward extremity, don't consider
// it as such anymore now that we have it.
isBackwardExtremity, err := d.backwardExtremities.isBackwardExtremity(ctx, ev.RoomID(), ev.EventID())
if err != nil {
return err
}
if isBackwardExtremity {
if err = d.backwardExtremities.deleteBackwardExtremity(ctx, ev.RoomID(), ev.EventID()); err != nil {
return err
}
}
// Check if we have all of the event's previous events. If an event is
// missing, add it to the room's backward extremities.
prevEvents, err := d.events.selectEvents(ctx, nil, ev.PrevEventIDs())
if err != nil {
return err
}
var found bool
for _, eID := range ev.PrevEventIDs() {
found = false
for _, prevEv := range prevEvents {
if eID == prevEv.EventID() {
found = true
}
}
// If the event is missing, consider it a backward extremity.
if !found {
if err = d.backwardExtremities.insertsBackwardExtremity(ctx, ev.RoomID(), ev.EventID()); err != nil {
return err
}
}
}
if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 { if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 {
// Nothing to do, the event may have just been a message event. // Nothing to do, the event may have just been a message event.
return nil return nil
@ -996,9 +1004,12 @@ func (d *SyncServerDatasource) getStateDeltas(
return nil, nil, err return nil, nil, err
} }
/* /*
s = make([]StreamEvent, len(allState)) s = make([]types.StreamEvent, len(allState))
for i := 0; i < len(s); i++ { for i := 0; i < len(s); i++ {
s[i] = StreamEvent{Event: allState[i], StreamPosition: types.StreamPosition(0)} s[i] = types.StreamEvent{
Event: allState[i],
StreamPosition: types.StreamPosition(0),
}
} }
*/ */
state[roomID] = s state[roomID] = s

View file

@ -70,18 +70,17 @@ type PaginationToken struct {
// error if the token couldn't be parsed into an int64, or if the token type // error if the token couldn't be parsed into an int64, or if the token type
// isn't a known type (returns ErrInvalidPaginationTokenType in the latter // isn't a known type (returns ErrInvalidPaginationTokenType in the latter
// case). // case).
func NewPaginationTokenFromString(s string) (p *PaginationToken, err error) { func NewPaginationTokenFromString(s string) (token *PaginationToken, err error) {
p = new(PaginationToken) token = new(PaginationToken)
// Check if the type is among the known ones. // Check if the type is among the known ones.
p.Type = PaginationTokenType(s[:1]) token.Type = PaginationTokenType(s[:1])
if p.Type != PaginationTokenTypeStream && p.Type != PaginationTokenTypeTopology { if token.Type != PaginationTokenTypeStream && token.Type != PaginationTokenTypeTopology {
if pduPos, perr := strconv.ParseInt(s, 10, 64); perr != nil { if pduPos, perr := strconv.ParseInt(s, 10, 64); perr != nil {
return nil, ErrInvalidPaginationTokenType return nil, ErrInvalidPaginationTokenType
} else { } else {
// TODO: should this be topograpical? token.Type = PaginationTokenTypeStream
p.Type = PaginationTokenTypeTopology token.PDUPosition = StreamPosition(pduPos)
p.PDUPosition = StreamPosition(pduPos)
return return
} }
} }
@ -94,7 +93,7 @@ func NewPaginationTokenFromString(s string) (p *PaginationToken, err error) {
if pduPos, err := strconv.ParseInt(positions[0], 10, 64); err != nil { if pduPos, err := strconv.ParseInt(positions[0], 10, 64); err != nil {
return nil, err return nil, err
} else { } else {
p.PDUPosition = StreamPosition(pduPos) token.PDUPosition = StreamPosition(pduPos)
} }
} }
@ -103,7 +102,7 @@ func NewPaginationTokenFromString(s string) (p *PaginationToken, err error) {
if typPos, err := strconv.ParseInt(positions[1], 10, 64); err != nil { if typPos, err := strconv.ParseInt(positions[1], 10, 64); err != nil {
return nil, err return nil, err
} else { } else {
p.EDUTypingPosition = StreamPosition(typPos) token.EDUTypingPosition = StreamPosition(typPos)
} }
} }
@ -128,11 +127,11 @@ func (p *PaginationToken) String() string {
return fmt.Sprintf("%s%d_%d", p.Type, p.PDUPosition, p.EDUTypingPosition) return fmt.Sprintf("%s%d_%d", p.Type, p.PDUPosition, p.EDUTypingPosition)
} }
// WithUpdates returns a copy of the SyncPosition with updates applied from another SyncPosition. // WithUpdates returns a copy of the PaginationToken with updates applied from another PaginationToken.
// If the latter SyncPosition contains a field that is not 0, it is considered an update, // If the latter PaginationToken contains a field that is not 0, it is considered an update,
// and its value will replace the corresponding value in the SyncPosition on which WithUpdates is called. // and its value will replace the corresponding value in the PaginationToken on which WithUpdates is called.
func (sp *PaginationToken) WithUpdates(other PaginationToken) PaginationToken { func (pt *PaginationToken) WithUpdates(other PaginationToken) PaginationToken {
ret := *sp ret := *pt
if other.PDUPosition != 0 { if other.PDUPosition != 0 {
ret.PDUPosition = other.PDUPosition ret.PDUPosition = other.PDUPosition
} }
@ -142,7 +141,7 @@ func (sp *PaginationToken) WithUpdates(other PaginationToken) PaginationToken {
return ret return ret
} }
// IsAfter returns whether one SyncPosition refers to states newer than another SyncPosition. // IsAfter returns whether one PaginationToken refers to states newer than another PaginationToken.
func (sp *PaginationToken) IsAfter(other PaginationToken) bool { func (sp *PaginationToken) IsAfter(other PaginationToken) bool {
return sp.PDUPosition > other.PDUPosition || return sp.PDUPosition > other.PDUPosition ||
sp.EDUTypingPosition > other.EDUTypingPosition sp.EDUTypingPosition > other.EDUTypingPosition
@ -172,9 +171,9 @@ type Response struct {
} }
// NewResponse creates an empty response with initialised maps. // NewResponse creates an empty response with initialised maps.
func NewResponse(pos PaginationToken) *Response { func NewResponse(token PaginationToken) *Response {
res := Response{ res := Response{
NextBatch: pos.String(), NextBatch: token.String(),
} }
// Pre-initialise the maps. Synapse will return {} even if there are no rooms under a specific section, // Pre-initialise the maps. Synapse will return {} even if there are no rooms under a specific section,
// so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors. // so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors.
@ -193,8 +192,8 @@ func NewResponse(pos PaginationToken) *Response {
// we'll always return a stream token. // we'll always return a stream token.
res.NextBatch = NewPaginationTokenFromTypeAndPosition( res.NextBatch = NewPaginationTokenFromTypeAndPosition(
PaginationTokenTypeStream, PaginationTokenTypeStream,
StreamPosition(pos.PDUPosition), StreamPosition(token.PDUPosition),
StreamPosition(pos.EDUTypingPosition), StreamPosition(token.EDUTypingPosition),
).String() ).String()
return &res return &res