mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-15 01:53:09 -06:00
zerolog: replace logrus
This commit is contained in:
parent
447746e214
commit
f07bd172fa
|
|
@ -21,7 +21,6 @@ import (
|
||||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
"github.com/matrix-org/dendrite/setup/process"
|
"github.com/matrix-org/dendrite/setup/process"
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
|
|
||||||
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
||||||
"github.com/matrix-org/dendrite/appservice/consumers"
|
"github.com/matrix-org/dendrite/appservice/consumers"
|
||||||
|
|
@ -29,6 +28,7 @@ import (
|
||||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
|
log "github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewInternalAPI returns a concerete implementation of the internal API. Callers
|
// NewInternalAPI returns a concerete implementation of the internal API. Callers
|
||||||
|
|
@ -59,9 +59,7 @@ func NewInternalAPI(
|
||||||
for _, appservice := range cfg.Derived.ApplicationServices {
|
for _, appservice := range cfg.Derived.ApplicationServices {
|
||||||
// Create bot account for this AS if it doesn't already exist
|
// Create bot account for this AS if it doesn't already exist
|
||||||
if err := generateAppServiceAccount(userAPI, appservice, cfg.Global.ServerName); err != nil {
|
if err := generateAppServiceAccount(userAPI, appservice, cfg.Global.ServerName); err != nil {
|
||||||
logrus.WithFields(logrus.Fields{
|
log.Logger.Panic().Str("appservice", appservice.ID).Err(err).Msg("failed to generate bot account for appservice")
|
||||||
"appservice": appservice.ID,
|
|
||||||
}).WithError(err).Panicf("failed to generate bot account for appservice")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -73,7 +71,7 @@ func NewInternalAPI(
|
||||||
js, rsAPI,
|
js, rsAPI,
|
||||||
)
|
)
|
||||||
if err := consumer.Start(); err != nil {
|
if err := consumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to start appservice roomserver consumer")
|
log.Panic().Err(err).Msg("failed to start appservice roomserver consumer")
|
||||||
}
|
}
|
||||||
|
|
||||||
return appserviceQueryAPI
|
return appserviceQueryAPI
|
||||||
|
|
|
||||||
|
|
@ -36,7 +36,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/setup/process"
|
"github.com/matrix-org/dendrite/setup/process"
|
||||||
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ApplicationServiceTransaction is the transaction that is sent off to an
|
// ApplicationServiceTransaction is the transaction that is sent off to an
|
||||||
|
|
@ -104,7 +104,7 @@ func (s *OutputRoomEventConsumer) Start() error {
|
||||||
func (s *OutputRoomEventConsumer) onMessage(
|
func (s *OutputRoomEventConsumer) onMessage(
|
||||||
ctx context.Context, state *appserviceState, msgs []*nats.Msg,
|
ctx context.Context, state *appserviceState, msgs []*nats.Msg,
|
||||||
) bool {
|
) bool {
|
||||||
log.WithField("appservice", state.ID).Tracef("Appservice worker received %d message(s) from roomserver", len(msgs))
|
log.Trace().Str("appservice", state.ID).Msgf("Appservice worker received %d message(s) from roomserver", len(msgs))
|
||||||
events := make([]*types.HeaderedEvent, 0, len(msgs))
|
events := make([]*types.HeaderedEvent, 0, len(msgs))
|
||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
// Only handle events we care about
|
// Only handle events we care about
|
||||||
|
|
@ -116,7 +116,7 @@ func (s *OutputRoomEventConsumer) onMessage(
|
||||||
var output api.OutputEvent
|
var output api.OutputEvent
|
||||||
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
||||||
// If the message was invalid, log it and move on to the next message in the stream
|
// If the message was invalid, log it and move on to the next message in the stream
|
||||||
log.WithField("appservice", state.ID).WithError(err).Errorf("Appservice failed to parse message, ignoring")
|
log.Error().Str("appservice", state.ID).Err(err).Msg("Appservice failed to parse message, ignoring")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
switch output.Type {
|
switch output.Type {
|
||||||
|
|
@ -139,7 +139,7 @@ func (s *OutputRoomEventConsumer) onMessage(
|
||||||
}
|
}
|
||||||
if len(eventsReq.EventIDs) > 0 {
|
if len(eventsReq.EventIDs) > 0 {
|
||||||
if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil {
|
if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil {
|
||||||
log.WithError(err).Errorf("s.rsAPI.QueryEventsByID failed")
|
log.Error().Err(err).Msg("s.rsAPI.QueryEventsByID failed")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
events = append(events, eventsRes.Events...)
|
events = append(events, eventsRes.Events...)
|
||||||
|
|
@ -167,7 +167,7 @@ func (s *OutputRoomEventConsumer) onMessage(
|
||||||
|
|
||||||
// Send event to any relevant application services. If we hit
|
// Send event to any relevant application services. If we hit
|
||||||
// an error here, return false, so that we negatively ack.
|
// an error here, return false, so that we negatively ack.
|
||||||
log.WithField("appservice", state.ID).Debugf("Appservice worker sending %d events(s) from roomserver", len(events))
|
log.Debug().Str("appservice", state.ID).Msgf("Appservice worker sending %d events(s) from roomserver", len(events))
|
||||||
return s.sendEvents(ctx, state, events, txnID) == nil
|
return s.sendEvents(ctx, state, events, txnID) == nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -223,7 +223,7 @@ func (s *appserviceState) backoffAndPause(err error) error {
|
||||||
s.backoff++
|
s.backoff++
|
||||||
}
|
}
|
||||||
duration := time.Second * time.Duration(math.Pow(2, float64(s.backoff)))
|
duration := time.Second * time.Duration(math.Pow(2, float64(s.backoff)))
|
||||||
log.WithField("appservice", s.ID).WithError(err).Errorf("Unable to send transaction to appservice, backing off for %s", duration.String())
|
log.Error().Str("appservice", s.ID).Err(err).Msgf("Unable to send transaction to appservice, backing off for %s", duration.String())
|
||||||
time.Sleep(duration)
|
time.Sleep(duration)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -258,10 +258,7 @@ func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Cont
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.WithFields(log.Fields{
|
log.Error().Str("appservice", appservice.ID).Str("room_id", event.RoomID()).Err(err).Msg("Unable to get aliases for room")
|
||||||
"appservice": appservice.ID,
|
|
||||||
"room_id": event.RoomID(),
|
|
||||||
}).WithError(err).Errorf("Unable to get aliases for room")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if any of the members in the room match the appservice
|
// Check if any of the members in the room match the appservice
|
||||||
|
|
@ -303,10 +300,7 @@ func (s *OutputRoomEventConsumer) appserviceJoinedAtEvent(ctx context.Context, e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.WithFields(log.Fields{
|
log.Error().Str("appservice", appservice.ID).Str("room_id", event.RoomID()).Err(err).Msg("Unable to get membership for room")
|
||||||
"appservice": appservice.ID,
|
|
||||||
"room_id": event.RoomID(),
|
|
||||||
}).WithError(err).Errorf("Unable to get membership for room")
|
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/appservice/api"
|
"github.com/matrix-org/dendrite/appservice/api"
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
|
|
@ -77,15 +77,12 @@ func (a *AppServiceQueryAPI) RoomAliasExists(
|
||||||
defer func() {
|
defer func() {
|
||||||
err = resp.Body.Close()
|
err = resp.Body.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithFields(log.Fields{
|
log.Error().Err(err).Str("appservice_id", appservice.ID).Int("status_code", resp.StatusCode).Msg("Unable to close application service response body")
|
||||||
"appservice_id": appservice.ID,
|
|
||||||
"status_code": resp.StatusCode,
|
|
||||||
}).WithError(err).Error("Unable to close application service response body")
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Errorf("Issue querying room alias on application service %s", appservice.ID)
|
log.Error().Err(err).Msgf("Issue querying room alias on application service %s", appservice.ID)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
switch resp.StatusCode {
|
switch resp.StatusCode {
|
||||||
|
|
@ -97,10 +94,7 @@ func (a *AppServiceQueryAPI) RoomAliasExists(
|
||||||
// Room does not exist
|
// Room does not exist
|
||||||
default:
|
default:
|
||||||
// Application service reported an error. Warn
|
// Application service reported an error. Warn
|
||||||
log.WithFields(log.Fields{
|
log.Warn().Str("appservice_id", appservice.ID).Int("status_code", resp.StatusCode).Msg("Application service responded with non-OK status code")
|
||||||
"appservice_id": appservice.ID,
|
|
||||||
"status_code": resp.StatusCode,
|
|
||||||
}).Warn("Application service responded with non-OK status code")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -141,17 +135,12 @@ func (a *AppServiceQueryAPI) UserIDExists(
|
||||||
defer func() {
|
defer func() {
|
||||||
err = resp.Body.Close()
|
err = resp.Body.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithFields(log.Fields{
|
log.Error().Str("appservice_id", appservice.ID).Int("status_code", resp.StatusCode).Msg("Unable to close application service response body")
|
||||||
"appservice_id": appservice.ID,
|
|
||||||
"status_code": resp.StatusCode,
|
|
||||||
}).Error("Unable to close application service response body")
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithFields(log.Fields{
|
log.Error().Err(err).Str("appservice_id", appservice.ID).Msg("issue querying user ID on application service")
|
||||||
"appservice_id": appservice.ID,
|
|
||||||
}).WithError(err).Error("issue querying user ID on application service")
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if resp.StatusCode == http.StatusOK {
|
if resp.StatusCode == http.StatusOK {
|
||||||
|
|
@ -161,10 +150,7 @@ func (a *AppServiceQueryAPI) UserIDExists(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Log non OK
|
// Log non OK
|
||||||
log.WithFields(log.Fields{
|
log.Warn().Str("appservice_id", appservice.ID).Int("status_code", resp.StatusCode).Msg("application service responded with non-OK status code")
|
||||||
"appservice_id": appservice.ID,
|
|
||||||
"status_code": resp.StatusCode,
|
|
||||||
}).Warn("application service responded with non-OK status code")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -217,7 +203,7 @@ func (a *AppServiceQueryAPI) Locations(
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := requestDo[[]api.ASLocationResponse](as.HTTPClient, url+"?"+params.Encode(), &asLocations); err != nil {
|
if err := requestDo[[]api.ASLocationResponse](as.HTTPClient, url+"?"+params.Encode(), &asLocations); err != nil {
|
||||||
log.WithError(err).Error("unable to get 'locations' from application service")
|
log.Error().Err(err).Msg("unable to get 'locations' from application service")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -252,7 +238,7 @@ func (a *AppServiceQueryAPI) User(
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := requestDo[[]api.ASUserResponse](as.HTTPClient, url+"?"+params.Encode(), &asUsers); err != nil {
|
if err := requestDo[[]api.ASUserResponse](as.HTTPClient, url+"?"+params.Encode(), &asUsers); err != nil {
|
||||||
log.WithError(err).Error("unable to get 'user' from application service")
|
log.Error().Err(err).Msg("unable to get 'user' from application service")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -290,7 +276,7 @@ func (a *AppServiceQueryAPI) Protocols(
|
||||||
for _, as := range a.Cfg.Derived.ApplicationServices {
|
for _, as := range a.Cfg.Derived.ApplicationServices {
|
||||||
var proto api.ASProtocolResponse
|
var proto api.ASProtocolResponse
|
||||||
if err := requestDo[api.ASProtocolResponse](as.HTTPClient, as.RequestUrl()+api.ASProtocolPath+req.Protocol, &proto); err != nil {
|
if err := requestDo[api.ASProtocolResponse](as.HTTPClient, as.RequestUrl()+api.ASProtocolPath+req.Protocol, &proto); err != nil {
|
||||||
log.WithError(err).Error("unable to get 'protocol' from application service")
|
log.Error().Err(err).Msg("unable to get 'protocol' from application service")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -320,7 +306,7 @@ func (a *AppServiceQueryAPI) Protocols(
|
||||||
for _, p := range as.Protocols {
|
for _, p := range as.Protocols {
|
||||||
var proto api.ASProtocolResponse
|
var proto api.ASProtocolResponse
|
||||||
if err := requestDo[api.ASProtocolResponse](as.HTTPClient, as.RequestUrl()+api.ASProtocolPath+p, &proto); err != nil {
|
if err := requestDo[api.ASProtocolResponse](as.HTTPClient, as.RequestUrl()+api.ASProtocolPath+p, &proto); err != nil {
|
||||||
log.WithError(err).Error("unable to get 'protocol' from application service")
|
log.Error().Err(err).Msg("unable to get 'protocol' from application service")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
existing, ok := response[p]
|
existing, ok := response[p]
|
||||||
|
|
|
||||||
|
|
@ -24,8 +24,9 @@ import (
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dendrite/userapi/api"
|
"github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
"github.com/tidwall/gjson"
|
"github.com/tidwall/gjson"
|
||||||
|
|
||||||
|
log "github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Type represents an auth type
|
// Type represents an auth type
|
||||||
|
|
@ -177,7 +178,7 @@ func (u *UserInteractive) challenge(sessionID string) *util.JSONResponse {
|
||||||
func (u *UserInteractive) NewSession() *util.JSONResponse {
|
func (u *UserInteractive) NewSession() *util.JSONResponse {
|
||||||
sessionID, err := GenerateAccessToken()
|
sessionID, err := GenerateAccessToken()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Error("failed to generate session ID")
|
log.Error().Err(err).Msg("failed to generate session ID")
|
||||||
res := jsonerror.InternalServerError()
|
res := jsonerror.InternalServerError()
|
||||||
return &res
|
return &res
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,7 @@ import (
|
||||||
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
"github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// MatrixError represents the "standard error response" in Matrix.
|
// MatrixError represents the "standard error response" in Matrix.
|
||||||
|
|
@ -235,7 +235,7 @@ func NotTrusted(serverName string) *MatrixError {
|
||||||
|
|
||||||
// InternalAPIError is returned when Dendrite failed to reach an internal API.
|
// InternalAPIError is returned when Dendrite failed to reach an internal API.
|
||||||
func InternalAPIError(ctx context.Context, err error) util.JSONResponse {
|
func InternalAPIError(ctx context.Context, err error) util.JSONResponse {
|
||||||
logrus.WithContext(ctx).WithError(err).Error("Error reaching an internal API")
|
log.Error().Any("context", ctx).Err(err).Msg("Error reaching an internal API")
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusInternalServerError,
|
Code: http.StatusInternalServerError,
|
||||||
JSON: &MatrixError{
|
JSON: &MatrixError{
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,7 @@ import (
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
|
@ -56,7 +56,7 @@ func (p *SyncAPIProducer) SendReceipt(
|
||||||
m.Header.Set("type", receiptType)
|
m.Header.Set("type", receiptType)
|
||||||
m.Header.Set("timestamp", fmt.Sprintf("%d", timestamp))
|
m.Header.Set("timestamp", fmt.Sprintf("%d", timestamp))
|
||||||
|
|
||||||
log.WithFields(log.Fields{}).Tracef("Producing to topic '%s'", p.TopicReceiptEvent)
|
log.Trace().Msgf("Producing to topic '%s'", p.TopicReceiptEvent)
|
||||||
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
|
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -91,11 +91,7 @@ func (p *SyncAPIProducer) SendToDevice(
|
||||||
devices = append(devices, deviceID)
|
devices = append(devices, deviceID)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.WithFields(log.Fields{
|
log.Trace().Str("user_id", userID).Int("num_devices", len(devices)).Str("type", eventType).Msgf("Producing to topic '%s'", p.TopicSendToDeviceEvent)
|
||||||
"user_id": userID,
|
|
||||||
"num_devices": len(devices),
|
|
||||||
"type": eventType,
|
|
||||||
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
|
|
||||||
for i, device := range devices {
|
for i, device := range devices {
|
||||||
ote := &types.OutputSendToDeviceEvent{
|
ote := &types.OutputSendToDeviceEvent{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
|
|
@ -109,7 +105,7 @@ func (p *SyncAPIProducer) SendToDevice(
|
||||||
|
|
||||||
eventJSON, err := json.Marshal(ote)
|
eventJSON, err := json.Marshal(ote)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("sendToDevice failed json.Marshal")
|
log.Error().Err(err).Msg("sendToDevice failed json.Marshal")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
m := nats.NewMsg(p.TopicSendToDeviceEvent)
|
m := nats.NewMsg(p.TopicSendToDeviceEvent)
|
||||||
|
|
@ -119,10 +115,10 @@ func (p *SyncAPIProducer) SendToDevice(
|
||||||
|
|
||||||
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
|
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
|
||||||
if i < len(devices)-1 {
|
if i < len(devices)-1 {
|
||||||
log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices")
|
log.Warn().Err(err).Msg("sendToDevice failed to PublishMsg, trying further devices")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices")
|
log.Error().Err(err).Msg("sendToDevice failed to PublishMsg for all devices")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,8 @@ import (
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
|
log "github.com/rs/zerolog/log"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
"github.com/matrix-org/dendrite/internal/httputil"
|
"github.com/matrix-org/dendrite/internal/httputil"
|
||||||
|
|
@ -40,7 +41,7 @@ func AdminEvacuateRoom(req *http.Request, rsAPI roomserverAPI.ClientRoomserverAP
|
||||||
JSON: jsonerror.NotFound(err.Error()),
|
JSON: jsonerror.NotFound(err.Error()),
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
logrus.WithError(err).WithField("roomID", vars["roomID"]).Error("Failed to evacuate room")
|
log.Error().Err(err).Str("roomID", vars["roomID"]).Msg("Failed to evacuate room")
|
||||||
return util.ErrorResponse(err)
|
return util.ErrorResponse(err)
|
||||||
}
|
}
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
|
|
@ -59,7 +60,7 @@ func AdminEvacuateUser(req *http.Request, rsAPI roomserverAPI.ClientRoomserverAP
|
||||||
|
|
||||||
affected, err := rsAPI.PerformAdminEvacuateUser(req.Context(), vars["userID"])
|
affected, err := rsAPI.PerformAdminEvacuateUser(req.Context(), vars["userID"])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).WithField("userID", vars["userID"]).Error("Failed to evacuate user")
|
log.Error().Err(err).Str("userID", vars["userID"]).Msg("Failed to evacuate user")
|
||||||
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -169,7 +170,7 @@ func AdminResetPassword(req *http.Request, cfg *config.ClientAPI, device *api.De
|
||||||
func AdminReindex(req *http.Request, cfg *config.ClientAPI, device *api.Device, natsClient *nats.Conn) util.JSONResponse {
|
func AdminReindex(req *http.Request, cfg *config.ClientAPI, device *api.Device, natsClient *nats.Conn) util.JSONResponse {
|
||||||
_, err := natsClient.RequestMsg(nats.NewMsg(cfg.Matrix.JetStream.Prefixed(jetstream.InputFulltextReindex)), time.Second*10)
|
_, err := natsClient.RequestMsg(nats.NewMsg(cfg.Matrix.JetStream.Prefixed(jetstream.InputFulltextReindex)), time.Second*10)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Error("failed to publish nats message")
|
log.Error().Err(err).Msg("failed to publish nats message")
|
||||||
return jsonerror.InternalServerError()
|
return jsonerror.InternalServerError()
|
||||||
}
|
}
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
|
|
@ -238,11 +239,7 @@ func AdminDownloadState(req *http.Request, device *api.Device, rsAPI roomserverA
|
||||||
JSON: jsonerror.NotFound(eventutil.ErrRoomNoExists.Error()),
|
JSON: jsonerror.NotFound(eventutil.ErrRoomNoExists.Error()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
logrus.WithError(err).WithFields(logrus.Fields{
|
log.Error().Err(err).Str("userID", device.UserID).Str("serverName", serverName).Str("roomID", roomID).Msg("failed to download state")
|
||||||
"userID": device.UserID,
|
|
||||||
"serverName": serverName,
|
|
||||||
"roomID": roomID,
|
|
||||||
}).Error("failed to download state")
|
|
||||||
return util.ErrorResponse(err)
|
return util.ErrorResponse(err)
|
||||||
}
|
}
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
|
|
|
||||||
|
|
@ -37,7 +37,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// https://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-client-r0-createroom
|
// https://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-client-r0-createroom
|
||||||
|
|
|
||||||
|
|
@ -12,7 +12,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/userapi/api"
|
"github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
"github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
type newPasswordRequest struct {
|
type newPasswordRequest struct {
|
||||||
|
|
@ -37,10 +37,7 @@ func Password(
|
||||||
var r newPasswordRequest
|
var r newPasswordRequest
|
||||||
r.LogoutDevices = true
|
r.LogoutDevices = true
|
||||||
|
|
||||||
logrus.WithFields(logrus.Fields{
|
log.Debug().Int64("sessionId", device.SessionID).Str("userId", device.UserID).Msg("Changing password")
|
||||||
"sessionId": device.SessionID,
|
|
||||||
"userId": device.UserID,
|
|
||||||
}).Debug("Changing password")
|
|
||||||
|
|
||||||
// Unmarshal the request.
|
// Unmarshal the request.
|
||||||
resErr := httputil.UnmarshalJSONRequest(req, &r)
|
resErr := httputil.UnmarshalJSONRequest(req, &r)
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,7 @@ import (
|
||||||
"github.com/matrix-org/gomatrix"
|
"github.com/matrix-org/gomatrix"
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
"github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
func PeekRoomByIDOrAlias(
|
func PeekRoomByIDOrAlias(
|
||||||
|
|
@ -75,7 +75,7 @@ func PeekRoomByIDOrAlias(
|
||||||
}
|
}
|
||||||
case nil:
|
case nil:
|
||||||
default:
|
default:
|
||||||
logrus.WithError(err).WithField("roomID", roomIDOrAlias).Errorf("Failed to peek room")
|
log.Error().Err(err).Str("roomID",roomIDOrAlias).Msg("Failed to peek room")
|
||||||
return jsonerror.InternalServerError()
|
return jsonerror.InternalServerError()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -111,7 +111,7 @@ func UnpeekRoomByID(
|
||||||
}
|
}
|
||||||
case nil:
|
case nil:
|
||||||
default:
|
default:
|
||||||
logrus.WithError(err).WithField("roomID", roomID).Errorf("Failed to un-peek room")
|
log.Error().Err(err).Str("roomID",roomID).Msg("Failed to un-peek room")
|
||||||
return jsonerror.InternalServerError()
|
return jsonerror.InternalServerError()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,7 @@ import (
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
type presenceReq struct {
|
type presenceReq struct {
|
||||||
|
|
@ -72,7 +72,7 @@ func SetPresence(
|
||||||
}
|
}
|
||||||
err := producer.SendPresence(req.Context(), userID, presenceStatus, presence.StatusMsg)
|
err := producer.SendPresence(req.Context(), userID, presenceStatus, presence.StatusMsg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Errorf("failed to update presence")
|
log.Error().Err(err).Msgf("failed to update presence")
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusInternalServerError,
|
Code: http.StatusInternalServerError,
|
||||||
JSON: jsonerror.InternalServerError(),
|
JSON: jsonerror.InternalServerError(),
|
||||||
|
|
@ -97,7 +97,7 @@ func GetPresence(
|
||||||
|
|
||||||
presence, err := natsClient.RequestMsg(msg, time.Second*10)
|
presence, err := natsClient.RequestMsg(msg, time.Second*10)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Errorf("unable to get presence")
|
log.Error().Err(err).Msg("unable to get presence")
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusInternalServerError,
|
Code: http.StatusInternalServerError,
|
||||||
JSON: jsonerror.InternalServerError(),
|
JSON: jsonerror.InternalServerError(),
|
||||||
|
|
@ -107,7 +107,7 @@ func GetPresence(
|
||||||
statusMsg := presence.Header.Get("status_msg")
|
statusMsg := presence.Header.Get("status_msg")
|
||||||
e := presence.Header.Get("error")
|
e := presence.Header.Get("error")
|
||||||
if e != "" {
|
if e != "" {
|
||||||
log.Errorf("received error msg from nats: %s", e)
|
log.Error().Msgf("received error msg from nats: %s", e)
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusOK,
|
Code: http.StatusOK,
|
||||||
JSON: types.PresenceClientResponse{
|
JSON: types.PresenceClientResponse{
|
||||||
|
|
|
||||||
|
|
@ -27,19 +27,14 @@ import (
|
||||||
"github.com/matrix-org/dendrite/userapi/api"
|
"github.com/matrix-org/dendrite/userapi/api"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
|
log "github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
func SetReceipt(req *http.Request, userAPI api.ClientUserAPI, syncProducer *producers.SyncAPIProducer, device *userapi.Device, roomID, receiptType, eventID string) util.JSONResponse {
|
func SetReceipt(req *http.Request, userAPI api.ClientUserAPI, syncProducer *producers.SyncAPIProducer, device *userapi.Device, roomID, receiptType, eventID string) util.JSONResponse {
|
||||||
timestamp := spec.AsTimestamp(time.Now())
|
timestamp := spec.AsTimestamp(time.Now())
|
||||||
logrus.WithFields(logrus.Fields{
|
/// TODO: Check timestamp => timestamp.Time().Unix() for compat issues
|
||||||
"roomID": roomID,
|
log.Debug().Str("roomID", roomID).Str("receiptType", receiptType).Str("eventID", eventID).Str("userId", device.UserID).Int64("timestamp", timestamp.Time().Unix()).Msg("Setting receipt")
|
||||||
"receiptType": receiptType,
|
|
||||||
"eventID": eventID,
|
|
||||||
"userId": device.UserID,
|
|
||||||
"timestamp": timestamp,
|
|
||||||
}).Debug("Setting receipt")
|
|
||||||
|
|
||||||
switch receiptType {
|
switch receiptType {
|
||||||
case "m.read", "m.read.private":
|
case "m.read", "m.read.private":
|
||||||
if err := syncProducer.SendReceipt(req.Context(), device.UserID, roomID, eventID, receiptType, timestamp); err != nil {
|
if err := syncProducer.SendReceipt(req.Context(), device.UserID, roomID, eventID, receiptType, timestamp); err != nil {
|
||||||
|
|
|
||||||
|
|
@ -36,19 +36,18 @@ import (
|
||||||
"github.com/matrix-org/dendrite/internal/eventutil"
|
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib/tokens"
|
|
||||||
"github.com/matrix-org/util"
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth"
|
"github.com/matrix-org/dendrite/clientapi/auth"
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
"github.com/matrix-org/dendrite/clientapi/userutil"
|
"github.com/matrix-org/dendrite/clientapi/userutil"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib/tokens"
|
||||||
|
"github.com/matrix-org/util"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
log "github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
|
||||||
|
|
@ -20,15 +20,6 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
"github.com/matrix-org/dendrite/setup/base"
|
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib/fclient"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
|
||||||
"github.com/matrix-org/util"
|
|
||||||
"github.com/nats-io/nats.go"
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
|
|
||||||
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
||||||
"github.com/matrix-org/dendrite/clientapi/api"
|
"github.com/matrix-org/dendrite/clientapi/api"
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth"
|
"github.com/matrix-org/dendrite/clientapi/auth"
|
||||||
|
|
@ -39,8 +30,16 @@ import (
|
||||||
"github.com/matrix-org/dendrite/internal/httputil"
|
"github.com/matrix-org/dendrite/internal/httputil"
|
||||||
"github.com/matrix-org/dendrite/internal/transactions"
|
"github.com/matrix-org/dendrite/internal/transactions"
|
||||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/setup/base"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib/fclient"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
|
"github.com/matrix-org/util"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
log "github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client
|
// Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client
|
||||||
|
|
@ -86,7 +85,7 @@ func Setup(
|
||||||
}
|
}
|
||||||
|
|
||||||
if cfg.Matrix.WellKnownClientName != "" {
|
if cfg.Matrix.WellKnownClientName != "" {
|
||||||
logrus.Infof("Setting m.homeserver base_url as %s at /.well-known/matrix/client", cfg.Matrix.WellKnownClientName)
|
log.Info().Msgf("Setting m.homeserver base_url as %s at /.well-known/matrix/client", cfg.Matrix.WellKnownClientName)
|
||||||
wkMux.Handle("/client", httputil.MakeExternalAPI("wellknown", func(r *http.Request) util.JSONResponse {
|
wkMux.Handle("/client", httputil.MakeExternalAPI("wellknown", func(r *http.Request) util.JSONResponse {
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusOK,
|
Code: http.StatusOK,
|
||||||
|
|
@ -129,7 +128,7 @@ func Setup(
|
||||||
).Methods(http.MethodGet, http.MethodOptions)
|
).Methods(http.MethodGet, http.MethodOptions)
|
||||||
|
|
||||||
if cfg.RegistrationSharedSecret != "" {
|
if cfg.RegistrationSharedSecret != "" {
|
||||||
logrus.Info("Enabling shared secret registration at /_synapse/admin/v1/register")
|
log.Info().Msg("Enabling shared secret registration at /_synapse/admin/v1/register")
|
||||||
sr := NewSharedSecretRegistration(cfg.RegistrationSharedSecret)
|
sr := NewSharedSecretRegistration(cfg.RegistrationSharedSecret)
|
||||||
synapseAdminRouter.Handle("/admin/v1/register",
|
synapseAdminRouter.Handle("/admin/v1/register",
|
||||||
httputil.MakeExternalAPI("shared_secret_registration", func(req *http.Request) util.JSONResponse {
|
httputil.MakeExternalAPI("shared_secret_registration", func(req *http.Request) util.JSONResponse {
|
||||||
|
|
@ -198,10 +197,10 @@ func Setup(
|
||||||
|
|
||||||
// server notifications
|
// server notifications
|
||||||
if cfg.Matrix.ServerNotices.Enabled {
|
if cfg.Matrix.ServerNotices.Enabled {
|
||||||
logrus.Info("Enabling server notices at /_synapse/admin/v1/send_server_notice")
|
log.Info().Msg("Enabling server notices at /_synapse/admin/v1/send_server_notice")
|
||||||
serverNotificationSender, err := getSenderDevice(context.Background(), rsAPI, userAPI, cfg)
|
serverNotificationSender, err := getSenderDevice(context.Background(), rsAPI, userAPI, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Fatal("unable to get account for sending sending server notices")
|
log.Fatal().Err(err).Msg("unable to get account for sending sending server notices")
|
||||||
}
|
}
|
||||||
|
|
||||||
synapseAdminRouter.Handle("/admin/v1/send_server_notice/{txnID}",
|
synapseAdminRouter.Handle("/admin/v1/send_server_notice/{txnID}",
|
||||||
|
|
|
||||||
|
|
@ -21,15 +21,15 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/version"
|
||||||
"github.com/matrix-org/gomatrix"
|
"github.com/matrix-org/gomatrix"
|
||||||
"github.com/matrix-org/gomatrixserverlib/tokens"
|
"github.com/matrix-org/gomatrixserverlib/tokens"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
log "github.com/rs/zerolog/log"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/roomserver/types"
|
|
||||||
"github.com/matrix-org/dendrite/roomserver/version"
|
|
||||||
|
|
||||||
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
||||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
|
|
@ -208,7 +208,7 @@ func SendServerNotice(
|
||||||
}
|
}
|
||||||
e, resErr := generateSendEvent(ctx, request, senderDevice, roomID, "m.room.message", nil, cfgClient, rsAPI, time.Now())
|
e, resErr := generateSendEvent(ctx, request, senderDevice, roomID, "m.room.message", nil, cfgClient, rsAPI, time.Now())
|
||||||
if resErr != nil {
|
if resErr != nil {
|
||||||
logrus.Errorf("failed to send message: %+v", resErr)
|
log.Error().Msgf("failed to send message: %+v", resErr)
|
||||||
return *resErr
|
return *resErr
|
||||||
}
|
}
|
||||||
timeToGenerateEvent := time.Since(startedGeneratingEvent)
|
timeToGenerateEvent := time.Since(startedGeneratingEvent)
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,7 @@ import (
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
type stateEventInStateResp struct {
|
type stateEventInStateResp struct {
|
||||||
|
|
|
||||||
|
|
@ -19,23 +19,23 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/getsentry/sentry-go"
|
"github.com/getsentry/sentry-go"
|
||||||
|
"github.com/matrix-org/dendrite/appservice"
|
||||||
|
"github.com/matrix-org/dendrite/federationapi"
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/caching"
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
"github.com/matrix-org/dendrite/internal/httputil"
|
"github.com/matrix-org/dendrite/internal/httputil"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
|
||||||
"github.com/matrix-org/dendrite/setup/process"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib/fclient"
|
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/appservice"
|
|
||||||
"github.com/matrix-org/dendrite/federationapi"
|
|
||||||
"github.com/matrix-org/dendrite/roomserver"
|
"github.com/matrix-org/dendrite/roomserver"
|
||||||
"github.com/matrix-org/dendrite/setup"
|
"github.com/matrix-org/dendrite/setup"
|
||||||
basepkg "github.com/matrix-org/dendrite/setup/base"
|
basepkg "github.com/matrix-org/dendrite/setup/base"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
"github.com/matrix-org/dendrite/setup/mscs"
|
"github.com/matrix-org/dendrite/setup/mscs"
|
||||||
|
"github.com/matrix-org/dendrite/setup/process"
|
||||||
"github.com/matrix-org/dendrite/userapi"
|
"github.com/matrix-org/dendrite/userapi"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib/fclient"
|
||||||
|
|
||||||
|
log "github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
@ -58,18 +58,18 @@ func main() {
|
||||||
if *unixSocket == "" {
|
if *unixSocket == "" {
|
||||||
http, err := config.HTTPAddress("http://" + *httpBindAddr)
|
http, err := config.HTTPAddress("http://" + *httpBindAddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Fatalf("Failed to parse http address")
|
log.Fatal().Err(err).Msg("Failed to parse http address")
|
||||||
}
|
}
|
||||||
httpAddr = http
|
httpAddr = http
|
||||||
https, err := config.HTTPAddress("https://" + *httpsBindAddr)
|
https, err := config.HTTPAddress("https://" + *httpsBindAddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Fatalf("Failed to parse https address")
|
log.Fatal().Err(err).Msg("Failed to parse https address")
|
||||||
}
|
}
|
||||||
httpsAddr = https
|
httpsAddr = https
|
||||||
} else {
|
} else {
|
||||||
socket, err := config.UnixSocketAddress(*unixSocket, *unixSocketPermission)
|
socket, err := config.UnixSocketAddress(*unixSocket, *unixSocketPermission)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Fatalf("Failed to parse unix socket")
|
log.Fatal().Err(err).Msg("Failed to parse unix socket")
|
||||||
}
|
}
|
||||||
httpAddr = socket
|
httpAddr = socket
|
||||||
}
|
}
|
||||||
|
|
@ -78,9 +78,9 @@ func main() {
|
||||||
cfg.Verify(configErrors)
|
cfg.Verify(configErrors)
|
||||||
if len(*configErrors) > 0 {
|
if len(*configErrors) > 0 {
|
||||||
for _, err := range *configErrors {
|
for _, err := range *configErrors {
|
||||||
logrus.Errorf("Configuration error: %s", err)
|
log.Error().Msgf("Configuration error: %s", err)
|
||||||
}
|
}
|
||||||
logrus.Fatalf("Failed to start due to configuration errors")
|
log.Fatal().Msgf("Failed to start due to configuration errors")
|
||||||
}
|
}
|
||||||
processCtx := process.NewProcessContext()
|
processCtx := process.NewProcessContext()
|
||||||
|
|
||||||
|
|
@ -90,9 +90,9 @@ func main() {
|
||||||
|
|
||||||
basepkg.PlatformSanityChecks()
|
basepkg.PlatformSanityChecks()
|
||||||
|
|
||||||
logrus.Infof("Dendrite version %s", internal.VersionString())
|
log.Info().Msgf("Dendrite version %s", internal.VersionString())
|
||||||
if !cfg.ClientAPI.RegistrationDisabled && cfg.ClientAPI.OpenRegistrationWithoutVerificationEnabled {
|
if !cfg.ClientAPI.RegistrationDisabled && cfg.ClientAPI.OpenRegistrationWithoutVerificationEnabled {
|
||||||
logrus.Warn("Open registration is enabled")
|
log.Warn().Msg("Open registration is enabled")
|
||||||
}
|
}
|
||||||
|
|
||||||
// create DNS cache
|
// create DNS cache
|
||||||
|
|
@ -102,7 +102,7 @@ func main() {
|
||||||
cfg.Global.DNSCache.CacheSize,
|
cfg.Global.DNSCache.CacheSize,
|
||||||
cfg.Global.DNSCache.CacheLifetime,
|
cfg.Global.DNSCache.CacheLifetime,
|
||||||
)
|
)
|
||||||
logrus.Infof(
|
log.Info().Msgf(
|
||||||
"DNS cache enabled (size %d, lifetime %s)",
|
"DNS cache enabled (size %d, lifetime %s)",
|
||||||
cfg.Global.DNSCache.CacheSize,
|
cfg.Global.DNSCache.CacheSize,
|
||||||
cfg.Global.DNSCache.CacheLifetime,
|
cfg.Global.DNSCache.CacheLifetime,
|
||||||
|
|
@ -112,13 +112,13 @@ func main() {
|
||||||
// setup tracing
|
// setup tracing
|
||||||
closer, err := cfg.SetupTracing()
|
closer, err := cfg.SetupTracing()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to start opentracing")
|
log.Panic().Err(err).Msg("failed to start opentracing")
|
||||||
}
|
}
|
||||||
defer closer.Close() // nolint: errcheck
|
defer closer.Close() // nolint: errcheck
|
||||||
|
|
||||||
// setup sentry
|
// setup sentry
|
||||||
if cfg.Global.Sentry.Enabled {
|
if cfg.Global.Sentry.Enabled {
|
||||||
logrus.Info("Setting up Sentry for debugging...")
|
log.Info().Msg("Setting up Sentry for debugging...")
|
||||||
err = sentry.Init(sentry.ClientOptions{
|
err = sentry.Init(sentry.ClientOptions{
|
||||||
Dsn: cfg.Global.Sentry.DSN,
|
Dsn: cfg.Global.Sentry.DSN,
|
||||||
Environment: cfg.Global.Sentry.Environment,
|
Environment: cfg.Global.Sentry.Environment,
|
||||||
|
|
@ -128,13 +128,13 @@ func main() {
|
||||||
AttachStacktrace: true,
|
AttachStacktrace: true,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Panic("failed to start Sentry")
|
log.Panic().Err(err).Msg("failed to start Sentry")
|
||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
processCtx.ComponentStarted()
|
processCtx.ComponentStarted()
|
||||||
<-processCtx.WaitForShutdown()
|
<-processCtx.WaitForShutdown()
|
||||||
if !sentry.Flush(time.Second * 5) {
|
if !sentry.Flush(time.Second * 5) {
|
||||||
logrus.Warnf("failed to flush all Sentry events!")
|
log.Warn().Msg("failed to flush all Sentry events!")
|
||||||
}
|
}
|
||||||
processCtx.ComponentFinished()
|
processCtx.ComponentFinished()
|
||||||
}()
|
}()
|
||||||
|
|
@ -183,7 +183,7 @@ func main() {
|
||||||
|
|
||||||
if len(cfg.MSCs.MSCs) > 0 {
|
if len(cfg.MSCs.MSCs) > 0 {
|
||||||
if err := mscs.Enable(cfg, cm, routers, &monolith, caches); err != nil {
|
if err := mscs.Enable(cfg, cm, routers, &monolith, caches); err != nil {
|
||||||
logrus.WithError(err).Fatalf("Failed to enable MSCs")
|
log.Fatal().Err(err).Msg("Failed to enable MSCs")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,7 @@ import (
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
"github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/federationapi/queue"
|
"github.com/matrix-org/dendrite/federationapi/queue"
|
||||||
"github.com/matrix-org/dendrite/federationapi/storage"
|
"github.com/matrix-org/dendrite/federationapi/storage"
|
||||||
|
|
@ -82,7 +82,7 @@ func (t *KeyChangeConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) boo
|
||||||
var m api.DeviceMessage
|
var m api.DeviceMessage
|
||||||
if err := json.Unmarshal(msg.Data, &m); err != nil {
|
if err := json.Unmarshal(msg.Data, &m); err != nil {
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
logrus.WithError(err).Errorf("failed to read device message from key change topic")
|
log.Error().Err(err).Msg("failed to read device message from key change topic")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if m.DeviceKeys == nil && m.OutputCrossSigningKeyUpdate == nil {
|
if m.DeviceKeys == nil && m.OutputCrossSigningKeyUpdate == nil {
|
||||||
|
|
@ -104,13 +104,13 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool {
|
||||||
if m.DeviceKeys == nil {
|
if m.DeviceKeys == nil {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
logger := logrus.WithField("user_id", m.UserID)
|
logger := log.With().Str("user_id", m.UserID).Logger()
|
||||||
|
|
||||||
// only send key change events which originated from us
|
// only send key change events which originated from us
|
||||||
_, originServerName, err := gomatrixserverlib.SplitID('@', m.UserID)
|
_, originServerName, err := gomatrixserverlib.SplitID('@', m.UserID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
logger.WithError(err).Error("Failed to extract domain from key change event")
|
logger.Error().Err(err).Msg("Failed to extract domain from key change event")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if !t.isLocalServerName(originServerName) {
|
if !t.isLocalServerName(originServerName) {
|
||||||
|
|
@ -124,7 +124,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool {
|
||||||
}, &queryRes)
|
}, &queryRes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
logger.WithError(err).Error("failed to calculate joined rooms for user")
|
logger.Error().Err(err).Msg("failed to calculate joined rooms for user")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -132,7 +132,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool {
|
||||||
destinations, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true, true)
|
destinations, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
logger.WithError(err).Error("failed to calculate joined hosts for rooms user is in")
|
logger.Error().Err(err).Msg("failed to calculate joined hosts for rooms user is in")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -155,11 +155,11 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool {
|
||||||
}
|
}
|
||||||
if edu.Content, err = json.Marshal(event); err != nil {
|
if edu.Content, err = json.Marshal(event); err != nil {
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
logger.WithError(err).Error("failed to marshal EDU JSON")
|
logger.Error().Err(err).Msg("failed to marshal EDU JSON")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Debugf("Sending device list update message to %q", destinations)
|
logger.Debug().Msgf("Sending device list update message to %q", destinations)
|
||||||
err = t.queues.SendEDU(edu, originServerName, destinations)
|
err = t.queues.SendEDU(edu, originServerName, destinations)
|
||||||
return err == nil
|
return err == nil
|
||||||
}
|
}
|
||||||
|
|
@ -169,7 +169,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
|
||||||
_, host, err := gomatrixserverlib.SplitID('@', output.UserID)
|
_, host, err := gomatrixserverlib.SplitID('@', output.UserID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
logrus.WithError(err).Errorf("fedsender key change consumer: user ID parse failure")
|
log.Error().Err(err).Msgf("fedsender key change consumer: user ID parse failure")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if !t.isLocalServerName(host) {
|
if !t.isLocalServerName(host) {
|
||||||
|
|
@ -177,7 +177,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
|
||||||
// end up parroting information we received from other servers.
|
// end up parroting information we received from other servers.
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
logger := logrus.WithField("user_id", output.UserID)
|
logger := log.With().Str("user_id", m.UserID).Logger()
|
||||||
|
|
||||||
var queryRes roomserverAPI.QueryRoomsForUserResponse
|
var queryRes roomserverAPI.QueryRoomsForUserResponse
|
||||||
err = t.rsAPI.QueryRoomsForUser(t.ctx, &roomserverAPI.QueryRoomsForUserRequest{
|
err = t.rsAPI.QueryRoomsForUser(t.ctx, &roomserverAPI.QueryRoomsForUserRequest{
|
||||||
|
|
@ -186,14 +186,14 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
|
||||||
}, &queryRes)
|
}, &queryRes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
logger.WithError(err).Error("fedsender key change consumer: failed to calculate joined rooms for user")
|
logger.Error().Err(err).Msg("fedsender key change consumer: failed to calculate joined rooms for user")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
// send this key change to all servers who share rooms with this user.
|
// send this key change to all servers who share rooms with this user.
|
||||||
destinations, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true, true)
|
destinations, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
logger.WithError(err).Error("fedsender key change consumer: failed to calculate joined hosts for rooms user is in")
|
logger.Error().Err(err).Msg("fedsender key change consumer: failed to calculate joined hosts for rooms user is in")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -208,11 +208,11 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
|
||||||
}
|
}
|
||||||
if edu.Content, err = json.Marshal(output); err != nil {
|
if edu.Content, err = json.Marshal(output); err != nil {
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
logger.WithError(err).Error("fedsender key change consumer: failed to marshal output, dropping")
|
logger.Error().Err(err).Msg("fedsender key change consumer: failed to marshal output, dropping")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Debugf("Sending cross-signing update message to %q", destinations)
|
logger.Debug().Msgf("Sending cross-signing update message to %q", destinations)
|
||||||
err = t.queues.SendEDU(edu, host, destinations)
|
err = t.queues.SendEDU(edu, host, destinations)
|
||||||
return err == nil
|
return err == nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,7 @@ import (
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// OutputReceiptConsumer consumes events that originate in the clientapi.
|
// OutputReceiptConsumer consumes events that originate in the clientapi.
|
||||||
|
|
@ -87,7 +87,7 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg
|
||||||
userID := msg.Header.Get(jetstream.UserID)
|
userID := msg.Header.Get(jetstream.UserID)
|
||||||
_, serverName, err := gomatrixserverlib.SplitID('@', userID)
|
_, serverName, err := gomatrixserverlib.SplitID('@', userID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).WithField("user_id", userID).Error("failed to extract domain from receipt sender")
|
log.Error().Err(err).Str("user_id", userID).Msg("failed to extract domain from receipt sender")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if !t.isLocalServerName(serverName) {
|
if !t.isLocalServerName(serverName) {
|
||||||
|
|
@ -100,7 +100,7 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg
|
||||||
WantMembership: "join",
|
WantMembership: "join",
|
||||||
}, &queryRes)
|
}, &queryRes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("failed to calculate joined rooms for user")
|
log.Error().Err(err).Msg("failed to calculate joined rooms for user")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -114,7 +114,7 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg
|
||||||
// send this presence to all servers who share rooms with this user.
|
// send this presence to all servers who share rooms with this user.
|
||||||
joined, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true, true)
|
joined, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("failed to get joined hosts")
|
log.Error().Err(err).Msg("failed to get joined hosts")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -147,13 +147,13 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg
|
||||||
Origin: string(serverName),
|
Origin: string(serverName),
|
||||||
}
|
}
|
||||||
if edu.Content, err = json.Marshal(content); err != nil {
|
if edu.Content, err = json.Marshal(content); err != nil {
|
||||||
log.WithError(err).Error("failed to marshal EDU JSON")
|
log.Error().Err(err).Msg("failed to marshal EDU JSON")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Tracef("sending presence EDU to %d servers", len(joined))
|
log.Trace().Msgf("sending presence EDU to %d servers", len(joined))
|
||||||
if err = t.queues.SendEDU(edu, serverName, joined); err != nil {
|
if err = t.queues.SendEDU(edu, serverName, joined); err != nil {
|
||||||
log.WithError(err).Error("failed to send EDU")
|
log.Error().Err(err).Msg("failed to send EDU")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,7 @@ import (
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// OutputReceiptConsumer consumes events that originate in the clientapi.
|
// OutputReceiptConsumer consumes events that originate in the clientapi.
|
||||||
|
|
@ -93,7 +93,7 @@ func (t *OutputReceiptConsumer) onMessage(ctx context.Context, msgs []*nats.Msg)
|
||||||
// only send receipt events which originated from us
|
// only send receipt events which originated from us
|
||||||
_, receiptServerName, err := gomatrixserverlib.SplitID('@', receipt.UserID)
|
_, receiptServerName, err := gomatrixserverlib.SplitID('@', receipt.UserID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).WithField("user_id", receipt.UserID).Error("failed to extract domain from receipt sender")
|
log.Error().Err(err).Str("user_id", receipt.UserID).Msg("failed to extract domain from receipt sender")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if !t.isLocalServerName(receiptServerName) {
|
if !t.isLocalServerName(receiptServerName) {
|
||||||
|
|
@ -103,7 +103,7 @@ func (t *OutputReceiptConsumer) onMessage(ctx context.Context, msgs []*nats.Msg)
|
||||||
timestamp, err := strconv.ParseUint(msg.Header.Get("timestamp"), 10, 64)
|
timestamp, err := strconv.ParseUint(msg.Header.Get("timestamp"), 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If the message was invalid, log it and move on to the next message in the stream
|
// If the message was invalid, log it and move on to the next message in the stream
|
||||||
log.WithError(err).Errorf("EDU output log: message parse failure")
|
log.Error().Err(err).Msgf("EDU output log: message parse failure")
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
@ -112,7 +112,7 @@ func (t *OutputReceiptConsumer) onMessage(ctx context.Context, msgs []*nats.Msg)
|
||||||
|
|
||||||
joined, err := t.db.GetJoinedHosts(ctx, receipt.RoomID)
|
joined, err := t.db.GetJoinedHosts(ctx, receipt.RoomID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).WithField("room_id", receipt.RoomID).Error("failed to get joined hosts for room")
|
log.Error().Err(err).Str("room_id", receipt.RoomID).Msg("failed to get joined hosts for room")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -138,12 +138,12 @@ func (t *OutputReceiptConsumer) onMessage(ctx context.Context, msgs []*nats.Msg)
|
||||||
Origin: string(receiptServerName),
|
Origin: string(receiptServerName),
|
||||||
}
|
}
|
||||||
if edu.Content, err = json.Marshal(content); err != nil {
|
if edu.Content, err = json.Marshal(content); err != nil {
|
||||||
log.WithError(err).Error("failed to marshal EDU JSON")
|
log.Error().Err(err).Msg("failed to marshal EDU JSON")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := t.queues.SendEDU(edu, receiptServerName, names); err != nil {
|
if err := t.queues.SendEDU(edu, receiptServerName, names); err != nil {
|
||||||
log.WithError(err).Error("failed to send EDU")
|
log.Error().Err(err).Msg("failed to send EDU")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -26,8 +26,7 @@ import (
|
||||||
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
"github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/federationapi/queue"
|
"github.com/matrix-org/dendrite/federationapi/queue"
|
||||||
"github.com/matrix-org/dendrite/federationapi/storage"
|
"github.com/matrix-org/dendrite/federationapi/storage"
|
||||||
|
|
@ -103,7 +102,7 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
|
||||||
var output api.OutputEvent
|
var output api.OutputEvent
|
||||||
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
||||||
// If the message was invalid, log it and move on to the next message in the stream
|
// If the message was invalid, log it and move on to the next message in the stream
|
||||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
log.Error().Err(err).Msg("roomserver output log: message parse failure")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -112,36 +111,31 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
|
||||||
ev := output.NewRoomEvent.Event
|
ev := output.NewRoomEvent.Event
|
||||||
if err := s.processMessage(*output.NewRoomEvent, output.NewRoomEvent.RewritesState); err != nil {
|
if err := s.processMessage(*output.NewRoomEvent, output.NewRoomEvent.RewritesState); err != nil {
|
||||||
// panic rather than continue with an inconsistent database
|
// panic rather than continue with an inconsistent database
|
||||||
log.WithFields(log.Fields{
|
log.Panic().
|
||||||
"event_id": ev.EventID(),
|
Str("event_id", ev.EventID()).
|
||||||
"event": string(ev.JSON()),
|
Str("event", string(ev.JSON())).
|
||||||
"add": output.NewRoomEvent.AddsStateEventIDs,
|
Any("add", output.NewRoomEvent.AddsStateEventIDs).
|
||||||
"del": output.NewRoomEvent.RemovesStateEventIDs,
|
Any("del", output.NewRoomEvent.RemovesStateEventIDs).
|
||||||
log.ErrorKey: err,
|
Err(err).Msg("roomserver output log: write room event failure")
|
||||||
}).Panicf("roomserver output log: write room event failure")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
case api.OutputTypeNewInboundPeek:
|
case api.OutputTypeNewInboundPeek:
|
||||||
if err := s.processInboundPeek(*output.NewInboundPeek); err != nil {
|
if err := s.processInboundPeek(*output.NewInboundPeek); err != nil {
|
||||||
log.WithFields(log.Fields{
|
//log.ErrorKey == ?
|
||||||
"event": output.NewInboundPeek,
|
log.Panic().Any("event", output.NewInboundPeek).Err(err).Msg("roomserver output log: remote peek event failure")
|
||||||
log.ErrorKey: err,
|
|
||||||
}).Panicf("roomserver output log: remote peek event failure")
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
case api.OutputTypePurgeRoom:
|
case api.OutputTypePurgeRoom:
|
||||||
log.WithField("room_id", output.PurgeRoom.RoomID).Warn("Purging room from federation API")
|
log.Warn().Str("room_id", output.PurgeRoom.RoomID).Msg("Purging room from federation API")
|
||||||
if err := s.db.PurgeRoom(ctx, output.PurgeRoom.RoomID); err != nil {
|
if err := s.db.PurgeRoom(ctx, output.PurgeRoom.RoomID); err != nil {
|
||||||
logrus.WithField("room_id", output.PurgeRoom.RoomID).WithError(err).Error("Failed to purge room from federation API")
|
log.Error().Str("room_id", output.PurgeRoom.RoomID).Err(err).Msg("Failed to purge room from federation API")
|
||||||
} else {
|
} else {
|
||||||
logrus.WithField("room_id", output.PurgeRoom.RoomID).Warn("Room purged from federation API")
|
log.Warn().Str("room_id", output.PurgeRoom.RoomID).Msg("Room purged from federation API")
|
||||||
}
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
log.WithField("type", output.Type).Debug(
|
log.Debug().Any("type", output.Type).Msg("roomserver output log: ignoring unknown output type")
|
||||||
"roomserver output log: ignoring unknown output type",
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return true
|
return true
|
||||||
|
|
@ -263,7 +257,7 @@ func (s *OutputRoomEventConsumer) sendPresence(roomID string, addedJoined []type
|
||||||
RoomID: roomID,
|
RoomID: roomID,
|
||||||
}, &queryRes)
|
}, &queryRes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("failed to calculate joined rooms for user")
|
log.Error().Err(err).Msg("failed to calculate joined rooms for user")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -276,7 +270,7 @@ func (s *OutputRoomEventConsumer) sendPresence(roomID string, addedJoined []type
|
||||||
var presence *nats.Msg
|
var presence *nats.Msg
|
||||||
presence, err = s.natsClient.RequestMsg(msg, time.Second*10)
|
presence, err = s.natsClient.RequestMsg(msg, time.Second*10)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Errorf("unable to get presence")
|
log.Error().Err(err).Msg("unable to get presence")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -311,11 +305,11 @@ func (s *OutputRoomEventConsumer) sendPresence(roomID string, addedJoined []type
|
||||||
Origin: string(s.cfg.Matrix.ServerName),
|
Origin: string(s.cfg.Matrix.ServerName),
|
||||||
}
|
}
|
||||||
if edu.Content, err = json.Marshal(content); err != nil {
|
if edu.Content, err = json.Marshal(content); err != nil {
|
||||||
log.WithError(err).Error("failed to marshal EDU JSON")
|
log.Error().Err(err).Msg("failed to marshal EDU JSON")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := s.queues.SendEDU(edu, s.cfg.Matrix.ServerName, joined); err != nil {
|
if err := s.queues.SendEDU(edu, s.cfg.Matrix.ServerName, joined); err != nil {
|
||||||
log.WithError(err).Error("failed to send EDU")
|
log.Error().Err(err).Msg("failed to send EDU")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -17,13 +17,6 @@ package federationapi
|
||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/httputil"
|
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
|
||||||
"github.com/matrix-org/dendrite/setup/process"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib/fclient"
|
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/federationapi/api"
|
"github.com/matrix-org/dendrite/federationapi/api"
|
||||||
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
|
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
|
||||||
"github.com/matrix-org/dendrite/federationapi/consumers"
|
"github.com/matrix-org/dendrite/federationapi/consumers"
|
||||||
|
|
@ -33,9 +26,15 @@ import (
|
||||||
"github.com/matrix-org/dendrite/federationapi/statistics"
|
"github.com/matrix-org/dendrite/federationapi/statistics"
|
||||||
"github.com/matrix-org/dendrite/federationapi/storage"
|
"github.com/matrix-org/dendrite/federationapi/storage"
|
||||||
"github.com/matrix-org/dendrite/internal/caching"
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
|
"github.com/matrix-org/dendrite/internal/httputil"
|
||||||
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
|
"github.com/matrix-org/dendrite/setup/process"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib/fclient"
|
||||||
|
log "github.com/rs/zerolog/log"
|
||||||
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
||||||
|
|
@ -107,7 +106,7 @@ func NewInternalAPI(
|
||||||
|
|
||||||
federationDB, err := storage.NewDatabase(processContext.Context(), cm, &cfg.Database, caches, dendriteCfg.Global.IsLocalServerName)
|
federationDB, err := storage.NewDatabase(processContext.Context(), cm, &cfg.Database, caches, dendriteCfg.Global.IsLocalServerName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Panic("failed to connect to federation sender db")
|
log.Panic().Err(err).Msg("failed to connect to federation sender db")
|
||||||
}
|
}
|
||||||
|
|
||||||
if resetBlacklist {
|
if resetBlacklist {
|
||||||
|
|
@ -135,45 +134,45 @@ func NewInternalAPI(
|
||||||
federationDB, rsAPI,
|
federationDB, rsAPI,
|
||||||
)
|
)
|
||||||
if err = rsConsumer.Start(); err != nil {
|
if err = rsConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panic("failed to start room server consumer")
|
log.Panic().Err(err).Msg("failed to start room server consumer")
|
||||||
}
|
}
|
||||||
tsConsumer := consumers.NewOutputSendToDeviceConsumer(
|
tsConsumer := consumers.NewOutputSendToDeviceConsumer(
|
||||||
processContext, cfg, js, queues, federationDB,
|
processContext, cfg, js, queues, federationDB,
|
||||||
)
|
)
|
||||||
if err = tsConsumer.Start(); err != nil {
|
if err = tsConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panic("failed to start send-to-device consumer")
|
log.Panic().Err(err).Msg("failed to start send-to-device consumer")
|
||||||
}
|
}
|
||||||
receiptConsumer := consumers.NewOutputReceiptConsumer(
|
receiptConsumer := consumers.NewOutputReceiptConsumer(
|
||||||
processContext, cfg, js, queues, federationDB,
|
processContext, cfg, js, queues, federationDB,
|
||||||
)
|
)
|
||||||
if err = receiptConsumer.Start(); err != nil {
|
if err = receiptConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panic("failed to start receipt consumer")
|
log.Panic().Err(err).Msg("failed to start receipt consumer")
|
||||||
}
|
}
|
||||||
typingConsumer := consumers.NewOutputTypingConsumer(
|
typingConsumer := consumers.NewOutputTypingConsumer(
|
||||||
processContext, cfg, js, queues, federationDB,
|
processContext, cfg, js, queues, federationDB,
|
||||||
)
|
)
|
||||||
if err = typingConsumer.Start(); err != nil {
|
if err = typingConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panic("failed to start typing consumer")
|
log.Panic().Err(err).Msg("failed to start typing consumer")
|
||||||
}
|
}
|
||||||
keyConsumer := consumers.NewKeyChangeConsumer(
|
keyConsumer := consumers.NewKeyChangeConsumer(
|
||||||
processContext, &dendriteCfg.KeyServer, js, queues, federationDB, rsAPI,
|
processContext, &dendriteCfg.KeyServer, js, queues, federationDB, rsAPI,
|
||||||
)
|
)
|
||||||
if err = keyConsumer.Start(); err != nil {
|
if err = keyConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panic("failed to start key server consumer")
|
log.Panic().Err(err).Msg("failed to start key server consumer")
|
||||||
}
|
}
|
||||||
|
|
||||||
presenceConsumer := consumers.NewOutputPresenceConsumer(
|
presenceConsumer := consumers.NewOutputPresenceConsumer(
|
||||||
processContext, cfg, js, queues, federationDB, rsAPI,
|
processContext, cfg, js, queues, federationDB, rsAPI,
|
||||||
)
|
)
|
||||||
if err = presenceConsumer.Start(); err != nil {
|
if err = presenceConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panic("failed to start presence consumer")
|
log.Panic().Err(err).Msg("failed to start presence consumer")
|
||||||
}
|
}
|
||||||
|
|
||||||
var cleanExpiredEDUs func()
|
var cleanExpiredEDUs func()
|
||||||
cleanExpiredEDUs = func() {
|
cleanExpiredEDUs = func() {
|
||||||
logrus.Infof("Cleaning expired EDUs")
|
log.Info().Msg("Cleaning expired EDUs")
|
||||||
if err := federationDB.DeleteExpiredEDUs(processContext.Context()); err != nil {
|
if err := federationDB.DeleteExpiredEDUs(processContext.Context()); err != nil {
|
||||||
logrus.WithError(err).Error("Failed to clean expired EDUs")
|
log.Error().Err(err).Msg("Failed to clean expired EDUs")
|
||||||
}
|
}
|
||||||
time.AfterFunc(time.Hour, cleanExpiredEDUs)
|
time.AfterFunc(time.Hour, cleanExpiredEDUs)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,7 @@ import (
|
||||||
"github.com/matrix-org/gomatrixserverlib/fclient"
|
"github.com/matrix-org/gomatrixserverlib/fclient"
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
"github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
|
|
@ -73,18 +73,18 @@ func (t *SigningKeyUpdateConsumer) onMessage(ctx context.Context, msgs []*nats.M
|
||||||
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
||||||
var updatePayload api.CrossSigningKeyUpdate
|
var updatePayload api.CrossSigningKeyUpdate
|
||||||
if err := json.Unmarshal(msg.Data, &updatePayload); err != nil {
|
if err := json.Unmarshal(msg.Data, &updatePayload); err != nil {
|
||||||
logrus.WithError(err).Errorf("Failed to read from signing key update input topic")
|
log.Error().Err(err).Msg("Failed to read from signing key update input topic")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
origin := spec.ServerName(msg.Header.Get("origin"))
|
origin := spec.ServerName(msg.Header.Get("origin"))
|
||||||
if _, serverName, err := gomatrixserverlib.SplitID('@', updatePayload.UserID); err != nil {
|
if _, serverName, err := gomatrixserverlib.SplitID('@', updatePayload.UserID); err != nil {
|
||||||
logrus.WithError(err).Error("failed to split user id")
|
log.Error().Err(err).Msgf("failed to split user id")
|
||||||
return true
|
return true
|
||||||
} else if t.isLocalServerName(serverName) {
|
} else if t.isLocalServerName(serverName) {
|
||||||
logrus.Warn("dropping device key update from ourself")
|
log.Warn().Msg("dropping device key update from ourself")
|
||||||
return true
|
return true
|
||||||
} else if serverName != origin {
|
} else if serverName != origin {
|
||||||
logrus.Warnf("dropping device key update, %s != %s", serverName, origin)
|
log.Warn().Msgf("dropping device key update, %s != %s", serverName, origin)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -101,11 +101,11 @@ func (t *SigningKeyUpdateConsumer) onMessage(ctx context.Context, msgs []*nats.M
|
||||||
}
|
}
|
||||||
uploadRes := &api.PerformUploadDeviceKeysResponse{}
|
uploadRes := &api.PerformUploadDeviceKeysResponse{}
|
||||||
if err := t.userAPI.PerformUploadDeviceKeys(ctx, uploadReq, uploadRes); err != nil {
|
if err := t.userAPI.PerformUploadDeviceKeys(ctx, uploadReq, uploadRes); err != nil {
|
||||||
logrus.WithError(err).Error("failed to upload device keys")
|
log.Error().Err(err).Msg("failed to upload device keys")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if uploadRes.Error != nil {
|
if uploadRes.Error != nil {
|
||||||
logrus.WithError(uploadRes.Error).Error("failed to upload device keys")
|
log.Error().Err(uploadRes.Error).Msg("failed to upload device keys")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,8 @@ import (
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/gomatrixserverlib/fclient"
|
"github.com/matrix-org/gomatrixserverlib/fclient"
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
"github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
|
|
||||||
"golang.org/x/crypto/curve25519"
|
"golang.org/x/crypto/curve25519"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -469,7 +470,7 @@ func (a *UserInternalAPI) crossSigningKeysFromDatabase(
|
||||||
for targetUserID := range req.UserToDevices {
|
for targetUserID := range req.UserToDevices {
|
||||||
keys, err := a.KeyDatabase.CrossSigningKeysForUser(ctx, targetUserID)
|
keys, err := a.KeyDatabase.CrossSigningKeysForUser(ctx, targetUserID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Errorf("Failed to get cross-signing keys for user %q", targetUserID)
|
log.Error().Err(err).Msgf("Failed to get cross-signing keys for user %q", targetUserID)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -482,7 +483,7 @@ func (a *UserInternalAPI) crossSigningKeysFromDatabase(
|
||||||
|
|
||||||
sigMap, err := a.KeyDatabase.CrossSigningSigsForTarget(ctx, req.UserID, targetUserID, keyID)
|
sigMap, err := a.KeyDatabase.CrossSigningSigsForTarget(ctx, req.UserID, targetUserID, keyID)
|
||||||
if err != nil && err != sql.ErrNoRows {
|
if err != nil && err != sql.ErrNoRows {
|
||||||
logrus.WithError(err).Errorf("Failed to get cross-signing signatures for user %q key %q", targetUserID, keyID)
|
log.Error().Err(err).Msgf("Failed to get cross-signing signatures for user %q key %q", targetUserID, keyID)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,8 @@ import (
|
||||||
"github.com/matrix-org/dendrite/userapi/api"
|
"github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/dendrite/userapi/storage"
|
"github.com/matrix-org/dendrite/userapi/storage"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
|
log "github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// KeyChange produces key change events for the sync API and federation sender to consume
|
// KeyChange produces key change events for the sync API and federation sender to consume
|
||||||
|
|
@ -61,10 +62,7 @@ func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceMessage) error {
|
||||||
userToDeviceCount[key.UserID]++
|
userToDeviceCount[key.UserID]++
|
||||||
}
|
}
|
||||||
for userID, count := range userToDeviceCount {
|
for userID, count := range userToDeviceCount {
|
||||||
logrus.WithFields(logrus.Fields{
|
log.Trace().Str("user_id", userID).Int("num_key_changes", count).Msgf("Produced to key change topic '%s'", p.Topic)
|
||||||
"user_id": userID,
|
|
||||||
"num_key_changes": count,
|
|
||||||
}).Tracef("Produced to key change topic '%s'", p.Topic)
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -100,8 +98,6 @@ func (p *KeyChange) ProduceSigningKeyUpdate(key api.CrossSigningKeyUpdate) error
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.WithFields(logrus.Fields{
|
log.Trace().Str("user_id", key.UserID).Msgf("Produced to cross-signing update topic '%s'", p.Topic)
|
||||||
"user_id": key.UserID,
|
|
||||||
}).Tracef("Produced to cross-signing update topic '%s'", p.Topic)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ import (
|
||||||
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/eventutil"
|
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
|
|
@ -48,11 +48,7 @@ func (p *SyncAPI) SendAccountData(userID string, data eventutil.AccountData) err
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.WithFields(log.Fields{
|
log.Trace().Str("user_id", userID).Str("room_id", data.RoomID).Str("data_type", data.Type).Msgf("Producing to topic '%s'", p.clientDataTopic)
|
||||||
"user_id": userID,
|
|
||||||
"room_id": data.RoomID,
|
|
||||||
"data_type": data.Type,
|
|
||||||
}).Tracef("Producing to topic '%s'", p.clientDataTopic)
|
|
||||||
|
|
||||||
_, err = p.producer.PublishMsg(m)
|
_, err = p.producer.PublishMsg(m)
|
||||||
return err
|
return err
|
||||||
|
|
@ -92,10 +88,7 @@ func (p *SyncAPI) sendNotificationData(userID string, data *eventutil.Notificati
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.WithFields(log.Fields{
|
log.Trace().Str("user_id", userID).Str("room_id", data.RoomID).Msgf("Producing to topic '%s'", p.clientDataTopic)
|
||||||
"user_id": userID,
|
|
||||||
"room_id": data.RoomID,
|
|
||||||
}).Tracef("Producing to topic '%s'", p.clientDataTopic)
|
|
||||||
|
|
||||||
_, err = p.producer.PublishMsg(m)
|
_, err = p.producer.PublishMsg(m)
|
||||||
return err
|
return err
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/userapi/storage/tables"
|
"github.com/matrix-org/dendrite/userapi/storage/tables"
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
const accountsSchema = `
|
const accountsSchema = `
|
||||||
|
|
@ -178,7 +178,7 @@ func (s *accountsStatements) SelectAccountByLocalpart(
|
||||||
err := stmt.QueryRowContext(ctx, localpart, serverName).Scan(&acc.Localpart, &acc.ServerName, &appserviceIDPtr, &acc.AccountType)
|
err := stmt.QueryRowContext(ctx, localpart, serverName).Scan(&acc.Localpart, &acc.ServerName, &appserviceIDPtr, &acc.AccountType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != sql.ErrNoRows {
|
if err != sql.ErrNoRows {
|
||||||
log.WithError(err).Error("Unable to retrieve user from the db")
|
log.Error().Err(err).Msg("Unable to retrieve user from the db")
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,7 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
|
@ -137,7 +137,7 @@ func (s *notificationsStatements) DeleteUpTo(ctx context.Context, txn *sql.Tx, l
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return true, err
|
return true, err
|
||||||
}
|
}
|
||||||
log.WithFields(log.Fields{"localpart": localpart, "room_id": roomID, "stream_pos": pos}).Tracef("DeleteUpTo: %d rows affected", nrows)
|
log.Trace().Str("localpart", localpart).Str("room_id", roomID).Uint64("stream_pos", pos).Msgf("DeleteUpTo: %d rows affected", nrows)
|
||||||
return nrows > 0, nil
|
return nrows > 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -151,7 +151,7 @@ func (s *notificationsStatements) UpdateRead(ctx context.Context, txn *sql.Tx, l
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return true, err
|
return true, err
|
||||||
}
|
}
|
||||||
log.WithFields(log.Fields{"localpart": localpart, "room_id": roomID, "stream_pos": pos}).Tracef("UpdateRead: %d rows affected", nrows)
|
log.Trace().Str("localpart", localpart).Str("room_id", roomID).Uint64("stream_pos", pos).Msgf("UpdateRead: %d rows affected", nrows)
|
||||||
return nrows > 0, nil
|
return nrows > 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/userapi/api"
|
"github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/dendrite/userapi/storage/tables"
|
"github.com/matrix-org/dendrite/userapi/storage/tables"
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
const openIDTokenSchema = `
|
const openIDTokenSchema = `
|
||||||
|
|
@ -80,7 +80,7 @@ func (s *openIDTokenStatements) SelectOpenIDTokenAtrributes(
|
||||||
openIDTokenAttrs.UserID = fmt.Sprintf("@%s:%s", localpart, serverName)
|
openIDTokenAttrs.UserID = fmt.Sprintf("@%s:%s", localpart, serverName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != sql.ErrNoRows {
|
if err != sql.ErrNoRows {
|
||||||
log.WithError(err).Error("Unable to retrieve token from the db")
|
log.Error().Err(err).Msg("Unable to retrieve token from the db")
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,7 @@ import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
|
@ -140,7 +140,7 @@ func (s *pushersStatements) SelectPushers(
|
||||||
pushers = append(pushers, pusher)
|
pushers = append(pushers, pusher)
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Tracef("Database returned %d pushers", len(pushers))
|
log.Trace().Msgf("Database returned %d pushers", len(pushers))
|
||||||
return pushers, rows.Err()
|
return pushers, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,7 @@ import (
|
||||||
|
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
"github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
|
@ -235,9 +235,9 @@ func NewPostgresStatsTable(db *sql.DB, serverName spec.ServerName) (tables.Stats
|
||||||
func (s *statsStatements) startTimers() {
|
func (s *statsStatements) startTimers() {
|
||||||
var updateStatsFunc func()
|
var updateStatsFunc func()
|
||||||
updateStatsFunc = func() {
|
updateStatsFunc = func() {
|
||||||
logrus.Infof("Executing UpdateUserDailyVisits")
|
log.Info().Msg("Executing UpdateUserDailyVisits")
|
||||||
if err := s.UpdateUserDailyVisits(context.Background(), nil, time.Now(), s.lastUpdate); err != nil {
|
if err := s.UpdateUserDailyVisits(context.Background(), nil, time.Now(), s.lastUpdate); err != nil {
|
||||||
logrus.WithError(err).Error("failed to update daily user visits")
|
log.Error().Err(err).Msg("failed to update daily user visits")
|
||||||
}
|
}
|
||||||
time.AfterFunc(time.Hour*3, updateStatsFunc)
|
time.AfterFunc(time.Hour*3, updateStatsFunc)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -26,7 +26,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/userapi/storage/tables"
|
"github.com/matrix-org/dendrite/userapi/storage/tables"
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
const accountsSchema = `
|
const accountsSchema = `
|
||||||
|
|
@ -178,7 +178,7 @@ func (s *accountsStatements) SelectAccountByLocalpart(
|
||||||
err := stmt.QueryRowContext(ctx, localpart, serverName).Scan(&acc.Localpart, &acc.ServerName, &appserviceIDPtr, &acc.AccountType)
|
err := stmt.QueryRowContext(ctx, localpart, serverName).Scan(&acc.Localpart, &acc.ServerName, &appserviceIDPtr, &acc.AccountType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != sql.ErrNoRows {
|
if err != sql.ErrNoRows {
|
||||||
log.WithError(err).Error("Unable to retrieve user from the db")
|
log.Error().Err(err).Msg("Unable to retrieve user from the db")
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,8 @@ import (
|
||||||
|
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
|
log "github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
var serverNamesTables = []string{
|
var serverNamesTables = []string{
|
||||||
|
|
@ -57,7 +58,7 @@ func UpServerNames(ctx context.Context, tx *sql.Tx, serverName spec.ServerName)
|
||||||
pq.QuoteIdentifier(table),
|
pq.QuoteIdentifier(table),
|
||||||
)
|
)
|
||||||
if err := tx.QueryRowContext(ctx, q).Scan(&c); err != nil || c == 1 {
|
if err := tx.QueryRowContext(ctx, q).Scan(&c); err != nil || c == 1 {
|
||||||
logrus.Infof("Table %s already has column, skipping", table)
|
log.Info().Msgf("Table %s already has column, skipping", table)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if c == 0 {
|
if c == 0 {
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,7 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
|
@ -137,7 +137,7 @@ func (s *notificationsStatements) DeleteUpTo(ctx context.Context, txn *sql.Tx, l
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return true, err
|
return true, err
|
||||||
}
|
}
|
||||||
log.WithFields(log.Fields{"localpart": localpart, "room_id": roomID, "stream_pos": pos}).Tracef("DeleteUpTo: %d rows affected", nrows)
|
log.Trace().Str("localpart", localpart).Str("room_id", roomID).Uint64("stream_pos", pos).Msgf("DeleteUpTo: %d rows affected", nrows)
|
||||||
return nrows > 0, nil
|
return nrows > 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -151,7 +151,7 @@ func (s *notificationsStatements) UpdateRead(ctx context.Context, txn *sql.Tx, l
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return true, err
|
return true, err
|
||||||
}
|
}
|
||||||
log.WithFields(log.Fields{"localpart": localpart, "room_id": roomID, "stream_pos": pos}).Tracef("UpdateRead: %d rows affected", nrows)
|
log.Trace().Str("localpart", localpart).Str("room_id", roomID).Uint64("stream_pos", pos).Msgf("UpdateRead: %d rows affected", nrows)
|
||||||
return nrows > 0, nil
|
return nrows > 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/userapi/api"
|
"github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/dendrite/userapi/storage/tables"
|
"github.com/matrix-org/dendrite/userapi/storage/tables"
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
const openIDTokenSchema = `
|
const openIDTokenSchema = `
|
||||||
|
|
@ -82,7 +82,7 @@ func (s *openIDTokenStatements) SelectOpenIDTokenAtrributes(
|
||||||
openIDTokenAttrs.UserID = fmt.Sprintf("@%s:%s", localpart, serverName)
|
openIDTokenAttrs.UserID = fmt.Sprintf("@%s:%s", localpart, serverName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != sql.ErrNoRows {
|
if err != sql.ErrNoRows {
|
||||||
log.WithError(err).Error("Unable to retrieve token from the db")
|
log.Error().Err(err).Msg("Unable to retrieve token from the db")
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,7 @@ import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
|
@ -140,7 +140,7 @@ func (s *pushersStatements) SelectPushers(
|
||||||
pushers = append(pushers, pusher)
|
pushers = append(pushers, pusher)
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Tracef("Database returned %d pushers", len(pushers))
|
log.Trace().Msgf("Database returned %d pushers", len(pushers))
|
||||||
return pushers, rows.Err()
|
return pushers, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,8 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
|
log "github.com/rs/zerolog/log"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
|
@ -241,9 +242,9 @@ func NewSQLiteStatsTable(db *sql.DB, serverName spec.ServerName) (tables.StatsTa
|
||||||
func (s *statsStatements) startTimers() {
|
func (s *statsStatements) startTimers() {
|
||||||
var updateStatsFunc func()
|
var updateStatsFunc func()
|
||||||
updateStatsFunc = func() {
|
updateStatsFunc = func() {
|
||||||
logrus.Infof("Executing UpdateUserDailyVisits")
|
log.Info().Msg("Executing UpdateUserDailyVisits")
|
||||||
if err := s.UpdateUserDailyVisits(context.Background(), nil, time.Now(), s.lastUpdate); err != nil {
|
if err := s.UpdateUserDailyVisits(context.Background(), nil, time.Now(), s.lastUpdate); err != nil {
|
||||||
logrus.WithError(err).Error("failed to update daily user visits")
|
log.Error().Err(err).Msg("failed to update daily user visits")
|
||||||
}
|
}
|
||||||
time.AfterFunc(time.Hour*3, updateStatsFunc)
|
time.AfterFunc(time.Hour*3, updateStatsFunc)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/userapi/api"
|
"github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/dendrite/userapi/storage"
|
"github.com/matrix-org/dendrite/userapi/storage"
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
type PusherDevice struct {
|
type PusherDevice struct {
|
||||||
|
|
@ -40,30 +40,20 @@ func GetPushDevices(ctx context.Context, localpart string, serverName spec.Serve
|
||||||
var ok bool
|
var ok bool
|
||||||
format, ok = fmtIface.(string)
|
format, ok = fmtIface.(string)
|
||||||
if ok && format != "event_id_only" {
|
if ok && format != "event_id_only" {
|
||||||
log.WithFields(log.Fields{
|
log.Error().Str("localpart", localpart).Str("app_id", pusher.AppID).Msg("Only data.format event_id_only or empty is supported")
|
||||||
"localpart": localpart,
|
|
||||||
"app_id": pusher.AppID,
|
|
||||||
}).Errorf("Only data.format event_id_only or empty is supported")
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
urlIface := pusher.Data["url"]
|
urlIface := pusher.Data["url"]
|
||||||
url, ok = urlIface.(string)
|
url, ok = urlIface.(string)
|
||||||
if !ok {
|
if !ok {
|
||||||
log.WithFields(log.Fields{
|
log.Error().Str("localpart", localpart).Str("app_id", pusher.AppID).Msg("No data.url configured for HTTP Pusher")
|
||||||
"localpart": localpart,
|
|
||||||
"app_id": pusher.AppID,
|
|
||||||
}).Errorf("No data.url configured for HTTP Pusher")
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
data = mapWithout(data, "url")
|
data = mapWithout(data, "url")
|
||||||
|
|
||||||
default:
|
default:
|
||||||
log.WithFields(log.Fields{
|
log.Error().Str("localpart", localpart).Str("app_id", pusher.AppID).Any("kind", pusher.Kind).Msg("Unhandled pusher kind")
|
||||||
"localpart": localpart,
|
|
||||||
"app_id": pusher.AppID,
|
|
||||||
"kind": pusher.Kind,
|
|
||||||
}).Errorf("Unhandled pusher kind")
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/userapi/storage"
|
"github.com/matrix-org/dendrite/userapi/storage"
|
||||||
"github.com/matrix-org/dendrite/userapi/storage/tables"
|
"github.com/matrix-org/dendrite/userapi/storage/tables"
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NotifyUserCountsAsync sends notifications to a local user's
|
// NotifyUserCountsAsync sends notifications to a local user's
|
||||||
|
|
@ -32,11 +32,7 @@ func NotifyUserCountsAsync(ctx context.Context, pgClient pushgateway.Client, loc
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.WithFields(log.Fields{
|
log.Trace().Str("localpart", localpart).Str("app_id0", pusherDevices[0].Device.AppID).Str("pushkey", pusherDevices[0].Device.PushKey).Msgf("Notifying HTTP push gateway about notification counts")
|
||||||
"localpart": localpart,
|
|
||||||
"app_id0": pusherDevices[0].Device.AppID,
|
|
||||||
"pushkey": pusherDevices[0].Device.PushKey,
|
|
||||||
}).Tracef("Notifying HTTP push gateway about notification counts")
|
|
||||||
|
|
||||||
// TODO: think about bounding this to one per user, and what
|
// TODO: think about bounding this to one per user, and what
|
||||||
// ordering guarantees we must provide.
|
// ordering guarantees we must provide.
|
||||||
|
|
@ -63,11 +59,7 @@ func NotifyUserCountsAsync(ctx context.Context, pgClient pushgateway.Client, loc
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if err := pgClient.Notify(ctx, pusherDevice.URL, &req, &pushgateway.NotifyResponse{}); err != nil {
|
if err := pgClient.Notify(ctx, pusherDevice.URL, &req, &pushgateway.NotifyResponse{}); err != nil {
|
||||||
log.WithFields(log.Fields{
|
log.Error().Err(err).Str("localpart", localpart).Str("app_id0", pusherDevice.Device.AppID).Str("pushkey", pusherDevice.Device.PushKey).Msg("HTTP push gateway request failed")
|
||||||
"localpart": localpart,
|
|
||||||
"app_id0": pusherDevice.Device.AppID,
|
|
||||||
"pushkey": pusherDevice.Device.PushKey,
|
|
||||||
}).WithError(err).Error("HTTP push gateway request failed")
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,7 @@ import (
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
|
|
@ -92,7 +92,7 @@ func (p *phoneHomeStats) collect() {
|
||||||
// cpu and memory usage information
|
// cpu and memory usage information
|
||||||
err := getMemoryStats(p)
|
err := getMemoryStats(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Warn("unable to get memory/cpu stats, using defaults")
|
log.Warn().Err(err).Msg("unable to get memory/cpu stats, using defaults")
|
||||||
}
|
}
|
||||||
|
|
||||||
// configuration information
|
// configuration information
|
||||||
|
|
@ -113,7 +113,7 @@ func (p *phoneHomeStats) collect() {
|
||||||
|
|
||||||
messageStats, activeRooms, activeE2EERooms, err := p.db.DailyRoomsMessages(ctx, p.serverName)
|
messageStats, activeRooms, activeE2EERooms, err := p.db.DailyRoomsMessages(ctx, p.serverName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Warn("unable to query message stats, using default values")
|
log.Warn().Err(err).Msg("unable to query message stats, using default values")
|
||||||
}
|
}
|
||||||
p.stats["daily_messages"] = messageStats.Messages
|
p.stats["daily_messages"] = messageStats.Messages
|
||||||
p.stats["daily_sent_messages"] = messageStats.SentMessages
|
p.stats["daily_sent_messages"] = messageStats.SentMessages
|
||||||
|
|
@ -125,7 +125,7 @@ func (p *phoneHomeStats) collect() {
|
||||||
// user stats and DB engine
|
// user stats and DB engine
|
||||||
userStats, db, err := p.db.UserStatistics(ctx)
|
userStats, db, err := p.db.UserStatistics(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Warn("unable to query userstats, using default values")
|
log.Warn().Err(err).Msg("unable to query userstats, using default values")
|
||||||
}
|
}
|
||||||
p.stats["database_engine"] = db.Engine
|
p.stats["database_engine"] = db.Engine
|
||||||
p.stats["database_server_version"] = db.Version
|
p.stats["database_server_version"] = db.Version
|
||||||
|
|
@ -145,22 +145,22 @@ func (p *phoneHomeStats) collect() {
|
||||||
|
|
||||||
output := bytes.Buffer{}
|
output := bytes.Buffer{}
|
||||||
if err = json.NewEncoder(&output).Encode(p.stats); err != nil {
|
if err = json.NewEncoder(&output).Encode(p.stats); err != nil {
|
||||||
logrus.WithError(err).Error("Unable to encode phone-home statistics")
|
log.Error().Err(err).Msg("Unable to encode phone-home statistics")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Infof("Reporting stats to %s: %s", p.cfg.Global.ReportStats.Endpoint, output.String())
|
log.Info().Msgf("Reporting stats to %s: %s", p.cfg.Global.ReportStats.Endpoint, output.String())
|
||||||
|
|
||||||
request, err := http.NewRequestWithContext(ctx, http.MethodPost, p.cfg.Global.ReportStats.Endpoint, &output)
|
request, err := http.NewRequestWithContext(ctx, http.MethodPost, p.cfg.Global.ReportStats.Endpoint, &output)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Error("Unable to create phone-home statistics request")
|
log.Error().Err(err).Msg("Unable to create phone-home statistics request")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
request.Header.Set("User-Agent", "Dendrite/"+internal.VersionString())
|
request.Header.Set("User-Agent", "Dendrite/"+internal.VersionString())
|
||||||
|
|
||||||
_, err = p.client.Do(request)
|
_, err = p.client.Do(request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Error("Unable to send phone-home statistics")
|
log.Error().Err(err).Msg("Unable to send phone-home statistics")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,14 +21,14 @@ import (
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
log "github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
func getMemoryStats(p *phoneHomeStats) error {
|
func getMemoryStats(p *phoneHomeStats) error {
|
||||||
oldUsage := p.prevData
|
oldUsage := p.prevData
|
||||||
newUsage := syscall.Rusage{}
|
newUsage := syscall.Rusage{}
|
||||||
if err := syscall.Getrusage(syscall.RUSAGE_SELF, &newUsage); err != nil {
|
if err := syscall.Getrusage(syscall.RUSAGE_SELF, &newUsage); err != nil {
|
||||||
logrus.WithError(err).Error("unable to get usage")
|
log.Error().Err(err).Msg("unable to get usage")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
newData := timestampToRUUsage{timestamp: time.Now().Unix(), usage: newUsage}
|
newData := timestampToRUUsage{timestamp: time.Now().Unix(), usage: newUsage}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue