From bd3788f6c0daf6b33e90950863add0c5a14913fa Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 19 Mar 2020 19:17:27 +0000 Subject: [PATCH] Logging --- syncapi/routing/messages.go | 74 +++++++++++-------- .../postgres/output_room_events_table.go | 7 +- syncapi/storage/postgres/syncserver.go | 2 + syncapi/sync/requestpool.go | 13 ++-- 4 files changed, 60 insertions(+), 36 deletions(-) diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 2adb7acff..b5ec0eeb3 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -28,6 +28,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + "github.com/sirupsen/logrus" ) type messagesReq struct { @@ -150,6 +151,14 @@ func OnIncomingMessagesRequest( util.GetLogger(req.Context()).WithError(err).Error("mreq.retrieveEvents failed") return jsonerror.InternalServerError() } + util.GetLogger(req.Context()).WithFields(logrus.Fields{ + "from": from.String(), + "to": to.String(), + "limit": limit, + "backwards": backwardOrdering, + "return_start": start.String(), + "return_end": end.String(), + }).Info("Responding") // Respond with the events. return util.JSONResponse{ @@ -179,6 +188,11 @@ func (r *messagesReq) retrieveEvents() ( err = fmt.Errorf("GetEventsInRange: %w", err) return } + for i := range streamEvents { + s := streamEvents[i] + util.GetLogger(r.ctx).Info("spos=", s.StreamPosition, " content=", string(s.Content())) + } + util.GetLogger(r.ctx).Info("Found ", len(streamEvents), " in the database") var events []gomatrixserverlib.HeaderedEvent @@ -301,8 +315,9 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent events []gomatrixserverlib.HeaderedEvent, err error, ) { // Check if we have enough events. - isSetLargeEnough := true - if len(streamEvents) < r.limit { + isSetLargeEnough := len(streamEvents) >= r.limit + if !isSetLargeEnough { + // it might be fine we don't have up to 'limit' events, let's find out if r.backwardOrdering { if r.wasToProvided { // The condition in the SQL query is a strict "greater than" so @@ -422,8 +437,8 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv if len(serversResponse.Servers) > 1 { srvToBackfillFrom = serversResponse.Servers[1] } else { - srvToBackfillFrom = gomatrixserverlib.ServerName("") util.GetLogger(r.ctx).Info("Not enough servers to backfill from") + return nil, nil } } @@ -431,33 +446,33 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv // If the roomserver responded with at least one server that isn't us, // send it a request for backfill. - if len(srvToBackfillFrom) > 0 { - txn, err := r.federation.Backfill( - r.ctx, srvToBackfillFrom, r.roomID, limit, fromEventIDs, - ) - if err != nil { + util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("limit", limit).Info("Backfilling from server") + txn, err := r.federation.Backfill( + r.ctx, srvToBackfillFrom, r.roomID, limit, fromEventIDs, + ) + if err != nil { + return nil, err + } + + for _, p := range txn.PDUs { + pdus = append(pdus, p.Headered(gomatrixserverlib.RoomVersionV1)) + } + util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("new_events", len(pdus)).Info("Storing new events from backfill") + + // Store the events in the database, while marking them as unfit to show + // up in responses to sync requests. + for _, pdu := range pdus { + headered := pdu.Headered(gomatrixserverlib.RoomVersionV1) + if _, err = r.db.WriteEvent( + r.ctx, + &headered, + []gomatrixserverlib.HeaderedEvent{}, + []string{}, + []string{}, + nil, true, + ); err != nil { return nil, err } - - for _, p := range txn.PDUs { - pdus = append(pdus, p.Headered(gomatrixserverlib.RoomVersionV1)) - } - - // Store the events in the database, while marking them as unfit to show - // up in responses to sync requests. - for _, pdu := range pdus { - headered := pdu.Headered(gomatrixserverlib.RoomVersionV1) - if _, err = r.db.WriteEvent( - r.ctx, - &headered, - []gomatrixserverlib.HeaderedEvent{}, - []string{}, - []string{}, - nil, true, - ); err != nil { - return nil, err - } - } } return pdus, nil @@ -474,7 +489,8 @@ func setToDefault( roomID string, ) (to *types.PaginationToken, err error) { if backwardOrdering { - to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 1, 0) + // go 1 earlier than the first event so we correctly fetch the earliest event + to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 0, 0) } else { var pos types.StreamPosition pos, err = db.MaxTopologicalPosition(ctx, roomID) diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 5f9a1d0c6..1f4bb31ca 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/util" "github.com/lib/pq" "github.com/matrix-org/dendrite/common" @@ -288,6 +289,7 @@ func (s *outputRoomEventsStatements) insertEvent( txnID, excludeFromSync, ).Scan(&streamPos) + util.GetLogger(ctx).Info("INSERT EVENT: ", event.RoomID(), " Type:", event.Type(), " pos:", streamPos) return } @@ -305,7 +307,7 @@ func (s *outputRoomEventsStatements) selectRecentEvents( } else { stmt = common.TxStmt(txn, s.selectRecentEventsStmt) } - + util.GetLogger(ctx).Info("SELECT ", roomID, " from=", fromPos, " to=", toPos, " limit=", limit) rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit) if err != nil { return nil, err @@ -315,6 +317,9 @@ func (s *outputRoomEventsStatements) selectRecentEvents( if err != nil { return nil, err } + for _, e := range events { + util.GetLogger(ctx).Info("SELECTED ", e.RoomID(), " Type:", e.Type(), " pos:", e.StreamPosition) + } if chronologicalOrder { // The events need to be returned from oldest to latest, which isn't // necessary the way the SQL query returns them, so a sort is necessary to diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index ccf1c5656..80af06d59 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -26,6 +26,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/util" // Import the postgres database driver. _ "github.com/lib/pq" @@ -296,6 +297,7 @@ func (d *SyncServerDatasource) GetEventsInRange( ); err != nil { return } + util.GetLogger(ctx).Info("selectRecentEvents from token: ", from.PDUPosition) } else { // When using forward ordering, we want the least recent events first. if events, err = d.events.selectEarlyEvents( diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index b4ccbd272..69efd8aa8 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -47,7 +47,6 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype var syncData *types.Response // Extract values from request - logger := util.GetLogger(req.Context()) userID := device.UserID syncReq, err := newSyncRequest(req, *device) if err != nil { @@ -56,20 +55,21 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype JSON: jsonerror.Unknown(err.Error()), } } - logger.WithFields(log.Fields{ + logger := util.GetLogger(req.Context()).WithFields(log.Fields{ "userID": userID, "since": syncReq.since, "timeout": syncReq.timeout, - }).Info("Incoming /sync request") + }) currPos := rp.notifier.CurrentPosition() if shouldReturnImmediately(syncReq) { syncData, err = rp.currentSyncForUser(*syncReq, currPos) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("rp.currentSyncForUser failed") + logger.WithError(err).Error("rp.currentSyncForUser failed") return jsonerror.InternalServerError() } + logger.WithField("next", syncData.NextBatch).Info("Responding immediately") return util.JSONResponse{ Code: http.StatusOK, JSON: syncData, @@ -107,7 +107,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype hasTimedOut = true // Or for the request to be cancelled case <-req.Context().Done(): - util.GetLogger(req.Context()).WithError(err).Error("request cancelled") + logger.WithError(err).Error("request cancelled") return jsonerror.InternalServerError() } @@ -118,11 +118,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype syncData, err = rp.currentSyncForUser(*syncReq, currPos) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("rp.currentSyncForUser failed") + logger.WithError(err).Error("rp.currentSyncForUser failed") return jsonerror.InternalServerError() } if !syncData.IsEmpty() || hasTimedOut { + logger.WithField("next", syncData.NextBatch).WithField("timed_out", hasTimedOut).Info("Responding") return util.JSONResponse{ Code: http.StatusOK, JSON: syncData,