From 005c4c0168d81e4ff9d0f737601febcdd65ebd08 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 29 May 2020 10:07:57 +0100 Subject: [PATCH] Fix storing and retrieving of send-to-device messages --- clientapi/routing/sendtodevice.go | 9 --------- eduserver/cache/cache.go | 2 +- syncapi/consumers/eduserver_sendtodevice.go | 10 +++------- syncapi/storage/shared/syncserver.go | 8 ++++++-- syncapi/sync/requestpool.go | 11 ++++++----- 5 files changed, 16 insertions(+), 24 deletions(-) diff --git a/clientapi/routing/sendtodevice.go b/clientapi/routing/sendtodevice.go index fad86c7aa..45ea86c75 100644 --- a/clientapi/routing/sendtodevice.go +++ b/clientapi/routing/sendtodevice.go @@ -22,7 +22,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/internal/transactions" "github.com/matrix-org/util" - "github.com/sirupsen/logrus" ) // SendToDevice handles PUT /_matrix/client/r0/sendToDevice/{eventType}/{txnId} @@ -56,14 +55,6 @@ func SendToDevice( ); err != nil { util.GetLogger(req.Context()).WithError(err).Error("eduProducer.SendToDevice failed") return jsonerror.InternalServerError() - } else { - util.GetLogger(req.Context()).WithFields(logrus.Fields{ - "sender": device.UserID, - "user_id": userID, - "device_id": deviceID, - "event_type": eventType, - "message": string(message), - }).Info("client API processed send-to-device message") } } } diff --git a/eduserver/cache/cache.go b/eduserver/cache/cache.go index 61e2cbc87..2908c3f7e 100644 --- a/eduserver/cache/cache.go +++ b/eduserver/cache/cache.go @@ -117,7 +117,7 @@ func (t *EDUCache) AddSendToDeviceMessage() int64 { defer t.Unlock() t.latestSyncPosition++ - return t.latestSyncPosition + return t.latestSyncPosition - 1 } // addUser with mutex lock & replace the previous timer. diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index cf1995e59..356d2c4b4 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -25,7 +25,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/types" - "github.com/sirupsen/logrus" + "github.com/matrix-org/util" log "github.com/sirupsen/logrus" ) @@ -64,13 +64,10 @@ func NewOutputSendToDeviceEventConsumer( // Start consuming from EDU api func (s *OutputSendToDeviceEventConsumer) Start() error { - logrus.Info("syncapi starting sendToDevice consumer") return s.sendToDeviceConsumer.Start() } func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { - logrus.Info("syncapi received sendToDevice event") - var output api.OutputSendToDeviceEvent if err := json.Unmarshal(msg.Value, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream @@ -78,13 +75,12 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage) return err } - log.WithFields(log.Fields{ + util.GetLogger(context.TODO()).WithFields(log.Fields{ "sender": output.Sender, "user_id": output.UserID, "device_id": output.DeviceID, "event_type": output.Type, - "content": string(output.Content), - }).Debug("sync API received send-to-device event from EDU server") + }).Info("sync API received send-to-device event from EDU server") newPos, err := s.db.StoreNewSendForDeviceMessage( context.TODO(), output.UserID, output.DeviceID, output.SendToDeviceEvent, diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 1a7a7e2c2..93471ea2d 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -1041,8 +1041,12 @@ func (d *Database) AddSendToDeviceEvent( func (d *Database) StoreNewSendForDeviceMessage( ctx context.Context, userID, deviceID string, event gomatrixserverlib.SendToDeviceEvent, ) (types.StreamPosition, error) { - err := d.AddSendToDeviceEvent( - ctx, nil, userID, deviceID, string(event.Content), + j, err := json.Marshal(event) + if err != nil { + return 0, err + } + err = d.AddSendToDeviceEvent( + ctx, nil, userID, deviceID, string(j), ) if err != nil { return 0, err diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 919530fd6..a2b361e64 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -55,12 +55,13 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype JSON: jsonerror.Unknown(err.Error()), } } + logger := util.GetLogger(req.Context()).WithFields(log.Fields{ - "userID": device.UserID, - "deviceID": device.ID, - "since": syncReq.since, - "timeout": syncReq.timeout, - "limit": syncReq.limit, + "user_id": device.UserID, + "device_id": device.ID, + "since": syncReq.since, + "timeout": syncReq.timeout, + "limit": syncReq.limit, }) currPos := rp.notifier.CurrentPosition()