mirror of
https://github.com/matrix-org/dendrite.git
synced 2024-11-22 14:21:55 -06:00
Discard "illegal base64 data at input byte 0" errors in the SyncAPI
This commit is contained in:
parent
3e314e028e
commit
6011ddc0a8
|
@ -17,16 +17,12 @@ package consumers
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"encoding/base64"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/getsentry/sentry-go"
|
"github.com/getsentry/sentry-go"
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
|
||||||
"github.com/nats-io/nats.go"
|
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
"github.com/tidwall/gjson"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/fulltext"
|
"github.com/matrix-org/dendrite/internal/fulltext"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
@ -38,6 +34,11 @@ import (
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/streams"
|
"github.com/matrix-org/dendrite/syncapi/streams"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
"github.com/tidwall/gjson"
|
||||||
)
|
)
|
||||||
|
|
||||||
// OutputRoomEventConsumer consumes events that originated in the room server.
|
// OutputRoomEventConsumer consumes events that originated in the room server.
|
||||||
|
@ -141,7 +142,14 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("roomserver output log: failed to process event")
|
if errors.As(err, new(base64.CorruptInputError)) {
|
||||||
|
// no matter how often we retry this event, we will always get this error, discard the event
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"type": output.Type,
|
||||||
|
}).WithError(err).Error("roomserver output log: failed to process event")
|
||||||
|
sentry.CaptureException(err)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -237,21 +245,18 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
|
||||||
|
|
||||||
ev, err := s.updateStateEvent(ev)
|
ev, err := s.updateStateEvent(ev)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sentry.CaptureException(err)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range addsStateEvents {
|
for i := range addsStateEvents {
|
||||||
addsStateEvents[i], err = s.updateStateEvent(addsStateEvents[i])
|
addsStateEvents[i], err = s.updateStateEvent(addsStateEvents[i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sentry.CaptureException(err)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if msg.RewritesState {
|
if msg.RewritesState {
|
||||||
if err = s.db.PurgeRoomState(ctx, ev.RoomID()); err != nil {
|
if err = s.db.PurgeRoomState(ctx, ev.RoomID()); err != nil {
|
||||||
sentry.CaptureException(err)
|
|
||||||
return fmt.Errorf("s.db.PurgeRoom: %w", err)
|
return fmt.Errorf("s.db.PurgeRoom: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -289,7 +294,6 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
|
||||||
|
|
||||||
if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil {
|
if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil {
|
||||||
log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
|
log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
|
||||||
sentry.CaptureException(err)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -430,7 +434,6 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
|
||||||
|
|
||||||
pduPos, err := s.db.AddInviteEvent(ctx, msg.Event)
|
pduPos, err := s.db.AddInviteEvent(ctx, msg.Event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sentry.CaptureException(err)
|
|
||||||
// panic rather than continue with an inconsistent database
|
// panic rather than continue with an inconsistent database
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"event_id": msg.Event.EventID(),
|
"event_id": msg.Event.EventID(),
|
||||||
|
@ -452,7 +455,6 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
|
||||||
// It's possible we just haven't heard of this invite yet, so
|
// It's possible we just haven't heard of this invite yet, so
|
||||||
// we should not panic if we try to retire it.
|
// we should not panic if we try to retire it.
|
||||||
if err != nil && err != sql.ErrNoRows {
|
if err != nil && err != sql.ErrNoRows {
|
||||||
sentry.CaptureException(err)
|
|
||||||
// panic rather than continue with an inconsistent database
|
// panic rather than continue with an inconsistent database
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"event_id": msg.EventID,
|
"event_id": msg.EventID,
|
||||||
|
@ -496,7 +498,6 @@ func (s *OutputRoomEventConsumer) onNewPeek(
|
||||||
) {
|
) {
|
||||||
sp, err := s.db.AddPeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID)
|
sp, err := s.db.AddPeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sentry.CaptureException(err)
|
|
||||||
// panic rather than continue with an inconsistent database
|
// panic rather than continue with an inconsistent database
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
log.ErrorKey: err,
|
log.ErrorKey: err,
|
||||||
|
|
Loading…
Reference in a new issue