diff --git a/src/github.com/matrix-org/dendrite/syncapi/routing/messages.go b/src/github.com/matrix-org/dendrite/syncapi/routing/messages.go new file mode 100644 index 000000000..68675e16b --- /dev/null +++ b/src/github.com/matrix-org/dendrite/syncapi/routing/messages.go @@ -0,0 +1,170 @@ +// Copyright 2018 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package routing + +import ( + // "encoding/json" + "net/http" + "strconv" + + // "github.com/matrix-org/dendrite/clientapi/httputil" + "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + log "github.com/sirupsen/logrus" +) + +type messageResp struct { + Start string `json:"start"` + End string `json:"end"` + Chunk []gomatrixserverlib.ClientEvent `json:"chunk"` +} + +const defaultMessagesLimit = 10 + +func OnIncomingMessagesRequest(req *http.Request, db *storage.SyncServerDatabase, roomID string) util.JSONResponse { + var from, to int + var err error + // Extract parameters from the request's URL. + // Pagination tokens. + from, err = strconv.Atoi(req.URL.Query().Get("from")) + if err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.InvalidArgumentValue("from could not be parsed into an integer: " + err.Error()), + } + } + fromPos := types.StreamPosition(from) + + // Direction to return events from. + dir := req.URL.Query().Get("dir") + if dir != "b" && dir != "f" { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.MissingArgument("Bad or missing dir query parameter (should be either 'b' or 'f')"), + } + } + backwardOrdering := (dir == "b") + + toStr := req.URL.Query().Get("to") + var toPos types.StreamPosition + if len(toStr) > 0 { + to, err = strconv.Atoi(toStr) + if err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.InvalidArgumentValue("to could not be parsed into an integer: " + err.Error()), + } + } + toPos = types.StreamPosition(to) + } else { + if backwardOrdering { + toPos = types.StreamPosition(0) + } else { + toPos, err = db.SyncStreamPosition(req.Context()) + if err != nil { + return jsonerror.InternalServerError() + } + } + } + + // Maximum number of events to return; defaults to 10. + limit := defaultMessagesLimit + if len(req.URL.Query().Get("limit")) > 0 { + limit, err = strconv.Atoi(req.URL.Query().Get("limit")) + + if err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.InvalidArgumentValue("limit could not be parsed into an integer: " + err.Error()), + } + } + } + // TODO: Implement filtering (#587) + + // Check the room ID's format. + if _, _, err = gomatrixserverlib.SplitID('!', roomID); err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.MissingArgument("Bad room ID: " + err.Error()), + } + } + + streamEvents, err := db.GetEventsInRange( + req.Context(), fromPos, toPos, roomID, limit, backwardOrdering, + ) + if err != nil { + return jsonerror.InternalServerError() + } + + // Check if we don't have enough events, i.e. len(sev) < limit and the events + isSetLargeEnough := true + if len(streamEvents) < limit { + if backwardOrdering { + if len(toStr) > 0 { + // The condition in the SQL query is a strict "greater than" so + // we need to check against to-1. + isSetLargeEnough = (toPos-1 == streamEvents[0].StreamPosition) + } + } else { + // We need all events from < streamPos < to + isSetLargeEnough = (fromPos-1 == streamEvents[0].StreamPosition) + } + } + // Check if earliest event is a backward extremity, i.e. if one of its + // previous events is missing from the db. + prevIDs := streamEvents[0].PrevEventIDs() + prevs, err := db.Events(req.Context(), prevIDs) + var eventInDB, isBackwardExtremity bool + var id string + for _, id = range prevIDs { + eventInDB = false + for _, ev := range prevs { + if ev.EventID() == id { + eventInDB = true + } + } + if !eventInDB { + isBackwardExtremity = true + break + } + } + + if isBackwardExtremity && !isSetLargeEnough { + log.WithFields(log.Fields{ + "limit": limit, + "nb_events": len(streamEvents), + "from": fromPos.String(), + "to": toPos.String(), + "isBackwardExtremity": isBackwardExtremity, + "isSetLargeEnough": isSetLargeEnough, + }).Info("Backfilling!") + println("Backfilling!") + } + + events := storage.StreamEventsToEvents(nil, streamEvents) + clientEvents := gomatrixserverlib.ToClientEvents(events, gomatrixserverlib.FormatAll) + + return util.JSONResponse{ + Code: http.StatusOK, + JSON: messageResp{ + Chunk: clientEvents, + Start: streamEvents[0].StreamPosition.String(), + End: streamEvents[len(streamEvents)-1].StreamPosition.String(), + }, + } +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/routing/routing.go b/src/github.com/matrix-org/dendrite/syncapi/routing/routing.go index 0671eca8e..5a47de73c 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/syncapi/routing/routing.go @@ -54,4 +54,9 @@ func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB *storage.SyncServer vars := mux.Vars(req) return OnIncomingStateTypeRequest(req, syncDB, vars["roomID"], vars["type"], vars["stateKey"]) })).Methods(http.MethodGet, http.MethodOptions) + + r0mux.Handle("/rooms/{roomID}/messages", common.MakeAuthAPI("room_messages", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse { + vars := mux.Vars(req) + return OnIncomingMessagesRequest(req, syncDB, vars["roomID"]) + })).Methods(http.MethodGet, http.MethodOptions) } diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index 686fc39e5..45952b727 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -68,6 +68,11 @@ const selectRecentEventsSQL = "" + " WHERE room_id = $1 AND id > $2 AND id <= $3" + " ORDER BY id DESC LIMIT $4" +const selectEarlyEventsSQL = "" + + "SELECT id, event_json, device_id, transaction_id FROM syncapi_output_room_events" + + " WHERE room_id = $1 AND id > $2 AND id <= $3" + + " ORDER BY id ASC LIMIT $4" + const selectMaxEventIDSQL = "" + "SELECT MAX(id) FROM syncapi_output_room_events" @@ -83,6 +88,7 @@ type outputRoomEventsStatements struct { selectEventsStmt *sql.Stmt selectMaxEventIDStmt *sql.Stmt selectRecentEventsStmt *sql.Stmt + selectEarlyEventsStmt *sql.Stmt selectStateInRangeStmt *sql.Stmt } @@ -103,6 +109,9 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil { return } + if s.selectEarlyEventsStmt, err = db.Prepare(selectEarlyEventsSQL); err != nil { + return + } if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil { return } @@ -171,7 +180,7 @@ func (s *outputRoomEventsStatements) selectStateInRange( eventIDToEvent[ev.EventID()] = StreamEvent{ Event: ev, - streamPosition: types.StreamPosition(streamPos), + StreamPosition: types.StreamPosition(streamPos), } } @@ -224,6 +233,7 @@ func (s *outputRoomEventsStatements) insertEvent( func (s *outputRoomEventsStatements) selectRecentEvents( ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int, + chronologicalOrder bool, ) ([]StreamEvent, error) { stmt := common.TxStmt(txn, s.selectRecentEventsStmt) rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit) @@ -235,12 +245,33 @@ func (s *outputRoomEventsStatements) selectRecentEvents( if err != nil { return nil, err } - // The events need to be returned from oldest to latest, which isn't - // necessary the way the SQL query returns them, so a sort is necessary to - // ensure the events are in the right order in the slice. - sort.SliceStable(events, func(i int, j int) bool { - return events[i].streamPosition < events[j].streamPosition - }) + if chronologicalOrder { + // The events need to be returned from oldest to latest, which isn't + // necessary the way the SQL query returns them, so a sort is necessary to + // ensure the events are in the right order in the slice. + sort.SliceStable(events, func(i int, j int) bool { + return events[i].StreamPosition < events[j].StreamPosition + }) + } + return events, nil +} + +// selectEarlyEvents returns the earliest events in the given room, starting +// from a given position, up to a maximum of 'limit'. +func (s *outputRoomEventsStatements) selectEarlyEvents( + ctx context.Context, txn *sql.Tx, + roomID string, fromPos, toPos types.StreamPosition, limit int, +) ([]StreamEvent, error) { + stmt := common.TxStmt(txn, s.selectEarlyEventsStmt) + rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit) + if err != nil { + return nil, err + } + defer rows.Close() // nolint: errcheck + events, err := rowsToStreamEvents(rows) + if err != nil { + return nil, err + } return events, nil } @@ -286,8 +317,8 @@ func rowsToStreamEvents(rows *sql.Rows) ([]StreamEvent, error) { result = append(result, StreamEvent{ Event: ev, - streamPosition: types.StreamPosition(streamPos), - transactionID: transactionID, + StreamPosition: types.StreamPosition(streamPos), + TransactionID: transactionID, }) } return result, nil diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 4476da81d..d2fbf3d6b 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -43,8 +43,8 @@ type stateDelta struct { // position for this event. type StreamEvent struct { gomatrixserverlib.Event - streamPosition types.StreamPosition - transactionID *api.TransactionID + StreamPosition types.StreamPosition + TransactionID *api.TransactionID } // SyncServerDatabase represents a sync server database @@ -100,7 +100,7 @@ func (d *SyncServerDatabase) Events(ctx context.Context, eventIDs []string) ([]g // We don't include a device here as we only include transaction IDs in // incremental syncs. - return streamEventsToEvents(nil, streamEvents), nil + return StreamEventsToEvents(nil, streamEvents), nil } // WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races @@ -187,6 +187,24 @@ func (d *SyncServerDatabase) GetStateEventsForRoom( return } +// GetEventsInRange retrieves all of the events on a given ordering using the +// given extremities and limit. +func (d *SyncServerDatabase) GetEventsInRange( + ctx context.Context, + from, to types.StreamPosition, + roomID string, limit int, + backwardOrdering bool, +) (events []StreamEvent, err error) { + + if backwardOrdering { + // We need all events matching to < streamPos < from + return d.events.selectRecentEvents(ctx, nil, roomID, to, from, limit, false) + } + + // We need all events from < streamPos < to + return d.events.selectEarlyEvents(ctx, nil, roomID, from, to, limit) +} + // SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet. func (d *SyncServerDatabase) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) { return d.syncStreamPositionTx(ctx, nil) @@ -299,7 +317,8 @@ func (d *SyncServerDatabase) CompleteSync( // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 var recentStreamEvents []StreamEvent recentStreamEvents, err = d.events.selectRecentEvents( - ctx, txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom, + ctx, txn, roomID, types.StreamPosition(0), pos, + numRecentEventsPerRoom, true, ) if err != nil { return nil, err @@ -307,8 +326,7 @@ func (d *SyncServerDatabase) CompleteSync( // We don't include a device here as we don't need to send down // transaction IDs for complete syncs - recentEvents := streamEventsToEvents(nil, recentStreamEvents) - + recentEvents := StreamEventsToEvents(nil, recentStreamEvents) stateEvents = removeDuplicates(stateEvents, recentEvents) jr := types.NewJoinResponse() jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) @@ -424,12 +442,12 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse( endPos = delta.membershipPos } recentStreamEvents, err := d.events.selectRecentEvents( - ctx, txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom, + ctx, txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom, true, ) if err != nil { return err } - recentEvents := streamEventsToEvents(device, recentStreamEvents) + recentEvents := StreamEventsToEvents(device, recentStreamEvents) delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back // Don't bother appending empty room entries @@ -586,7 +604,7 @@ func (d *SyncServerDatabase) getStateDeltas( } s := make([]StreamEvent, len(allState)) for i := 0; i < len(s); i++ { - s[i] = StreamEvent{Event: allState[i], streamPosition: types.StreamPosition(0)} + s[i] = StreamEvent{Event: allState[i], StreamPosition: types.StreamPosition(0)} } state[roomID] = s continue // we'll add this room in when we do joined rooms @@ -594,8 +612,8 @@ func (d *SyncServerDatabase) getStateDeltas( deltas = append(deltas, stateDelta{ membership: membership, - membershipPos: ev.streamPosition, - stateEvents: streamEventsToEvents(device, stateStreamEvents), + membershipPos: ev.StreamPosition, + stateEvents: StreamEventsToEvents(device, stateStreamEvents), roomID: roomID, }) break @@ -611,7 +629,7 @@ func (d *SyncServerDatabase) getStateDeltas( for _, joinedRoomID := range joinedRoomIDs { deltas = append(deltas, stateDelta{ membership: "join", - stateEvents: streamEventsToEvents(device, state[joinedRoomID]), + stateEvents: StreamEventsToEvents(device, state[joinedRoomID]), roomID: joinedRoomID, }) } @@ -619,17 +637,17 @@ func (d *SyncServerDatabase) getStateDeltas( return deltas, nil } -// streamEventsToEvents converts StreamEvent to Event. If device is non-nil and +// StreamEventsToEvents converts StreamEvent to Event. If device is non-nil and // matches the streamevent.transactionID device then the transaction ID gets // added to the unsigned section of the output event. -func streamEventsToEvents(device *authtypes.Device, in []StreamEvent) []gomatrixserverlib.Event { +func StreamEventsToEvents(device *authtypes.Device, in []StreamEvent) []gomatrixserverlib.Event { out := make([]gomatrixserverlib.Event, len(in)) for i := 0; i < len(in); i++ { out[i] = in[i].Event - if device != nil && in[i].transactionID != nil { - if device.UserID == in[i].Sender() && device.ID == in[i].transactionID.DeviceID { + if device != nil && in[i].TransactionID != nil { + if device.UserID == in[i].Sender() && device.ID == in[i].TransactionID.DeviceID { err := out[i].SetUnsignedField( - "transaction_id", in[i].transactionID.TransactionID, + "transaction_id", in[i].TransactionID.TransactionID, ) if err != nil { logrus.WithFields(logrus.Fields{