This commit is contained in:
Kegan Dougal 2020-03-19 19:17:27 +00:00
parent 20cdc66879
commit bd3788f6c0
4 changed files with 60 additions and 36 deletions

View file

@ -28,6 +28,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)
type messagesReq struct {
@ -150,6 +151,14 @@ func OnIncomingMessagesRequest(
util.GetLogger(req.Context()).WithError(err).Error("mreq.retrieveEvents failed")
return jsonerror.InternalServerError()
}
util.GetLogger(req.Context()).WithFields(logrus.Fields{
"from": from.String(),
"to": to.String(),
"limit": limit,
"backwards": backwardOrdering,
"return_start": start.String(),
"return_end": end.String(),
}).Info("Responding")
// Respond with the events.
return util.JSONResponse{
@ -179,6 +188,11 @@ func (r *messagesReq) retrieveEvents() (
err = fmt.Errorf("GetEventsInRange: %w", err)
return
}
for i := range streamEvents {
s := streamEvents[i]
util.GetLogger(r.ctx).Info("spos=", s.StreamPosition, " content=", string(s.Content()))
}
util.GetLogger(r.ctx).Info("Found ", len(streamEvents), " in the database")
var events []gomatrixserverlib.HeaderedEvent
@ -301,8 +315,9 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
events []gomatrixserverlib.HeaderedEvent, err error,
) {
// Check if we have enough events.
isSetLargeEnough := true
if len(streamEvents) < r.limit {
isSetLargeEnough := len(streamEvents) >= r.limit
if !isSetLargeEnough {
// it might be fine we don't have up to 'limit' events, let's find out
if r.backwardOrdering {
if r.wasToProvided {
// The condition in the SQL query is a strict "greater than" so
@ -422,8 +437,8 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv
if len(serversResponse.Servers) > 1 {
srvToBackfillFrom = serversResponse.Servers[1]
} else {
srvToBackfillFrom = gomatrixserverlib.ServerName("")
util.GetLogger(r.ctx).Info("Not enough servers to backfill from")
return nil, nil
}
}
@ -431,7 +446,7 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv
// If the roomserver responded with at least one server that isn't us,
// send it a request for backfill.
if len(srvToBackfillFrom) > 0 {
util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("limit", limit).Info("Backfilling from server")
txn, err := r.federation.Backfill(
r.ctx, srvToBackfillFrom, r.roomID, limit, fromEventIDs,
)
@ -442,6 +457,7 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv
for _, p := range txn.PDUs {
pdus = append(pdus, p.Headered(gomatrixserverlib.RoomVersionV1))
}
util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("new_events", len(pdus)).Info("Storing new events from backfill")
// Store the events in the database, while marking them as unfit to show
// up in responses to sync requests.
@ -458,7 +474,6 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv
return nil, err
}
}
}
return pdus, nil
}
@ -474,7 +489,8 @@ func setToDefault(
roomID string,
) (to *types.PaginationToken, err error) {
if backwardOrdering {
to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 1, 0)
// go 1 earlier than the first event so we correctly fetch the earliest event
to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 0, 0)
} else {
var pos types.StreamPosition
pos, err = db.MaxTopologicalPosition(ctx, roomID)

View file

@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/util"
"github.com/lib/pq"
"github.com/matrix-org/dendrite/common"
@ -288,6 +289,7 @@ func (s *outputRoomEventsStatements) insertEvent(
txnID,
excludeFromSync,
).Scan(&streamPos)
util.GetLogger(ctx).Info("INSERT EVENT: ", event.RoomID(), " Type:", event.Type(), " pos:", streamPos)
return
}
@ -305,7 +307,7 @@ func (s *outputRoomEventsStatements) selectRecentEvents(
} else {
stmt = common.TxStmt(txn, s.selectRecentEventsStmt)
}
util.GetLogger(ctx).Info("SELECT ", roomID, " from=", fromPos, " to=", toPos, " limit=", limit)
rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
if err != nil {
return nil, err
@ -315,6 +317,9 @@ func (s *outputRoomEventsStatements) selectRecentEvents(
if err != nil {
return nil, err
}
for _, e := range events {
util.GetLogger(ctx).Info("SELECTED ", e.RoomID(), " Type:", e.Type(), " pos:", e.StreamPosition)
}
if chronologicalOrder {
// The events need to be returned from oldest to latest, which isn't
// necessary the way the SQL query returns them, so a sort is necessary to

View file

@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/util"
// Import the postgres database driver.
_ "github.com/lib/pq"
@ -296,6 +297,7 @@ func (d *SyncServerDatasource) GetEventsInRange(
); err != nil {
return
}
util.GetLogger(ctx).Info("selectRecentEvents from token: ", from.PDUPosition)
} else {
// When using forward ordering, we want the least recent events first.
if events, err = d.events.selectEarlyEvents(

View file

@ -47,7 +47,6 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
var syncData *types.Response
// Extract values from request
logger := util.GetLogger(req.Context())
userID := device.UserID
syncReq, err := newSyncRequest(req, *device)
if err != nil {
@ -56,20 +55,21 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
JSON: jsonerror.Unknown(err.Error()),
}
}
logger.WithFields(log.Fields{
logger := util.GetLogger(req.Context()).WithFields(log.Fields{
"userID": userID,
"since": syncReq.since,
"timeout": syncReq.timeout,
}).Info("Incoming /sync request")
})
currPos := rp.notifier.CurrentPosition()
if shouldReturnImmediately(syncReq) {
syncData, err = rp.currentSyncForUser(*syncReq, currPos)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("rp.currentSyncForUser failed")
logger.WithError(err).Error("rp.currentSyncForUser failed")
return jsonerror.InternalServerError()
}
logger.WithField("next", syncData.NextBatch).Info("Responding immediately")
return util.JSONResponse{
Code: http.StatusOK,
JSON: syncData,
@ -107,7 +107,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
hasTimedOut = true
// Or for the request to be cancelled
case <-req.Context().Done():
util.GetLogger(req.Context()).WithError(err).Error("request cancelled")
logger.WithError(err).Error("request cancelled")
return jsonerror.InternalServerError()
}
@ -118,11 +118,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
syncData, err = rp.currentSyncForUser(*syncReq, currPos)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("rp.currentSyncForUser failed")
logger.WithError(err).Error("rp.currentSyncForUser failed")
return jsonerror.InternalServerError()
}
if !syncData.IsEmpty() || hasTimedOut {
logger.WithField("next", syncData.NextBatch).WithField("timed_out", hasTimedOut).Info("Responding")
return util.JSONResponse{
Code: http.StatusOK,
JSON: syncData,