mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-12 01:13:10 -06:00
Use prefix in monolith + components
This commit is contained in:
parent
624c12b959
commit
d31f6fbcc2
|
|
@ -23,7 +23,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/sirupsen/logrus"
|
||||
sarama "gopkg.in/Shopify/sarama.v1"
|
||||
)
|
||||
|
||||
|
|
@ -68,6 +68,7 @@ func (s *OutputRoomEvent) Start() error {
|
|||
// It is not safe for this function to be called from multiple goroutines, or else the
|
||||
// sync stream position may race and be incorrectly calculated.
|
||||
func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
||||
log := logrus.WithField("prefix", "clientapi")
|
||||
// Parse out the event JSON
|
||||
var output api.OutputEvent
|
||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||
|
|
@ -84,7 +85,7 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
}
|
||||
|
||||
ev := output.NewRoomEvent.Event
|
||||
log.WithFields(log.Fields{
|
||||
log.WithFields(logrus.Fields{
|
||||
"event_id": ev.EventID(),
|
||||
"room_id": ev.RoomID(),
|
||||
"type": ev.Type(),
|
||||
|
|
|
|||
|
|
@ -42,6 +42,6 @@ func UnmarshalJSONRequest(req *http.Request, iface interface{}) *util.JSONRespon
|
|||
// This should be used to log fatal errors which require investigation. It should not be used
|
||||
// to log client validation errors, etc.
|
||||
func LogThenError(req *http.Request, err error) util.JSONResponse {
|
||||
util.GetLogger(req.Context()).WithError(err).Error("request failed")
|
||||
util.GetLogger(req.Context()).WithField("prefix", "clientapi").WithError(err).Error("request failed")
|
||||
return jsonerror.InternalServerError()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -78,7 +78,7 @@ func Login(
|
|||
}
|
||||
}
|
||||
|
||||
util.GetLogger(req.Context()).WithField("user", r.User).Info("Processing login request")
|
||||
util.GetLogger(req.Context()).WithField("prefix", "clientapi").WithField("user", r.User).Info("Processing login request")
|
||||
|
||||
acc, err := accountDB.GetAccountByPassword(r.User, r.Password)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -100,7 +100,7 @@ func createRoom(req *http.Request, device *authtypes.Device,
|
|||
cfg config.Dendrite, roomID string, producer *producers.RoomserverProducer,
|
||||
accountDB *accounts.Database,
|
||||
) util.JSONResponse {
|
||||
logger := util.GetLogger(req.Context())
|
||||
logger := util.GetLogger(req.Context()).WithField("prefix", "clientapi")
|
||||
userID := device.UserID
|
||||
var r createRoomRequest
|
||||
resErr := httputil.UnmarshalJSONRequest(req, &r)
|
||||
|
|
|
|||
|
|
@ -102,7 +102,7 @@ func Register(req *http.Request, accountDB *accounts.Database, deviceDB *devices
|
|||
return *resErr
|
||||
}
|
||||
|
||||
logger := util.GetLogger(req.Context())
|
||||
logger := util.GetLogger(req.Context()).WithField("prefix", "clientapi")
|
||||
logger.WithFields(log.Fields{
|
||||
"username": r.Username,
|
||||
"auth.type": r.Auth.Type,
|
||||
|
|
|
|||
|
|
@ -72,6 +72,8 @@ var (
|
|||
func main() {
|
||||
common.SetupLogging(logDir)
|
||||
|
||||
prefixedLog := log.WithField("prefix", "monolith")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
if *configPath == "" {
|
||||
|
|
@ -82,7 +84,7 @@ func main() {
|
|||
log.Fatalf("Invalid config file: %s", err)
|
||||
}
|
||||
|
||||
m := newMonolith(cfg)
|
||||
m := newMonolith(cfg, prefixedLog)
|
||||
m.setupDatabases()
|
||||
m.setupFederation()
|
||||
m.setupKafka()
|
||||
|
|
@ -94,14 +96,14 @@ func main() {
|
|||
|
||||
// Expose the matrix APIs directly rather than putting them under a /api path.
|
||||
go func() {
|
||||
log.Info("Listening on ", *httpBindAddr)
|
||||
log.Fatal(http.ListenAndServe(*httpBindAddr, m.api))
|
||||
prefixedLog.Info("Listening on ", *httpBindAddr)
|
||||
prefixedLog.Fatal(http.ListenAndServe(*httpBindAddr, m.api))
|
||||
}()
|
||||
// Handle HTTPS if certificate and key are provided
|
||||
go func() {
|
||||
if *certFile != "" && *keyFile != "" {
|
||||
log.Info("Listening on ", *httpsBindAddr)
|
||||
log.Fatal(http.ListenAndServeTLS(*httpsBindAddr, *certFile, *keyFile, m.api))
|
||||
prefixedLog.Info("Listening on ", *httpsBindAddr)
|
||||
prefixedLog.Fatal(http.ListenAndServeTLS(*httpsBindAddr, *certFile, *keyFile, m.api))
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
@ -115,6 +117,7 @@ func main() {
|
|||
type monolith struct {
|
||||
cfg *config.Dendrite
|
||||
api *mux.Router
|
||||
logEntry *log.Entry
|
||||
|
||||
roomServerDB *roomserver_storage.Database
|
||||
accountDB *accounts.Database
|
||||
|
|
@ -142,8 +145,8 @@ type monolith struct {
|
|||
syncAPINotifier *syncapi_sync.Notifier
|
||||
}
|
||||
|
||||
func newMonolith(cfg *config.Dendrite) *monolith {
|
||||
return &monolith{cfg: cfg, api: mux.NewRouter()}
|
||||
func newMonolith(cfg *config.Dendrite, log *log.Entry) *monolith {
|
||||
return &monolith{cfg: cfg, api: mux.NewRouter(), logEntry: log}
|
||||
}
|
||||
|
||||
func (m *monolith) setupDatabases() {
|
||||
|
|
@ -154,31 +157,31 @@ func (m *monolith) setupDatabases() {
|
|||
}
|
||||
m.accountDB, err = accounts.NewDatabase(string(m.cfg.Database.Account), m.cfg.Matrix.ServerName)
|
||||
if err != nil {
|
||||
log.Panicf("Failed to setup account database(%q): %s", m.cfg.Database.Account, err.Error())
|
||||
m.logEntry.Panicf("Failed to setup account database(%q): %s", m.cfg.Database.Account, err.Error())
|
||||
}
|
||||
m.deviceDB, err = devices.NewDatabase(string(m.cfg.Database.Device), m.cfg.Matrix.ServerName)
|
||||
if err != nil {
|
||||
log.Panicf("Failed to setup device database(%q): %s", m.cfg.Database.Device, err.Error())
|
||||
m.logEntry.Panicf("Failed to setup device database(%q): %s", m.cfg.Database.Device, err.Error())
|
||||
}
|
||||
m.keyDB, err = keydb.NewDatabase(string(m.cfg.Database.ServerKey))
|
||||
if err != nil {
|
||||
log.Panicf("Failed to setup key database(%q): %s", m.cfg.Database.ServerKey, err.Error())
|
||||
m.logEntry.Panicf("Failed to setup key database(%q): %s", m.cfg.Database.ServerKey, err.Error())
|
||||
}
|
||||
m.mediaAPIDB, err = mediaapi_storage.Open(string(m.cfg.Database.MediaAPI))
|
||||
if err != nil {
|
||||
log.Panicf("Failed to setup sync api database(%q): %s", m.cfg.Database.MediaAPI, err.Error())
|
||||
m.logEntry.Panicf("Failed to setup sync api database(%q): %s", m.cfg.Database.MediaAPI, err.Error())
|
||||
}
|
||||
m.syncAPIDB, err = syncapi_storage.NewSyncServerDatabase(string(m.cfg.Database.SyncAPI))
|
||||
if err != nil {
|
||||
log.Panicf("Failed to setup sync api database(%q): %s", m.cfg.Database.SyncAPI, err.Error())
|
||||
m.logEntry.Panicf("Failed to setup sync api database(%q): %s", m.cfg.Database.SyncAPI, err.Error())
|
||||
}
|
||||
m.federationSenderDB, err = federationsender_storage.NewDatabase(string(m.cfg.Database.FederationSender))
|
||||
if err != nil {
|
||||
log.Panicf("startup: failed to create federation sender database with data source %s : %s", m.cfg.Database.FederationSender, err)
|
||||
m.logEntry.Panicf("startup: failed to create federation sender database with data source %s : %s", m.cfg.Database.FederationSender, err)
|
||||
}
|
||||
m.publicRoomsAPIDB, err = publicroomsapi_storage.NewPublicRoomsServerDatabase(string(m.cfg.Database.PublicRoomsAPI))
|
||||
if err != nil {
|
||||
log.Panicf("startup: failed to setup public rooms api database with data source %s : %s", m.cfg.Database.PublicRoomsAPI, err)
|
||||
m.logEntry.Panicf("startup: failed to setup public rooms api database with data source %s : %s", m.cfg.Database.PublicRoomsAPI, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -201,7 +204,7 @@ func (m *monolith) setupKafka() {
|
|||
if m.cfg.Kafka.UseNaffka {
|
||||
naff, err := naffka.New(&naffka.MemoryDatabase{})
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
m.logEntry.WithFields(log.Fields{
|
||||
log.ErrorKey: err,
|
||||
}).Panic("Failed to setup naffka")
|
||||
}
|
||||
|
|
@ -210,7 +213,7 @@ func (m *monolith) setupKafka() {
|
|||
} else {
|
||||
m.kafkaProducer, err = sarama.NewSyncProducer(m.cfg.Kafka.Addresses, nil)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
m.logEntry.WithFields(log.Fields{
|
||||
log.ErrorKey: err,
|
||||
"addresses": m.cfg.Kafka.Addresses,
|
||||
}).Panic("Failed to setup kafka producers")
|
||||
|
|
@ -224,7 +227,7 @@ func (m *monolith) kafkaConsumer() sarama.Consumer {
|
|||
}
|
||||
consumer, err := sarama.NewConsumer(m.cfg.Kafka.Addresses, nil)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
m.logEntry.WithFields(log.Fields{
|
||||
log.ErrorKey: err,
|
||||
"addresses": m.cfg.Kafka.Addresses,
|
||||
}).Panic("Failed to setup kafka consumers")
|
||||
|
|
@ -266,12 +269,12 @@ func (m *monolith) setupProducers() {
|
|||
func (m *monolith) setupNotifiers() {
|
||||
pos, err := m.syncAPIDB.SyncStreamPosition()
|
||||
if err != nil {
|
||||
log.Panicf("startup: failed to get latest sync stream position : %s", err)
|
||||
m.logEntry.Panicf("startup: failed to get latest sync stream position : %s", err)
|
||||
}
|
||||
|
||||
m.syncAPINotifier = syncapi_sync.NewNotifier(syncapi_types.StreamPosition(pos))
|
||||
if err = m.syncAPINotifier.Load(m.syncAPIDB); err != nil {
|
||||
log.Panicf("startup: failed to set up notifier: %s", err)
|
||||
m.logEntry.Panicf("startup: failed to set up notifier: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -282,28 +285,28 @@ func (m *monolith) setupConsumers() {
|
|||
m.cfg, m.kafkaConsumer(), m.accountDB, m.queryAPI,
|
||||
)
|
||||
if err = clientAPIConsumer.Start(); err != nil {
|
||||
log.Panicf("startup: failed to start room server consumer")
|
||||
m.logEntry.Panicf("startup: failed to start room server consumer")
|
||||
}
|
||||
|
||||
syncAPIRoomConsumer := syncapi_consumers.NewOutputRoomEvent(
|
||||
m.cfg, m.kafkaConsumer(), m.syncAPINotifier, m.syncAPIDB, m.queryAPI,
|
||||
)
|
||||
if err = syncAPIRoomConsumer.Start(); err != nil {
|
||||
log.Panicf("startup: failed to start room server consumer: %s", err)
|
||||
m.logEntry.Panicf("startup: failed to start room server consumer: %s", err)
|
||||
}
|
||||
|
||||
syncAPIClientConsumer := syncapi_consumers.NewOutputClientData(
|
||||
m.cfg, m.kafkaConsumer(), m.syncAPINotifier, m.syncAPIDB,
|
||||
)
|
||||
if err = syncAPIClientConsumer.Start(); err != nil {
|
||||
log.Panicf("startup: failed to start client API server consumer: %s", err)
|
||||
m.logEntry.Panicf("startup: failed to start client API server consumer: %s", err)
|
||||
}
|
||||
|
||||
publicRoomsAPIConsumer := publicroomsapi_consumers.NewOutputRoomEvent(
|
||||
m.cfg, m.kafkaConsumer(), m.publicRoomsAPIDB, m.queryAPI,
|
||||
)
|
||||
if err = publicRoomsAPIConsumer.Start(); err != nil {
|
||||
log.Panicf("startup: failed to start room server consumer: %s", err)
|
||||
m.logEntry.Panicf("startup: failed to start room server consumer: %s", err)
|
||||
}
|
||||
|
||||
federationSenderQueues := queue.NewOutgoingQueues(m.cfg.Matrix.ServerName, m.federation)
|
||||
|
|
@ -312,7 +315,7 @@ func (m *monolith) setupConsumers() {
|
|||
m.cfg, m.kafkaConsumer(), federationSenderQueues, m.federationSenderDB, m.queryAPI,
|
||||
)
|
||||
if err = federationSenderRoomConsumer.Start(); err != nil {
|
||||
log.WithError(err).Panicf("startup: failed to start room server consumer")
|
||||
m.logEntry.WithError(err).Panicf("startup: failed to start room server consumer")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -25,7 +25,8 @@ import (
|
|||
"github.com/matrix-org/dendrite/federationsender/types"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
sarama "gopkg.in/Shopify/sarama.v1"
|
||||
)
|
||||
|
||||
|
|
@ -71,6 +72,7 @@ func (s *OutputRoomEvent) Start() error {
|
|||
// because updates it will likely fail with a types.EventIDMismatchError when it
|
||||
// realises that it cannot update the room state using the deltas.
|
||||
func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
||||
log := logrus.WithField("prefix", "roomserver")
|
||||
// Parse out the event JSON
|
||||
var output api.OutputEvent
|
||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||
|
|
@ -85,7 +87,7 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
return nil
|
||||
}
|
||||
ev := &output.NewRoomEvent.Event
|
||||
log.WithFields(log.Fields{
|
||||
log.WithFields(logrus.Fields{
|
||||
"event_id": ev.EventID(),
|
||||
"room_id": ev.RoomID(),
|
||||
"send_as_server": output.NewRoomEvent.SendAsServer,
|
||||
|
|
@ -93,9 +95,9 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
|
||||
if err := s.processMessage(*output.NewRoomEvent); err != nil {
|
||||
// panic rather than continue with an inconsistent database
|
||||
log.WithFields(log.Fields{
|
||||
log.WithFields(logrus.Fields{
|
||||
"event": string(ev.JSON()),
|
||||
log.ErrorKey: err,
|
||||
logrus.ErrorKey: err,
|
||||
"add": output.NewRoomEvent.AddsStateEventIDs,
|
||||
"del": output.NewRoomEvent.RemovesStateEventIDs,
|
||||
}).Panicf("roomserver output log: write event failure")
|
||||
|
|
|
|||
|
|
@ -68,6 +68,7 @@ func (oq *destinationQueue) backgroundSend() {
|
|||
_, err := oq.client.SendTransaction(*t)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"prefix": "federationsender",
|
||||
"destination": oq.destination,
|
||||
log.ErrorKey: err,
|
||||
}).Info("problem sending transaction")
|
||||
|
|
|
|||
|
|
@ -61,7 +61,9 @@ func (oqs *OutgoingQueues) SendEvent(
|
|||
destinations = filterDestinations(oqs.origin, destinations)
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"destinations": destinations, "event": ev.EventID(),
|
||||
"prefix": "federationsender",
|
||||
"destinations": destinations,
|
||||
"event": ev.EventID(),
|
||||
}).Info("Sending event")
|
||||
|
||||
oqs.queuesMutex.Lock()
|
||||
|
|
|
|||
|
|
@ -67,6 +67,7 @@ func Download(w http.ResponseWriter, req *http.Request, origin gomatrixserverlib
|
|||
},
|
||||
IsThumbnailRequest: isThumbnailRequest,
|
||||
Logger: util.GetLogger(req.Context()).WithFields(log.Fields{
|
||||
"prefix": "mediaapi",
|
||||
"Origin": origin,
|
||||
"MediaID": mediaID,
|
||||
}),
|
||||
|
|
|
|||
|
|
@ -87,7 +87,7 @@ func parseAndValidateRequest(req *http.Request, cfg *config.Dendrite) (*uploadRe
|
|||
ContentType: types.ContentType(req.Header.Get("Content-Type")),
|
||||
UploadName: types.Filename(url.PathEscape(req.FormValue("filename"))),
|
||||
},
|
||||
Logger: util.GetLogger(req.Context()).WithField("Origin", cfg.Matrix.ServerName),
|
||||
Logger: util.GetLogger(req.Context()).WithField("Origin", cfg.Matrix.ServerName).WithField("prefix", "mediaapi"),
|
||||
}
|
||||
|
||||
if resErr := r.Validate(*cfg.Media.MaxFileSizeBytes); resErr != nil {
|
||||
|
|
|
|||
|
|
@ -21,7 +21,8 @@ import (
|
|||
"github.com/matrix-org/dendrite/common/config"
|
||||
"github.com/matrix-org/dendrite/publicroomsapi/storage"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
sarama "gopkg.in/Shopify/sarama.v1"
|
||||
)
|
||||
|
||||
|
|
@ -61,6 +62,7 @@ func (s *OutputRoomEvent) Start() error {
|
|||
|
||||
// onMessage is called when the sync server receives a new event from the room server output log.
|
||||
func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
||||
log := logrus.WithField("prefix", "publicroomsapi")
|
||||
// Parse out the event JSON
|
||||
var output api.OutputEvent
|
||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||
|
|
@ -77,7 +79,7 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
}
|
||||
|
||||
ev := output.NewRoomEvent.Event
|
||||
log.WithFields(log.Fields{
|
||||
log.WithFields(logrus.Fields{
|
||||
"event_id": ev.EventID(),
|
||||
"room_id": ev.RoomID(),
|
||||
"type": ev.Type(),
|
||||
|
|
|
|||
|
|
@ -21,7 +21,8 @@ import (
|
|||
"github.com/matrix-org/dendrite/common/config"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/sync"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
sarama "gopkg.in/Shopify/sarama.v1"
|
||||
)
|
||||
|
||||
|
|
@ -64,6 +65,7 @@ func (s *OutputClientData) Start() error {
|
|||
// It is not safe for this function to be called from multiple goroutines, or else the
|
||||
// sync stream position may race and be incorrectly calculated.
|
||||
func (s *OutputClientData) onMessage(msg *sarama.ConsumerMessage) error {
|
||||
log := logrus.WithField("prefix", "syncapi")
|
||||
// Parse out the event JSON
|
||||
var output common.AccountData
|
||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||
|
|
@ -72,17 +74,17 @@ func (s *OutputClientData) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
log.WithFields(logrus.Fields{
|
||||
"type": output.Type,
|
||||
"room_id": output.RoomID,
|
||||
}).Info("received data from client API server")
|
||||
|
||||
syncStreamPos, err := s.db.UpsertAccountData(string(msg.Key), output.RoomID, output.Type)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
log.WithFields(logrus.Fields{
|
||||
"type": output.Type,
|
||||
"room_id": output.RoomID,
|
||||
log.ErrorKey: err,
|
||||
logrus.ErrorKey: err,
|
||||
}).Panicf("could not save account data")
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -25,7 +25,8 @@ import (
|
|||
"github.com/matrix-org/dendrite/syncapi/sync"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
sarama "gopkg.in/Shopify/sarama.v1"
|
||||
)
|
||||
|
||||
|
|
@ -77,6 +78,7 @@ func (s *OutputRoomEvent) Start() error {
|
|||
// It is not safe for this function to be called from multiple goroutines, or else the
|
||||
// sync stream position may race and be incorrectly calculated.
|
||||
func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
||||
log := logrus.WithField("prefix", "syncapi")
|
||||
// Parse out the event JSON
|
||||
var output api.OutputEvent
|
||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||
|
|
@ -93,16 +95,16 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
}
|
||||
|
||||
ev := output.NewRoomEvent.Event
|
||||
log.WithFields(log.Fields{
|
||||
log.WithFields(logrus.Fields{
|
||||
"event_id": ev.EventID(),
|
||||
"room_id": ev.RoomID(),
|
||||
}).Info("received event from roomserver")
|
||||
|
||||
addsStateEvents, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
log.WithFields(logrus.Fields{
|
||||
"event": string(ev.JSON()),
|
||||
log.ErrorKey: err,
|
||||
logrus.ErrorKey: err,
|
||||
"add": output.NewRoomEvent.AddsStateEventIDs,
|
||||
"del": output.NewRoomEvent.RemovesStateEventIDs,
|
||||
}).Panicf("roomserver output log: state event lookup failure")
|
||||
|
|
@ -126,9 +128,9 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
|
||||
if err != nil {
|
||||
// panic rather than continue with an inconsistent database
|
||||
log.WithFields(log.Fields{
|
||||
log.WithFields(logrus.Fields{
|
||||
"event": string(ev.JSON()),
|
||||
log.ErrorKey: err,
|
||||
logrus.ErrorKey: err,
|
||||
"add": output.NewRoomEvent.AddsStateEventIDs,
|
||||
"del": output.NewRoomEvent.RemovesStateEventIDs,
|
||||
}).Panicf("roomserver output log: write event failure")
|
||||
|
|
|
|||
|
|
@ -134,6 +134,7 @@ func (s *outputRoomEventsStatements) selectStateInRange(
|
|||
// since it'll just mark the event as not being needed.
|
||||
if len(addIDs) < len(delIDs) {
|
||||
log.WithFields(log.Fields{
|
||||
"prefix": "syncapi",
|
||||
"since": oldPos,
|
||||
"current": newPos,
|
||||
"adds": addIDs,
|
||||
|
|
|
|||
|
|
@ -70,7 +70,10 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos ty
|
|||
userID := *ev.StateKey()
|
||||
membership, err := ev.Membership()
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("event_id", ev.EventID()).Errorf(
|
||||
log.WithError(err).WithFields(log.Fields{
|
||||
"prefix": "syncapi",
|
||||
"event_id": ev.EventID(),
|
||||
}).Errorf(
|
||||
"Notifier.OnNewEvent: Failed to unmarshal member event",
|
||||
)
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -45,6 +45,7 @@ func newSyncRequest(req *http.Request, userID string) (*syncRequest, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
log := util.GetLogger(req.Context()).WithField("prefix", "syncapi")
|
||||
// TODO: Additional query params: set_presence, filter
|
||||
return &syncRequest{
|
||||
userID: userID,
|
||||
|
|
@ -52,7 +53,7 @@ func newSyncRequest(req *http.Request, userID string) (*syncRequest, error) {
|
|||
since: since,
|
||||
wantFullState: wantFullState,
|
||||
limit: defaultTimelineLimit, // TODO: read from filter
|
||||
log: util.GetLogger(req.Context()),
|
||||
log: log,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ func NewRequestPool(db *storage.SyncServerDatabase, n *Notifier, adb *accounts.D
|
|||
// until a response is ready, or it times out.
|
||||
func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||
// Extract values from request
|
||||
logger := util.GetLogger(req.Context())
|
||||
logger := util.GetLogger(req.Context()).WithField("prefix", "syncapi")
|
||||
userID := device.UserID
|
||||
syncReq, err := newSyncRequest(req, userID)
|
||||
if err != nil {
|
||||
|
|
|
|||
Loading…
Reference in a new issue