diff --git a/clientapi/routing/state.go b/clientapi/routing/state.go index b826a6476..d7f0b40f8 100644 --- a/clientapi/routing/state.go +++ b/clientapi/routing/state.go @@ -193,7 +193,7 @@ func OnIncomingStateRequest(ctx context.Context, device *userapi.Device, rsAPI a } stateEvents = append( stateEvents, - synctypes.ToClientEvent(ev, synctypes.FormatAll, sender.String(), sk), + synctypes.ToClientEvent(ev, synctypes.FormatAll, sender.String(), sk, ev.Unsigned()), ) } } diff --git a/internal/eventutil/events.go b/internal/eventutil/events.go index 295f1fe46..aa99e5860 100644 --- a/internal/eventutil/events.go +++ b/internal/eventutil/events.go @@ -184,7 +184,7 @@ func RedactEvent(ctx context.Context, redactionEvent, redactedEvent gomatrixserv if err != nil { return err } - redactedBecause := synctypes.ToClientEvent(redactionEvent, synctypes.FormatSync, senderID.String(), redactionEvent.StateKey()) + redactedBecause := synctypes.ToClientEvent(redactionEvent, synctypes.FormatSync, senderID.String(), redactionEvent.StateKey(), redactionEvent.Unsigned()) if err := redactedEvent.SetUnsignedField("redacted_because", redactedBecause); err != nil { return err } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 18e1c17d2..402c820c0 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -33,11 +33,11 @@ import ( "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/streams" + "github.com/matrix-org/dendrite/syncapi/synctypes" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib/spec" "github.com/nats-io/nats.go" "github.com/sirupsen/logrus" - log "github.com/sirupsen/logrus" "github.com/tidwall/gjson" ) @@ -101,7 +101,7 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms var output api.OutputEvent if err = json.Unmarshal(msg.Data, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("roomserver output log: message parse failure") + logrus.WithError(err).Errorf("roomserver output log: message parse failure") return true } @@ -137,7 +137,7 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms return true // non-fatal, as otherwise we end up in a loop of trying to purge the room } default: - log.WithField("type", output.Type).Debug( + logrus.WithField("type", output.Type).Debug( "roomserver output log: ignoring unknown output type", ) } @@ -146,7 +146,7 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms // no matter how often we retry this event, we will always get this error, discard the event return true } - log.WithFields(log.Fields{ + logrus.WithFields(logrus.Fields{ "type": output.Type, }).WithError(err).Error("roomserver output log: failed to process event") sentry.CaptureException(err) @@ -161,12 +161,12 @@ func (s *OutputRoomEventConsumer) onRedactEvent( ) error { err := s.db.RedactEvent(ctx, msg.RedactedEventID, msg.RedactedBecause, s.rsAPI) if err != nil { - log.WithError(err).Error("RedactEvent error'd") + logrus.WithError(err).Error("RedactEvent error'd") return err } if err = s.db.RedactRelations(ctx, msg.RedactedBecause.RoomID(), msg.RedactedEventID); err != nil { - log.WithFields(log.Fields{ + logrus.WithFields(logrus.Fields{ "room_id": msg.RedactedBecause.RoomID(), "event_id": msg.RedactedBecause.EventID(), "redacted_event_id": msg.RedactedEventID, @@ -276,29 +276,29 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( pduPos, err := s.db.WriteEvent(ctx, ev, addsStateEvents, msg.AddsStateEventIDs, msg.RemovesStateEventIDs, msg.TransactionID, false, msg.HistoryVisibility) if err != nil { // panic rather than continue with an inconsistent database - log.WithFields(log.Fields{ - "event_id": ev.EventID(), - "event": string(ev.JSON()), - log.ErrorKey: err, - "add": msg.AddsStateEventIDs, - "del": msg.RemovesStateEventIDs, + logrus.WithFields(logrus.Fields{ + "event_id": ev.EventID(), + "event": string(ev.JSON()), + logrus.ErrorKey: err, + "add": msg.AddsStateEventIDs, + "del": msg.RemovesStateEventIDs, }).Panicf("roomserver output log: write new event failure") return nil } if err = s.writeFTS(ev, pduPos); err != nil { - log.WithFields(log.Fields{ + logrus.WithFields(logrus.Fields{ "event_id": ev.EventID(), "type": ev.Type(), }).WithError(err).Warn("failed to index fulltext element") } if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil { - log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos) + logrus.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos) return err } if err = s.db.UpdateRelations(ctx, ev); err != nil { - log.WithFields(log.Fields{ + logrus.WithFields(logrus.Fields{ "event_id": ev.EventID(), "type": ev.Type(), }).WithError(err).Warn("Failed to update relations") @@ -337,23 +337,23 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent( pduPos, err := s.db.WriteEvent(ctx, ev, []*rstypes.HeaderedEvent{}, []string{}, []string{}, nil, ev.StateKey() != nil, msg.HistoryVisibility) if err != nil { // panic rather than continue with an inconsistent database - log.WithFields(log.Fields{ - "event_id": ev.EventID(), - "event": string(ev.JSON()), - log.ErrorKey: err, + logrus.WithFields(logrus.Fields{ + "event_id": ev.EventID(), + "event": string(ev.JSON()), + logrus.ErrorKey: err, }).Panicf("roomserver output log: write old event failure") return nil } if err = s.writeFTS(ev, pduPos); err != nil { - log.WithFields(log.Fields{ + logrus.WithFields(logrus.Fields{ "event_id": ev.EventID(), "type": ev.Type(), }).WithError(err).Warn("failed to index fulltext element") } if err = s.db.UpdateRelations(ctx, ev); err != nil { - log.WithFields(log.Fields{ + logrus.WithFields(logrus.Fields{ "room_id": ev.RoomID(), "event_id": ev.EventID(), "type": ev.Type(), @@ -362,7 +362,7 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent( } if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil { - log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos) + logrus.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos) return err } @@ -435,11 +435,11 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent( pduPos, err := s.db.AddInviteEvent(ctx, msg.Event) if err != nil { // panic rather than continue with an inconsistent database - log.WithFields(log.Fields{ - "event_id": msg.Event.EventID(), - "event": string(msg.Event.JSON()), - "pdupos": pduPos, - log.ErrorKey: err, + logrus.WithFields(logrus.Fields{ + "event_id": msg.Event.EventID(), + "event": string(msg.Event.JSON()), + "pdupos": pduPos, + logrus.ErrorKey: err, }).Errorf("roomserver output log: write invite failure") return } @@ -456,9 +456,9 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent( // we should not panic if we try to retire it. if err != nil && err != sql.ErrNoRows { // panic rather than continue with an inconsistent database - log.WithFields(log.Fields{ - "event_id": msg.EventID, - log.ErrorKey: err, + logrus.WithFields(logrus.Fields{ + "event_id": msg.EventID, + logrus.ErrorKey: err, }).Errorf("roomserver output log: remove invite failure") return } @@ -474,19 +474,19 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent( s.inviteStream.Advance(pduPos) validRoomID, err := spec.NewRoomID(msg.RoomID) if err != nil { - log.WithFields(log.Fields{ - "event_id": msg.EventID, - "room_id": msg.RoomID, - log.ErrorKey: err, + logrus.WithFields(logrus.Fields{ + "event_id": msg.EventID, + "room_id": msg.RoomID, + logrus.ErrorKey: err, }).Errorf("roomID is invalid") return } userID, err := s.rsAPI.QueryUserIDForSender(ctx, *validRoomID, msg.TargetSenderID) if err != nil || userID == nil { - log.WithFields(log.Fields{ - "event_id": msg.EventID, - "sender_id": msg.TargetSenderID, - log.ErrorKey: err, + logrus.WithFields(logrus.Fields{ + "event_id": msg.EventID, + "sender_id": msg.TargetSenderID, + logrus.ErrorKey: err, }).Errorf("failed to find userID for sender") return } @@ -499,8 +499,8 @@ func (s *OutputRoomEventConsumer) onNewPeek( sp, err := s.db.AddPeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID) if err != nil { // panic rather than continue with an inconsistent database - log.WithFields(log.Fields{ - log.ErrorKey: err, + logrus.WithFields(logrus.Fields{ + logrus.ErrorKey: err, }).Errorf("roomserver output log: write peek failure") return } @@ -518,8 +518,8 @@ func (s *OutputRoomEventConsumer) onRetirePeek( sp, err := s.db.DeletePeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID) if err != nil { // panic rather than continue with an inconsistent database - log.WithFields(log.Fields{ - log.ErrorKey: err, + logrus.WithFields(logrus.Fields{ + logrus.ErrorKey: err, }).Errorf("roomserver output log: write peek failure") return } @@ -592,17 +592,10 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event *rstypes.HeaderedEvent) return event, nil } - // TODO: don't change this to userID if event_format == federation - prevEventSender := string(prevEvent.SenderID()) - prevUser, err := s.rsAPI.QueryUserIDForSender(s.ctx, *validRoomID, prevEvent.SenderID()) - if err == nil && prevUser != nil { - prevEventSender = prevUser.String() - } - - prev := types.PrevEventRef{ + prev := synctypes.PrevEventRef{ PrevContent: prevEvent.Content(), ReplacesState: prevEvent.EventID(), - PrevSenderID: prevEventSender, + PrevSenderID: string(prevEvent.SenderID()), } event.PDU, err = event.SetUnsigned(prev) @@ -629,7 +622,7 @@ func (s *OutputRoomEventConsumer) writeFTS(ev *rstypes.HeaderedEvent, pduPositio case spec.MRoomTopic: e.Content = gjson.GetBytes(ev.Content(), "topic").String() case spec.MRoomRedaction: - log.Tracef("Redacting event: %s", ev.Redacts()) + logrus.Tracef("Redacting event: %s", ev.Redacts()) if err := s.fts.Delete(ev.Redacts()); err != nil { return fmt.Errorf("failed to delete entry from fulltext index: %w", err) } @@ -638,7 +631,7 @@ func (s *OutputRoomEventConsumer) writeFTS(ev *rstypes.HeaderedEvent, pduPositio return nil } if e.Content != "" { - log.Tracef("Indexing element: %+v", e) + logrus.Tracef("Indexing element: %+v", e) if err := s.fts.Index(e); err != nil { return err } diff --git a/syncapi/routing/getevent.go b/syncapi/routing/getevent.go index 594db7f7e..bf0f9bf8c 100644 --- a/syncapi/routing/getevent.go +++ b/syncapi/routing/getevent.go @@ -144,6 +144,6 @@ func GetEvent( } return util.JSONResponse{ Code: http.StatusOK, - JSON: synctypes.ToClientEvent(events[0], synctypes.FormatAll, senderUserID.String(), sk), + JSON: synctypes.ToClientEvent(events[0], synctypes.FormatAll, senderUserID.String(), sk, events[0].Unsigned()), } } diff --git a/syncapi/routing/relations.go b/syncapi/routing/relations.go index 20c42e145..b451a7e2e 100644 --- a/syncapi/routing/relations.go +++ b/syncapi/routing/relations.go @@ -146,7 +146,7 @@ func Relations( } res.Chunk = append( res.Chunk, - synctypes.ToClientEvent(event.PDU, synctypes.FormatAll, sender.String(), sk), + synctypes.ToClientEvent(event.PDU, synctypes.FormatAll, sender.String(), sk, event.Unsigned()), ) } diff --git a/syncapi/routing/search.go b/syncapi/routing/search.go index 80f9e76d3..7d5c061b7 100644 --- a/syncapi/routing/search.go +++ b/syncapi/routing/search.go @@ -267,7 +267,7 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts ProfileInfo: profileInfos, }, Rank: eventScore[event.EventID()].Score, - Result: synctypes.ToClientEvent(event, synctypes.FormatAll, sender.String(), sk), + Result: synctypes.ToClientEvent(event, synctypes.FormatAll, sender.String(), sk, event.Unsigned()), }) roomGroup := groups[event.RoomID()] roomGroup.Results = append(roomGroup.Results, event.EventID()) diff --git a/syncapi/synctypes/clientevent.go b/syncapi/synctypes/clientevent.go index 5a41a8f1d..17dc81d64 100644 --- a/syncapi/synctypes/clientevent.go +++ b/syncapi/synctypes/clientevent.go @@ -16,12 +16,21 @@ package synctypes import ( + "encoding/json" "fmt" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib/spec" + "github.com/sirupsen/logrus" ) +// PrevEventRef represents a reference to a previous event in a state event upgrade +type PrevEventRef struct { + PrevContent json.RawMessage `json:"prev_content"` + ReplacesState string `json:"replaces_state"` + PrevSenderID string `json:"prev_sender"` +} + type ClientEventFormat int const ( @@ -70,7 +79,7 @@ func ToClientEvents(serverEvs []gomatrixserverlib.PDU, format ClientEventFormat, continue // TODO: shouldn't happen? } if format == FormatSyncFederation { - evs = append(evs, ToClientEvent(se, format, string(se.SenderID()), se.StateKey())) + evs = append(evs, ToClientEvent(se, format, string(se.SenderID()), se.StateKey(), spec.RawJSON(se.Unsigned()))) continue } @@ -92,19 +101,36 @@ func ToClientEvents(serverEvs []gomatrixserverlib.PDU, format ClientEventFormat, sk = &skString } } - evs = append(evs, ToClientEvent(se, format, sender.String(), sk)) + + unsigned := se.Unsigned() + var prev PrevEventRef + if err := json.Unmarshal(se.Unsigned(), &prev); err == nil && prev.PrevSenderID != "" { + prevUserID, err := userIDForSender(*validRoomID, spec.SenderID(prev.PrevSenderID)) + if err == nil && userID != nil { + prev.PrevSenderID = prevUserID.String() + } else { + logrus.Warnf("Failed to find userID for prev_sender in ClientEvent") + // NOTE: Not much can be done here, so leave the previous value in place. + } + unsigned, err = json.Marshal(prev) + if err != nil { + logrus.Errorf("Failed to marshal unsigned content for ClientEvent: %s", err.Error()) + continue + } + } + evs = append(evs, ToClientEvent(se, format, sender.String(), sk, spec.RawJSON(unsigned))) } return evs } // ToClientEvent converts a single server event to a client event. -func ToClientEvent(se gomatrixserverlib.PDU, format ClientEventFormat, sender string, stateKey *string) ClientEvent { +func ToClientEvent(se gomatrixserverlib.PDU, format ClientEventFormat, sender string, stateKey *string, unsigned spec.RawJSON) ClientEvent { ce := ClientEvent{ Content: spec.RawJSON(se.Content()), Sender: sender, Type: se.Type(), StateKey: stateKey, - Unsigned: spec.RawJSON(se.Unsigned()), + Unsigned: unsigned, OriginServerTS: se.OriginServerTS(), EventID: se.EventID(), Redacts: se.Redacts(), @@ -151,7 +177,7 @@ func ToClientEventDefault(userIDQuery spec.UserIDForSender, event gomatrixserver sk = &skString } } - return ToClientEvent(event, FormatAll, sender.String(), sk) + return ToClientEvent(event, FormatAll, sender.String(), sk, event.Unsigned()) } // If provided state key is a user ID (state keys beginning with @ are reserved for this purpose) diff --git a/syncapi/synctypes/clientevent_test.go b/syncapi/synctypes/clientevent_test.go index d40277fbc..588fa2834 100644 --- a/syncapi/synctypes/clientevent_test.go +++ b/syncapi/synctypes/clientevent_test.go @@ -49,7 +49,7 @@ func TestToClientEvent(t *testing.T) { // nolint: gocyclo t.Fatalf("failed to create userID: %s", err) } sk := "" - ce := ToClientEvent(ev, FormatAll, userID.String(), &sk) + ce := ToClientEvent(ev, FormatAll, userID.String(), &sk, ev.Unsigned()) if ce.EventID != ev.EventID() { t.Errorf("ClientEvent.EventID: wanted %s, got %s", ev.EventID(), ce.EventID) } @@ -109,7 +109,7 @@ func TestToClientFormatSync(t *testing.T) { t.Fatalf("failed to create userID: %s", err) } sk := "" - ce := ToClientEvent(ev, FormatSync, userID.String(), &sk) + ce := ToClientEvent(ev, FormatSync, userID.String(), &sk, ev.Unsigned()) if ce.RoomID != "" { t.Errorf("ClientEvent.RoomID: wanted '', got %s", ce.RoomID) } diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 4074e0635..b90c128c3 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -339,13 +339,6 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) { return token, nil } -// PrevEventRef represents a reference to a previous event in a state event upgrade -type PrevEventRef struct { - PrevContent json.RawMessage `json:"prev_content"` - ReplacesState string `json:"replaces_state"` - PrevSenderID string `json:"prev_sender"` -} - type DeviceLists struct { Changed []string `json:"changed,omitempty"` Left []string `json:"left,omitempty"` @@ -552,7 +545,7 @@ func NewInviteResponse(event *types.HeaderedEvent, userID spec.UserID, stateKey // Then we'll see if we can create a partial of the invite event itself. // This is needed for clients to work out *who* sent the invite. - inviteEvent := synctypes.ToClientEvent(event.PDU, eventFormat, userID.String(), stateKey) + inviteEvent := synctypes.ToClientEvent(event.PDU, eventFormat, userID.String(), stateKey, event.Unsigned()) inviteEvent.Unsigned = nil if ev, err := json.Marshal(inviteEvent); err == nil { res.InviteState.Events = append(res.InviteState.Events, ev) diff --git a/userapi/consumers/roomserver.go b/userapi/consumers/roomserver.go index a38fed067..8863d258a 100644 --- a/userapi/consumers/roomserver.go +++ b/userapi/consumers/roomserver.go @@ -321,7 +321,7 @@ func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *rst return fmt.Errorf("queryUserIDForSender: userID unknown for %s", *sk) } } - cevent := synctypes.ToClientEvent(event, synctypes.FormatAll, sender.String(), sk) + cevent := synctypes.ToClientEvent(event, synctypes.FormatAll, sender.String(), sk, event.Unsigned()) var member *localMembership member, err = newLocalMembership(&cevent) if err != nil { @@ -566,7 +566,7 @@ func (s *OutputRoomEventConsumer) notifyLocal(ctx context.Context, event *rstype // UNSPEC: the spec doesn't say this is a ClientEvent, but the // fields seem to match. room_id should be missing, which // matches the behaviour of FormatSync. - Event: synctypes.ToClientEvent(event, synctypes.FormatSync, sender.String(), sk), + Event: synctypes.ToClientEvent(event, synctypes.FormatSync, sender.String(), sk, event.Unsigned()), // TODO: this is per-device, but it's not part of the primary // key. So inserting one notification per profile tag doesn't // make sense. What is this supposed to be? Sytests require it diff --git a/userapi/util/notify_test.go b/userapi/util/notify_test.go index 3344cbcb6..27e77cf7a 100644 --- a/userapi/util/notify_test.go +++ b/userapi/util/notify_test.go @@ -106,7 +106,7 @@ func TestNotifyUserCountsAsync(t *testing.T) { } sk := "" if err := db.InsertNotification(ctx, aliceLocalpart, serverName, dummyEvent.EventID(), 0, nil, &api.Notification{ - Event: synctypes.ToClientEvent(dummyEvent, synctypes.FormatAll, sender.String(), &sk), + Event: synctypes.ToClientEvent(dummyEvent, synctypes.FormatAll, sender.String(), &sk, dummyEvent.Unsigned()), }); err != nil { t.Error(err) }