From d31f6fbcc271242c4ab709ff18b751983521736a Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 23 Aug 2017 12:01:29 +0100 Subject: [PATCH] Use prefix in monolith + components --- .../clientapi/consumers/roomserver.go | 5 +- .../dendrite/clientapi/httputil/httputil.go | 2 +- .../dendrite/clientapi/readers/login.go | 2 +- .../dendrite/clientapi/writers/createroom.go | 2 +- .../dendrite/clientapi/writers/register.go | 2 +- .../cmd/dendrite-monolith-server/main.go | 55 ++++++++++--------- .../federationsender/consumers/roomserver.go | 16 +++--- .../queue/destinationqueue.go | 1 + .../dendrite/federationsender/queue/queue.go | 4 +- .../dendrite/mediaapi/writers/download.go | 1 + .../dendrite/mediaapi/writers/upload.go | 2 +- .../publicroomsapi/consumers/roomserver.go | 6 +- .../dendrite/syncapi/consumers/clientapi.go | 14 +++-- .../dendrite/syncapi/consumers/roomserver.go | 26 +++++---- .../storage/output_room_events_table.go | 1 + .../dendrite/syncapi/sync/notifier.go | 5 +- .../dendrite/syncapi/sync/request.go | 3 +- .../dendrite/syncapi/sync/requestpool.go | 2 +- 18 files changed, 85 insertions(+), 64 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go index a94750d3c..afc21040a 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go @@ -23,7 +23,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" sarama "gopkg.in/Shopify/sarama.v1" ) @@ -68,6 +68,7 @@ func (s *OutputRoomEvent) Start() error { // It is not safe for this function to be called from multiple goroutines, or else the // sync stream position may race and be incorrectly calculated. func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { + log := logrus.WithField("prefix", "clientapi") // Parse out the event JSON var output api.OutputEvent if err := json.Unmarshal(msg.Value, &output); err != nil { @@ -84,7 +85,7 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { } ev := output.NewRoomEvent.Event - log.WithFields(log.Fields{ + log.WithFields(logrus.Fields{ "event_id": ev.EventID(), "room_id": ev.RoomID(), "type": ev.Type(), diff --git a/src/github.com/matrix-org/dendrite/clientapi/httputil/httputil.go b/src/github.com/matrix-org/dendrite/clientapi/httputil/httputil.go index 7bc8a46d2..412be0074 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/httputil/httputil.go +++ b/src/github.com/matrix-org/dendrite/clientapi/httputil/httputil.go @@ -42,6 +42,6 @@ func UnmarshalJSONRequest(req *http.Request, iface interface{}) *util.JSONRespon // This should be used to log fatal errors which require investigation. It should not be used // to log client validation errors, etc. func LogThenError(req *http.Request, err error) util.JSONResponse { - util.GetLogger(req.Context()).WithError(err).Error("request failed") + util.GetLogger(req.Context()).WithField("prefix", "clientapi").WithError(err).Error("request failed") return jsonerror.InternalServerError() } diff --git a/src/github.com/matrix-org/dendrite/clientapi/readers/login.go b/src/github.com/matrix-org/dendrite/clientapi/readers/login.go index 270b2e5ac..3a1843afb 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/readers/login.go +++ b/src/github.com/matrix-org/dendrite/clientapi/readers/login.go @@ -78,7 +78,7 @@ func Login( } } - util.GetLogger(req.Context()).WithField("user", r.User).Info("Processing login request") + util.GetLogger(req.Context()).WithField("prefix", "clientapi").WithField("user", r.User).Info("Processing login request") acc, err := accountDB.GetAccountByPassword(r.User, r.Password) if err != nil { diff --git a/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go b/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go index ccc4daf92..cb9770d87 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go +++ b/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go @@ -100,7 +100,7 @@ func createRoom(req *http.Request, device *authtypes.Device, cfg config.Dendrite, roomID string, producer *producers.RoomserverProducer, accountDB *accounts.Database, ) util.JSONResponse { - logger := util.GetLogger(req.Context()) + logger := util.GetLogger(req.Context()).WithField("prefix", "clientapi") userID := device.UserID var r createRoomRequest resErr := httputil.UnmarshalJSONRequest(req, &r) diff --git a/src/github.com/matrix-org/dendrite/clientapi/writers/register.go b/src/github.com/matrix-org/dendrite/clientapi/writers/register.go index 318771d32..3057c01ac 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/writers/register.go +++ b/src/github.com/matrix-org/dendrite/clientapi/writers/register.go @@ -102,7 +102,7 @@ func Register(req *http.Request, accountDB *accounts.Database, deviceDB *devices return *resErr } - logger := util.GetLogger(req.Context()) + logger := util.GetLogger(req.Context()).WithField("prefix", "clientapi") logger.WithFields(log.Fields{ "username": r.Username, "auth.type": r.Auth.Type, diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go index dee5c986c..7580e5180 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go @@ -72,6 +72,8 @@ var ( func main() { common.SetupLogging(logDir) + prefixedLog := log.WithField("prefix", "monolith") + flag.Parse() if *configPath == "" { @@ -82,7 +84,7 @@ func main() { log.Fatalf("Invalid config file: %s", err) } - m := newMonolith(cfg) + m := newMonolith(cfg, prefixedLog) m.setupDatabases() m.setupFederation() m.setupKafka() @@ -94,14 +96,14 @@ func main() { // Expose the matrix APIs directly rather than putting them under a /api path. go func() { - log.Info("Listening on ", *httpBindAddr) - log.Fatal(http.ListenAndServe(*httpBindAddr, m.api)) + prefixedLog.Info("Listening on ", *httpBindAddr) + prefixedLog.Fatal(http.ListenAndServe(*httpBindAddr, m.api)) }() // Handle HTTPS if certificate and key are provided go func() { if *certFile != "" && *keyFile != "" { - log.Info("Listening on ", *httpsBindAddr) - log.Fatal(http.ListenAndServeTLS(*httpsBindAddr, *certFile, *keyFile, m.api)) + prefixedLog.Info("Listening on ", *httpsBindAddr) + prefixedLog.Fatal(http.ListenAndServeTLS(*httpsBindAddr, *certFile, *keyFile, m.api)) } }() @@ -113,8 +115,9 @@ func main() { // Some of the setup functions depend on previous setup functions, so they must // be called in the same order as they are defined in the file. type monolith struct { - cfg *config.Dendrite - api *mux.Router + cfg *config.Dendrite + api *mux.Router + logEntry *log.Entry roomServerDB *roomserver_storage.Database accountDB *accounts.Database @@ -142,8 +145,8 @@ type monolith struct { syncAPINotifier *syncapi_sync.Notifier } -func newMonolith(cfg *config.Dendrite) *monolith { - return &monolith{cfg: cfg, api: mux.NewRouter()} +func newMonolith(cfg *config.Dendrite, log *log.Entry) *monolith { + return &monolith{cfg: cfg, api: mux.NewRouter(), logEntry: log} } func (m *monolith) setupDatabases() { @@ -154,31 +157,31 @@ func (m *monolith) setupDatabases() { } m.accountDB, err = accounts.NewDatabase(string(m.cfg.Database.Account), m.cfg.Matrix.ServerName) if err != nil { - log.Panicf("Failed to setup account database(%q): %s", m.cfg.Database.Account, err.Error()) + m.logEntry.Panicf("Failed to setup account database(%q): %s", m.cfg.Database.Account, err.Error()) } m.deviceDB, err = devices.NewDatabase(string(m.cfg.Database.Device), m.cfg.Matrix.ServerName) if err != nil { - log.Panicf("Failed to setup device database(%q): %s", m.cfg.Database.Device, err.Error()) + m.logEntry.Panicf("Failed to setup device database(%q): %s", m.cfg.Database.Device, err.Error()) } m.keyDB, err = keydb.NewDatabase(string(m.cfg.Database.ServerKey)) if err != nil { - log.Panicf("Failed to setup key database(%q): %s", m.cfg.Database.ServerKey, err.Error()) + m.logEntry.Panicf("Failed to setup key database(%q): %s", m.cfg.Database.ServerKey, err.Error()) } m.mediaAPIDB, err = mediaapi_storage.Open(string(m.cfg.Database.MediaAPI)) if err != nil { - log.Panicf("Failed to setup sync api database(%q): %s", m.cfg.Database.MediaAPI, err.Error()) + m.logEntry.Panicf("Failed to setup sync api database(%q): %s", m.cfg.Database.MediaAPI, err.Error()) } m.syncAPIDB, err = syncapi_storage.NewSyncServerDatabase(string(m.cfg.Database.SyncAPI)) if err != nil { - log.Panicf("Failed to setup sync api database(%q): %s", m.cfg.Database.SyncAPI, err.Error()) + m.logEntry.Panicf("Failed to setup sync api database(%q): %s", m.cfg.Database.SyncAPI, err.Error()) } m.federationSenderDB, err = federationsender_storage.NewDatabase(string(m.cfg.Database.FederationSender)) if err != nil { - log.Panicf("startup: failed to create federation sender database with data source %s : %s", m.cfg.Database.FederationSender, err) + m.logEntry.Panicf("startup: failed to create federation sender database with data source %s : %s", m.cfg.Database.FederationSender, err) } m.publicRoomsAPIDB, err = publicroomsapi_storage.NewPublicRoomsServerDatabase(string(m.cfg.Database.PublicRoomsAPI)) if err != nil { - log.Panicf("startup: failed to setup public rooms api database with data source %s : %s", m.cfg.Database.PublicRoomsAPI, err) + m.logEntry.Panicf("startup: failed to setup public rooms api database with data source %s : %s", m.cfg.Database.PublicRoomsAPI, err) } } @@ -201,7 +204,7 @@ func (m *monolith) setupKafka() { if m.cfg.Kafka.UseNaffka { naff, err := naffka.New(&naffka.MemoryDatabase{}) if err != nil { - log.WithFields(log.Fields{ + m.logEntry.WithFields(log.Fields{ log.ErrorKey: err, }).Panic("Failed to setup naffka") } @@ -210,7 +213,7 @@ func (m *monolith) setupKafka() { } else { m.kafkaProducer, err = sarama.NewSyncProducer(m.cfg.Kafka.Addresses, nil) if err != nil { - log.WithFields(log.Fields{ + m.logEntry.WithFields(log.Fields{ log.ErrorKey: err, "addresses": m.cfg.Kafka.Addresses, }).Panic("Failed to setup kafka producers") @@ -224,7 +227,7 @@ func (m *monolith) kafkaConsumer() sarama.Consumer { } consumer, err := sarama.NewConsumer(m.cfg.Kafka.Addresses, nil) if err != nil { - log.WithFields(log.Fields{ + m.logEntry.WithFields(log.Fields{ log.ErrorKey: err, "addresses": m.cfg.Kafka.Addresses, }).Panic("Failed to setup kafka consumers") @@ -266,12 +269,12 @@ func (m *monolith) setupProducers() { func (m *monolith) setupNotifiers() { pos, err := m.syncAPIDB.SyncStreamPosition() if err != nil { - log.Panicf("startup: failed to get latest sync stream position : %s", err) + m.logEntry.Panicf("startup: failed to get latest sync stream position : %s", err) } m.syncAPINotifier = syncapi_sync.NewNotifier(syncapi_types.StreamPosition(pos)) if err = m.syncAPINotifier.Load(m.syncAPIDB); err != nil { - log.Panicf("startup: failed to set up notifier: %s", err) + m.logEntry.Panicf("startup: failed to set up notifier: %s", err) } } @@ -282,28 +285,28 @@ func (m *monolith) setupConsumers() { m.cfg, m.kafkaConsumer(), m.accountDB, m.queryAPI, ) if err = clientAPIConsumer.Start(); err != nil { - log.Panicf("startup: failed to start room server consumer") + m.logEntry.Panicf("startup: failed to start room server consumer") } syncAPIRoomConsumer := syncapi_consumers.NewOutputRoomEvent( m.cfg, m.kafkaConsumer(), m.syncAPINotifier, m.syncAPIDB, m.queryAPI, ) if err = syncAPIRoomConsumer.Start(); err != nil { - log.Panicf("startup: failed to start room server consumer: %s", err) + m.logEntry.Panicf("startup: failed to start room server consumer: %s", err) } syncAPIClientConsumer := syncapi_consumers.NewOutputClientData( m.cfg, m.kafkaConsumer(), m.syncAPINotifier, m.syncAPIDB, ) if err = syncAPIClientConsumer.Start(); err != nil { - log.Panicf("startup: failed to start client API server consumer: %s", err) + m.logEntry.Panicf("startup: failed to start client API server consumer: %s", err) } publicRoomsAPIConsumer := publicroomsapi_consumers.NewOutputRoomEvent( m.cfg, m.kafkaConsumer(), m.publicRoomsAPIDB, m.queryAPI, ) if err = publicRoomsAPIConsumer.Start(); err != nil { - log.Panicf("startup: failed to start room server consumer: %s", err) + m.logEntry.Panicf("startup: failed to start room server consumer: %s", err) } federationSenderQueues := queue.NewOutgoingQueues(m.cfg.Matrix.ServerName, m.federation) @@ -312,7 +315,7 @@ func (m *monolith) setupConsumers() { m.cfg, m.kafkaConsumer(), federationSenderQueues, m.federationSenderDB, m.queryAPI, ) if err = federationSenderRoomConsumer.Start(); err != nil { - log.WithError(err).Panicf("startup: failed to start room server consumer") + m.logEntry.WithError(err).Panicf("startup: failed to start room server consumer") } } diff --git a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go index c92fed147..4796f6501 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go @@ -25,7 +25,8 @@ import ( "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" - log "github.com/sirupsen/logrus" + + "github.com/sirupsen/logrus" sarama "gopkg.in/Shopify/sarama.v1" ) @@ -71,6 +72,7 @@ func (s *OutputRoomEvent) Start() error { // because updates it will likely fail with a types.EventIDMismatchError when it // realises that it cannot update the room state using the deltas. func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { + log := logrus.WithField("prefix", "roomserver") // Parse out the event JSON var output api.OutputEvent if err := json.Unmarshal(msg.Value, &output); err != nil { @@ -85,7 +87,7 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { return nil } ev := &output.NewRoomEvent.Event - log.WithFields(log.Fields{ + log.WithFields(logrus.Fields{ "event_id": ev.EventID(), "room_id": ev.RoomID(), "send_as_server": output.NewRoomEvent.SendAsServer, @@ -93,11 +95,11 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { if err := s.processMessage(*output.NewRoomEvent); err != nil { // panic rather than continue with an inconsistent database - log.WithFields(log.Fields{ - "event": string(ev.JSON()), - log.ErrorKey: err, - "add": output.NewRoomEvent.AddsStateEventIDs, - "del": output.NewRoomEvent.RemovesStateEventIDs, + log.WithFields(logrus.Fields{ + "event": string(ev.JSON()), + logrus.ErrorKey: err, + "add": output.NewRoomEvent.AddsStateEventIDs, + "del": output.NewRoomEvent.RemovesStateEventIDs, }).Panicf("roomserver output log: write event failure") return nil } diff --git a/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go b/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go index 1f48f6c71..209545967 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go +++ b/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go @@ -68,6 +68,7 @@ func (oq *destinationQueue) backgroundSend() { _, err := oq.client.SendTransaction(*t) if err != nil { log.WithFields(log.Fields{ + "prefix": "federationsender", "destination": oq.destination, log.ErrorKey: err, }).Info("problem sending transaction") diff --git a/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go b/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go index d31c12f99..1ad6606c2 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go +++ b/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go @@ -61,7 +61,9 @@ func (oqs *OutgoingQueues) SendEvent( destinations = filterDestinations(oqs.origin, destinations) log.WithFields(log.Fields{ - "destinations": destinations, "event": ev.EventID(), + "prefix": "federationsender", + "destinations": destinations, + "event": ev.EventID(), }).Info("Sending event") oqs.queuesMutex.Lock() diff --git a/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go b/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go index 6186e24fb..2651f8cbe 100644 --- a/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go +++ b/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go @@ -67,6 +67,7 @@ func Download(w http.ResponseWriter, req *http.Request, origin gomatrixserverlib }, IsThumbnailRequest: isThumbnailRequest, Logger: util.GetLogger(req.Context()).WithFields(log.Fields{ + "prefix": "mediaapi", "Origin": origin, "MediaID": mediaID, }), diff --git a/src/github.com/matrix-org/dendrite/mediaapi/writers/upload.go b/src/github.com/matrix-org/dendrite/mediaapi/writers/upload.go index aa932b53e..b08947f2f 100644 --- a/src/github.com/matrix-org/dendrite/mediaapi/writers/upload.go +++ b/src/github.com/matrix-org/dendrite/mediaapi/writers/upload.go @@ -87,7 +87,7 @@ func parseAndValidateRequest(req *http.Request, cfg *config.Dendrite) (*uploadRe ContentType: types.ContentType(req.Header.Get("Content-Type")), UploadName: types.Filename(url.PathEscape(req.FormValue("filename"))), }, - Logger: util.GetLogger(req.Context()).WithField("Origin", cfg.Matrix.ServerName), + Logger: util.GetLogger(req.Context()).WithField("Origin", cfg.Matrix.ServerName).WithField("prefix", "mediaapi"), } if resErr := r.Validate(*cfg.Media.MaxFileSizeBytes); resErr != nil { diff --git a/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go index 285fa9372..fe1210f6c 100644 --- a/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go @@ -21,7 +21,8 @@ import ( "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/publicroomsapi/storage" "github.com/matrix-org/dendrite/roomserver/api" - log "github.com/sirupsen/logrus" + + "github.com/sirupsen/logrus" sarama "gopkg.in/Shopify/sarama.v1" ) @@ -61,6 +62,7 @@ func (s *OutputRoomEvent) Start() error { // onMessage is called when the sync server receives a new event from the room server output log. func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { + log := logrus.WithField("prefix", "publicroomsapi") // Parse out the event JSON var output api.OutputEvent if err := json.Unmarshal(msg.Value, &output); err != nil { @@ -77,7 +79,7 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { } ev := output.NewRoomEvent.Event - log.WithFields(log.Fields{ + log.WithFields(logrus.Fields{ "event_id": ev.EventID(), "room_id": ev.RoomID(), "type": ev.Type(), diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go index afe74aed3..ad6738cd5 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go @@ -21,7 +21,8 @@ import ( "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/sync" - log "github.com/sirupsen/logrus" + + "github.com/sirupsen/logrus" sarama "gopkg.in/Shopify/sarama.v1" ) @@ -64,6 +65,7 @@ func (s *OutputClientData) Start() error { // It is not safe for this function to be called from multiple goroutines, or else the // sync stream position may race and be incorrectly calculated. func (s *OutputClientData) onMessage(msg *sarama.ConsumerMessage) error { + log := logrus.WithField("prefix", "syncapi") // Parse out the event JSON var output common.AccountData if err := json.Unmarshal(msg.Value, &output); err != nil { @@ -72,17 +74,17 @@ func (s *OutputClientData) onMessage(msg *sarama.ConsumerMessage) error { return nil } - log.WithFields(log.Fields{ + log.WithFields(logrus.Fields{ "type": output.Type, "room_id": output.RoomID, }).Info("received data from client API server") syncStreamPos, err := s.db.UpsertAccountData(string(msg.Key), output.RoomID, output.Type) if err != nil { - log.WithFields(log.Fields{ - "type": output.Type, - "room_id": output.RoomID, - log.ErrorKey: err, + log.WithFields(logrus.Fields{ + "type": output.Type, + "room_id": output.RoomID, + logrus.ErrorKey: err, }).Panicf("could not save account data") } diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index 759050d09..30ec050d5 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -25,7 +25,8 @@ import ( "github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" - log "github.com/sirupsen/logrus" + + "github.com/sirupsen/logrus" sarama "gopkg.in/Shopify/sarama.v1" ) @@ -77,6 +78,7 @@ func (s *OutputRoomEvent) Start() error { // It is not safe for this function to be called from multiple goroutines, or else the // sync stream position may race and be incorrectly calculated. func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { + log := logrus.WithField("prefix", "syncapi") // Parse out the event JSON var output api.OutputEvent if err := json.Unmarshal(msg.Value, &output); err != nil { @@ -93,18 +95,18 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { } ev := output.NewRoomEvent.Event - log.WithFields(log.Fields{ + log.WithFields(logrus.Fields{ "event_id": ev.EventID(), "room_id": ev.RoomID(), }).Info("received event from roomserver") addsStateEvents, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev) if err != nil { - log.WithFields(log.Fields{ - "event": string(ev.JSON()), - log.ErrorKey: err, - "add": output.NewRoomEvent.AddsStateEventIDs, - "del": output.NewRoomEvent.RemovesStateEventIDs, + log.WithFields(logrus.Fields{ + "event": string(ev.JSON()), + logrus.ErrorKey: err, + "add": output.NewRoomEvent.AddsStateEventIDs, + "del": output.NewRoomEvent.RemovesStateEventIDs, }).Panicf("roomserver output log: state event lookup failure") } @@ -126,11 +128,11 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { if err != nil { // panic rather than continue with an inconsistent database - log.WithFields(log.Fields{ - "event": string(ev.JSON()), - log.ErrorKey: err, - "add": output.NewRoomEvent.AddsStateEventIDs, - "del": output.NewRoomEvent.RemovesStateEventIDs, + log.WithFields(logrus.Fields{ + "event": string(ev.JSON()), + logrus.ErrorKey: err, + "add": output.NewRoomEvent.AddsStateEventIDs, + "del": output.NewRoomEvent.RemovesStateEventIDs, }).Panicf("roomserver output log: write event failure") return nil } diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index 2afab341d..0fffbedac 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -134,6 +134,7 @@ func (s *outputRoomEventsStatements) selectStateInRange( // since it'll just mark the event as not being needed. if len(addIDs) < len(delIDs) { log.WithFields(log.Fields{ + "prefix": "syncapi", "since": oldPos, "current": newPos, "adds": addIDs, diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go index 102e42311..48069d0a8 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go @@ -70,7 +70,10 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos ty userID := *ev.StateKey() membership, err := ev.Membership() if err != nil { - log.WithError(err).WithField("event_id", ev.EventID()).Errorf( + log.WithError(err).WithFields(log.Fields{ + "prefix": "syncapi", + "event_id": ev.EventID(), + }).Errorf( "Notifier.OnNewEvent: Failed to unmarshal member event", ) } else { diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/request.go b/src/github.com/matrix-org/dendrite/syncapi/sync/request.go index dd1188241..5f6e109bb 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/request.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/request.go @@ -45,6 +45,7 @@ func newSyncRequest(req *http.Request, userID string) (*syncRequest, error) { if err != nil { return nil, err } + log := util.GetLogger(req.Context()).WithField("prefix", "syncapi") // TODO: Additional query params: set_presence, filter return &syncRequest{ userID: userID, @@ -52,7 +53,7 @@ func newSyncRequest(req *http.Request, userID string) (*syncRequest, error) { since: since, wantFullState: wantFullState, limit: defaultTimelineLimit, // TODO: read from filter - log: util.GetLogger(req.Context()), + log: log, }, nil } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index cdff6caef..2681d4d55 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -46,7 +46,7 @@ func NewRequestPool(db *storage.SyncServerDatabase, n *Notifier, adb *accounts.D // until a response is ready, or it times out. func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtypes.Device) util.JSONResponse { // Extract values from request - logger := util.GetLogger(req.Context()) + logger := util.GetLogger(req.Context()).WithField("prefix", "syncapi") userID := device.UserID syncReq, err := newSyncRequest(req, userID) if err != nil {