From f09c1d8b580fd895066e21e57cf1f88df7931fdd Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 22 Jan 2020 10:34:01 +0000 Subject: [PATCH] Some updates based on review comments from @babolivier --- roomserver/api/query.go | 8 +- roomserver/query/query.go | 2 +- syncapi/routing/messages.go | 10 ++- .../postgres/output_room_events_table.go | 54 ++++++------ syncapi/storage/postgres/syncserver.go | 85 +++++++++++-------- syncapi/types/types.go | 37 ++++---- 6 files changed, 105 insertions(+), 91 deletions(-) diff --git a/roomserver/api/query.go b/roomserver/api/query.go index a990cd0b4..2084a343e 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -352,11 +352,11 @@ const RoomserverQueryMissingEventsPath = "/api/roomserver/queryMissingEvents" // RoomserverQueryStateAndAuthChainPath is the HTTP path for the QueryStateAndAuthChain API const RoomserverQueryStateAndAuthChainPath = "/api/roomserver/queryStateAndAuthChain" -// RoomserverQueryBackfillPath is the HTTP path for the QueryMissingEvents API +// RoomserverQueryBackfillPath is the HTTP path for the QueryBackfillPath API const RoomserverQueryBackfillPath = "/api/roomserver/queryBackfill" -// RoomserverQueryServersInRoomAtEvent is the HTTP path for the QueryServersInRoomAtEvent API -const RoomserverQueryServersInRoomAtEvent = "/api/roomserver/queryServersInRoomAtEvents" +// RoomserverQueryServersInRoomAtEventPath is the HTTP path for the QueryServersInRoomAtEvent API +const RoomserverQueryServersInRoomAtEventPath = "/api/roomserver/queryServersInRoomAtEvents" // NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API. // If httpClient is nil then it uses the http.DefaultClient @@ -511,6 +511,6 @@ func (h *httpRoomserverQueryAPI) QueryServersInRoomAtEvent( span, ctx := opentracing.StartSpanFromContext(ctx, "QueryBackfill") defer span.Finish() - apiURL := h.roomserverURL + RoomserverQueryServersInRoomAtEvent + apiURL := h.roomserverURL + RoomserverQueryServersInRoomAtEventPath return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } diff --git a/roomserver/query/query.go b/roomserver/query/query.go index 9dbf3a44b..da8fe23e5 100644 --- a/roomserver/query/query.go +++ b/roomserver/query/query.go @@ -839,7 +839,7 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) { }), ) servMux.Handle( - api.RoomserverQueryServersInRoomAtEvent, + api.RoomserverQueryServersInRoomAtEventPath, common.MakeInternalAPI("QueryServersInRoomAtEvent", func(req *http.Request) util.JSONResponse { var request api.QueryServersInRoomAtEventRequest var response api.QueryServersInRoomAtEventResponse diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index acdda900c..26f48ca47 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -55,7 +55,7 @@ const defaultMessagesLimit = 10 // OnIncomingMessagesRequest implements the /messages endpoint from the // client-server API. -// See: https://matrix.org/docs/spec/client_server/r0.4.0.html#get-matrix-client-r0-rooms-roomid-messages +// See: https://matrix.org/docs/spec/client_server/latest.html#get-matrix-client-r0-rooms-roomid-messages func OnIncomingMessagesRequest( req *http.Request, db storage.Database, roomID string, federation *gomatrixserverlib.FederationClient, @@ -303,10 +303,12 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent if r.wasToProvided { // The condition in the SQL query is a strict "greater than" so // we need to check against to-1. - isSetLargeEnough = (r.to.PDUPosition-1 == types.StreamPosition(streamEvents[len(streamEvents)-1].StreamPosition)) + streamPos := types.StreamPosition(streamEvents[len(streamEvents)-1].StreamPosition) + isSetLargeEnough = (r.to.PDUPosition-1 == streamPos) } } else { - isSetLargeEnough = (r.from.PDUPosition-1 == types.StreamPosition(streamEvents[0].StreamPosition)) + streamPos := types.StreamPosition(streamEvents[0].StreamPosition) + isSetLargeEnough = (r.from.PDUPosition-1 == streamPos) } } @@ -381,7 +383,7 @@ func (r *messagesReq) containsBackwardExtremity(events []types.StreamEvent) (boo // backfill performs a backfill request over the federation on another // homeserver in the room. -// See: https://matrix.org/docs/spec/server_server/unstable.html#get-matrix-federation-v1-backfill-roomid +// See: https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid // It also stores the PDUs retrieved from the remote homeserver's response to // the database. // Returns with an empty string if the remote homeserver didn't return with any diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 01af179ec..0ad392bde 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -37,33 +37,35 @@ CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id; -- Stores output room events received from the roomserver. CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( - -- An incrementing ID which denotes the position in the log that this event resides at. - -- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments. - -- This isn't a problem for us since we just want to order by this field. - id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), - -- The event ID for the event - event_id TEXT NOT NULL, - -- The 'room_id' key for the event. - room_id TEXT NOT NULL, - -- The JSON for the event. Stored as TEXT because this should be valid UTF-8. - event_json TEXT NOT NULL, - -- The event type e.g 'm.room.member'. - type TEXT NOT NULL, - -- The 'sender' property of the event. - sender TEXT NOT NULL, - -- true if the event content contains a url key. - contains_url BOOL NOT NULL, - -- A list of event IDs which represent a delta of added/removed room state. This can be NULL - -- if there is no delta. - add_state_ids TEXT[], - remove_state_ids TEXT[], - session_id BIGINT, -- The client session that sent the event, if any - transaction_id TEXT, -- The transaction id used to send the event, if any + -- An incrementing ID which denotes the position in the log that this event resides at. + -- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments. + -- This isn't a problem for us since we just want to order by this field. + id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), + -- The event ID for the event + event_id TEXT NOT NULL, + -- The 'room_id' key for the event. + room_id TEXT NOT NULL, + -- The JSON for the event. Stored as TEXT because this should be valid UTF-8. + event_json TEXT NOT NULL, + -- The event type e.g 'm.room.member'. + type TEXT NOT NULL, + -- The 'sender' property of the event. + sender TEXT NOT NULL, + -- true if the event content contains a url key. + contains_url BOOL NOT NULL, + -- A list of event IDs which represent a delta of added/removed room state. This can be NULL + -- if there is no delta. + add_state_ids TEXT[], + remove_state_ids TEXT[], + -- The client session that sent the event, if any + session_id BIGINT, + -- The transaction id used to send the event, if any + transaction_id TEXT, -- Should the event be excluded from responses to /sync requests. Useful for - -- events retrieved through backfilling that have a position in the stream - -- that relates to the moment these were retrieved rather than the moment these - -- were emitted. - exclude_from_sync BOOL DEFAULT FALSE + -- events retrieved through backfilling that have a position in the stream + -- that relates to the moment these were retrieved rather than the moment these + -- were emitted. + exclude_from_sync BOOL DEFAULT FALSE ); -- for event selection CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events(event_id); diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 3bfb4592c..c1135bf90 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -113,6 +113,45 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([ return d.StreamEventsToEvents(nil, streamEvents), nil } +func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, ev *gomatrixserverlib.Event) error { + // If the event is already known as a backward extremity, don't consider + // it as such anymore now that we have it. + isBackwardExtremity, err := d.backwardExtremities.isBackwardExtremity(ctx, ev.RoomID(), ev.EventID()) + if err != nil { + return err + } + if isBackwardExtremity { + if err = d.backwardExtremities.deleteBackwardExtremity(ctx, ev.RoomID(), ev.EventID()); err != nil { + return err + } + } + + // Check if we have all of the event's previous events. If an event is + // missing, add it to the room's backward extremities. + prevEvents, err := d.events.selectEvents(ctx, nil, ev.PrevEventIDs()) + if err != nil { + return err + } + var found bool + for _, eID := range ev.PrevEventIDs() { + found = false + for _, prevEv := range prevEvents { + if eID == prevEv.EventID() { + found = true + } + } + + // If the event is missing, consider it a backward extremity. + if !found { + if err = d.backwardExtremities.insertsBackwardExtremity(ctx, ev.RoomID(), ev.EventID()); err != nil { + return err + } + } + } + + return nil +} + // WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races // when generating the sync stream position for this event. Returns the sync stream position for the inserted event. // Returns an error if there was a problem inserting this event. @@ -134,45 +173,14 @@ func (d *SyncServerDatasource) WriteEvent( } pduPosition = pos + if err := d.handleBackwardExtremities(ctx, ev); err != nil { + return err + } + if err = d.topology.insertEventInTopology(ctx, ev); err != nil { return err } - // If the event is already known as a backward extremity, don't consider - // it as such anymore now that we have it. - isBackwardExtremity, err := d.backwardExtremities.isBackwardExtremity(ctx, ev.RoomID(), ev.EventID()) - if err != nil { - return err - } - if isBackwardExtremity { - if err = d.backwardExtremities.deleteBackwardExtremity(ctx, ev.RoomID(), ev.EventID()); err != nil { - return err - } - } - - // Check if we have all of the event's previous events. If an event is - // missing, add it to the room's backward extremities. - prevEvents, err := d.events.selectEvents(ctx, nil, ev.PrevEventIDs()) - if err != nil { - return err - } - var found bool - for _, eID := range ev.PrevEventIDs() { - found = false - for _, prevEv := range prevEvents { - if eID == prevEv.EventID() { - found = true - } - } - - // If the event is missing, consider it a backward extremity. - if !found { - if err = d.backwardExtremities.insertsBackwardExtremity(ctx, ev.RoomID(), ev.EventID()); err != nil { - return err - } - } - } - if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 { // Nothing to do, the event may have just been a message event. return nil @@ -996,9 +1004,12 @@ func (d *SyncServerDatasource) getStateDeltas( return nil, nil, err } /* - s = make([]StreamEvent, len(allState)) + s = make([]types.StreamEvent, len(allState)) for i := 0; i < len(s); i++ { - s[i] = StreamEvent{Event: allState[i], StreamPosition: types.StreamPosition(0)} + s[i] = types.StreamEvent{ + Event: allState[i], + StreamPosition: types.StreamPosition(0), + } } */ state[roomID] = s diff --git a/syncapi/types/types.go b/syncapi/types/types.go index b596aedfe..e04cdd4cb 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -70,18 +70,17 @@ type PaginationToken struct { // error if the token couldn't be parsed into an int64, or if the token type // isn't a known type (returns ErrInvalidPaginationTokenType in the latter // case). -func NewPaginationTokenFromString(s string) (p *PaginationToken, err error) { - p = new(PaginationToken) +func NewPaginationTokenFromString(s string) (token *PaginationToken, err error) { + token = new(PaginationToken) // Check if the type is among the known ones. - p.Type = PaginationTokenType(s[:1]) - if p.Type != PaginationTokenTypeStream && p.Type != PaginationTokenTypeTopology { + token.Type = PaginationTokenType(s[:1]) + if token.Type != PaginationTokenTypeStream && token.Type != PaginationTokenTypeTopology { if pduPos, perr := strconv.ParseInt(s, 10, 64); perr != nil { return nil, ErrInvalidPaginationTokenType } else { - // TODO: should this be topograpical? - p.Type = PaginationTokenTypeTopology - p.PDUPosition = StreamPosition(pduPos) + token.Type = PaginationTokenTypeStream + token.PDUPosition = StreamPosition(pduPos) return } } @@ -94,7 +93,7 @@ func NewPaginationTokenFromString(s string) (p *PaginationToken, err error) { if pduPos, err := strconv.ParseInt(positions[0], 10, 64); err != nil { return nil, err } else { - p.PDUPosition = StreamPosition(pduPos) + token.PDUPosition = StreamPosition(pduPos) } } @@ -103,7 +102,7 @@ func NewPaginationTokenFromString(s string) (p *PaginationToken, err error) { if typPos, err := strconv.ParseInt(positions[1], 10, 64); err != nil { return nil, err } else { - p.EDUTypingPosition = StreamPosition(typPos) + token.EDUTypingPosition = StreamPosition(typPos) } } @@ -128,11 +127,11 @@ func (p *PaginationToken) String() string { return fmt.Sprintf("%s%d_%d", p.Type, p.PDUPosition, p.EDUTypingPosition) } -// WithUpdates returns a copy of the SyncPosition with updates applied from another SyncPosition. -// If the latter SyncPosition contains a field that is not 0, it is considered an update, -// and its value will replace the corresponding value in the SyncPosition on which WithUpdates is called. -func (sp *PaginationToken) WithUpdates(other PaginationToken) PaginationToken { - ret := *sp +// WithUpdates returns a copy of the PaginationToken with updates applied from another PaginationToken. +// If the latter PaginationToken contains a field that is not 0, it is considered an update, +// and its value will replace the corresponding value in the PaginationToken on which WithUpdates is called. +func (pt *PaginationToken) WithUpdates(other PaginationToken) PaginationToken { + ret := *pt if other.PDUPosition != 0 { ret.PDUPosition = other.PDUPosition } @@ -142,7 +141,7 @@ func (sp *PaginationToken) WithUpdates(other PaginationToken) PaginationToken { return ret } -// IsAfter returns whether one SyncPosition refers to states newer than another SyncPosition. +// IsAfter returns whether one PaginationToken refers to states newer than another PaginationToken. func (sp *PaginationToken) IsAfter(other PaginationToken) bool { return sp.PDUPosition > other.PDUPosition || sp.EDUTypingPosition > other.EDUTypingPosition @@ -172,9 +171,9 @@ type Response struct { } // NewResponse creates an empty response with initialised maps. -func NewResponse(pos PaginationToken) *Response { +func NewResponse(token PaginationToken) *Response { res := Response{ - NextBatch: pos.String(), + NextBatch: token.String(), } // Pre-initialise the maps. Synapse will return {} even if there are no rooms under a specific section, // so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors. @@ -193,8 +192,8 @@ func NewResponse(pos PaginationToken) *Response { // we'll always return a stream token. res.NextBatch = NewPaginationTokenFromTypeAndPosition( PaginationTokenTypeStream, - StreamPosition(pos.PDUPosition), - StreamPosition(pos.EDUTypingPosition), + StreamPosition(token.PDUPosition), + StreamPosition(token.EDUTypingPosition), ).String() return &res