mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-07 06:53:09 -06:00
Properly handle prev_sender based on /sync event_format
This commit is contained in:
parent
3f22b374b9
commit
353e6668ec
|
|
@ -193,7 +193,7 @@ func OnIncomingStateRequest(ctx context.Context, device *userapi.Device, rsAPI a
|
||||||
}
|
}
|
||||||
stateEvents = append(
|
stateEvents = append(
|
||||||
stateEvents,
|
stateEvents,
|
||||||
synctypes.ToClientEvent(ev, synctypes.FormatAll, sender.String(), sk),
|
synctypes.ToClientEvent(ev, synctypes.FormatAll, sender.String(), sk, ev.Unsigned()),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -184,7 +184,7 @@ func RedactEvent(ctx context.Context, redactionEvent, redactedEvent gomatrixserv
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
redactedBecause := synctypes.ToClientEvent(redactionEvent, synctypes.FormatSync, senderID.String(), redactionEvent.StateKey())
|
redactedBecause := synctypes.ToClientEvent(redactionEvent, synctypes.FormatSync, senderID.String(), redactionEvent.StateKey(), redactionEvent.Unsigned())
|
||||||
if err := redactedEvent.SetUnsignedField("redacted_because", redactedBecause); err != nil {
|
if err := redactedEvent.SetUnsignedField("redacted_because", redactedBecause); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -33,11 +33,11 @@ import (
|
||||||
"github.com/matrix-org/dendrite/syncapi/notifier"
|
"github.com/matrix-org/dendrite/syncapi/notifier"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/streams"
|
"github.com/matrix-org/dendrite/syncapi/streams"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
"github.com/tidwall/gjson"
|
"github.com/tidwall/gjson"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -101,7 +101,7 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
|
||||||
var output api.OutputEvent
|
var output api.OutputEvent
|
||||||
if err = json.Unmarshal(msg.Data, &output); err != nil {
|
if err = json.Unmarshal(msg.Data, &output); err != nil {
|
||||||
// If the message was invalid, log it and move on to the next message in the stream
|
// If the message was invalid, log it and move on to the next message in the stream
|
||||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
logrus.WithError(err).Errorf("roomserver output log: message parse failure")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -137,7 +137,7 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
|
||||||
return true // non-fatal, as otherwise we end up in a loop of trying to purge the room
|
return true // non-fatal, as otherwise we end up in a loop of trying to purge the room
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
log.WithField("type", output.Type).Debug(
|
logrus.WithField("type", output.Type).Debug(
|
||||||
"roomserver output log: ignoring unknown output type",
|
"roomserver output log: ignoring unknown output type",
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
@ -146,7 +146,7 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
|
||||||
// no matter how often we retry this event, we will always get this error, discard the event
|
// no matter how often we retry this event, we will always get this error, discard the event
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
log.WithFields(log.Fields{
|
logrus.WithFields(logrus.Fields{
|
||||||
"type": output.Type,
|
"type": output.Type,
|
||||||
}).WithError(err).Error("roomserver output log: failed to process event")
|
}).WithError(err).Error("roomserver output log: failed to process event")
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
|
|
@ -161,12 +161,12 @@ func (s *OutputRoomEventConsumer) onRedactEvent(
|
||||||
) error {
|
) error {
|
||||||
err := s.db.RedactEvent(ctx, msg.RedactedEventID, msg.RedactedBecause, s.rsAPI)
|
err := s.db.RedactEvent(ctx, msg.RedactedEventID, msg.RedactedBecause, s.rsAPI)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("RedactEvent error'd")
|
logrus.WithError(err).Error("RedactEvent error'd")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = s.db.RedactRelations(ctx, msg.RedactedBecause.RoomID(), msg.RedactedEventID); err != nil {
|
if err = s.db.RedactRelations(ctx, msg.RedactedBecause.RoomID(), msg.RedactedEventID); err != nil {
|
||||||
log.WithFields(log.Fields{
|
logrus.WithFields(logrus.Fields{
|
||||||
"room_id": msg.RedactedBecause.RoomID(),
|
"room_id": msg.RedactedBecause.RoomID(),
|
||||||
"event_id": msg.RedactedBecause.EventID(),
|
"event_id": msg.RedactedBecause.EventID(),
|
||||||
"redacted_event_id": msg.RedactedEventID,
|
"redacted_event_id": msg.RedactedEventID,
|
||||||
|
|
@ -276,29 +276,29 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
|
||||||
pduPos, err := s.db.WriteEvent(ctx, ev, addsStateEvents, msg.AddsStateEventIDs, msg.RemovesStateEventIDs, msg.TransactionID, false, msg.HistoryVisibility)
|
pduPos, err := s.db.WriteEvent(ctx, ev, addsStateEvents, msg.AddsStateEventIDs, msg.RemovesStateEventIDs, msg.TransactionID, false, msg.HistoryVisibility)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// panic rather than continue with an inconsistent database
|
// panic rather than continue with an inconsistent database
|
||||||
log.WithFields(log.Fields{
|
logrus.WithFields(logrus.Fields{
|
||||||
"event_id": ev.EventID(),
|
"event_id": ev.EventID(),
|
||||||
"event": string(ev.JSON()),
|
"event": string(ev.JSON()),
|
||||||
log.ErrorKey: err,
|
logrus.ErrorKey: err,
|
||||||
"add": msg.AddsStateEventIDs,
|
"add": msg.AddsStateEventIDs,
|
||||||
"del": msg.RemovesStateEventIDs,
|
"del": msg.RemovesStateEventIDs,
|
||||||
}).Panicf("roomserver output log: write new event failure")
|
}).Panicf("roomserver output log: write new event failure")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err = s.writeFTS(ev, pduPos); err != nil {
|
if err = s.writeFTS(ev, pduPos); err != nil {
|
||||||
log.WithFields(log.Fields{
|
logrus.WithFields(logrus.Fields{
|
||||||
"event_id": ev.EventID(),
|
"event_id": ev.EventID(),
|
||||||
"type": ev.Type(),
|
"type": ev.Type(),
|
||||||
}).WithError(err).Warn("failed to index fulltext element")
|
}).WithError(err).Warn("failed to index fulltext element")
|
||||||
}
|
}
|
||||||
|
|
||||||
if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil {
|
if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil {
|
||||||
log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
|
logrus.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = s.db.UpdateRelations(ctx, ev); err != nil {
|
if err = s.db.UpdateRelations(ctx, ev); err != nil {
|
||||||
log.WithFields(log.Fields{
|
logrus.WithFields(logrus.Fields{
|
||||||
"event_id": ev.EventID(),
|
"event_id": ev.EventID(),
|
||||||
"type": ev.Type(),
|
"type": ev.Type(),
|
||||||
}).WithError(err).Warn("Failed to update relations")
|
}).WithError(err).Warn("Failed to update relations")
|
||||||
|
|
@ -337,23 +337,23 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
|
||||||
pduPos, err := s.db.WriteEvent(ctx, ev, []*rstypes.HeaderedEvent{}, []string{}, []string{}, nil, ev.StateKey() != nil, msg.HistoryVisibility)
|
pduPos, err := s.db.WriteEvent(ctx, ev, []*rstypes.HeaderedEvent{}, []string{}, []string{}, nil, ev.StateKey() != nil, msg.HistoryVisibility)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// panic rather than continue with an inconsistent database
|
// panic rather than continue with an inconsistent database
|
||||||
log.WithFields(log.Fields{
|
logrus.WithFields(logrus.Fields{
|
||||||
"event_id": ev.EventID(),
|
"event_id": ev.EventID(),
|
||||||
"event": string(ev.JSON()),
|
"event": string(ev.JSON()),
|
||||||
log.ErrorKey: err,
|
logrus.ErrorKey: err,
|
||||||
}).Panicf("roomserver output log: write old event failure")
|
}).Panicf("roomserver output log: write old event failure")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = s.writeFTS(ev, pduPos); err != nil {
|
if err = s.writeFTS(ev, pduPos); err != nil {
|
||||||
log.WithFields(log.Fields{
|
logrus.WithFields(logrus.Fields{
|
||||||
"event_id": ev.EventID(),
|
"event_id": ev.EventID(),
|
||||||
"type": ev.Type(),
|
"type": ev.Type(),
|
||||||
}).WithError(err).Warn("failed to index fulltext element")
|
}).WithError(err).Warn("failed to index fulltext element")
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = s.db.UpdateRelations(ctx, ev); err != nil {
|
if err = s.db.UpdateRelations(ctx, ev); err != nil {
|
||||||
log.WithFields(log.Fields{
|
logrus.WithFields(logrus.Fields{
|
||||||
"room_id": ev.RoomID(),
|
"room_id": ev.RoomID(),
|
||||||
"event_id": ev.EventID(),
|
"event_id": ev.EventID(),
|
||||||
"type": ev.Type(),
|
"type": ev.Type(),
|
||||||
|
|
@ -362,7 +362,7 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
|
||||||
}
|
}
|
||||||
|
|
||||||
if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil {
|
if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil {
|
||||||
log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
|
logrus.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -435,11 +435,11 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
|
||||||
pduPos, err := s.db.AddInviteEvent(ctx, msg.Event)
|
pduPos, err := s.db.AddInviteEvent(ctx, msg.Event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// panic rather than continue with an inconsistent database
|
// panic rather than continue with an inconsistent database
|
||||||
log.WithFields(log.Fields{
|
logrus.WithFields(logrus.Fields{
|
||||||
"event_id": msg.Event.EventID(),
|
"event_id": msg.Event.EventID(),
|
||||||
"event": string(msg.Event.JSON()),
|
"event": string(msg.Event.JSON()),
|
||||||
"pdupos": pduPos,
|
"pdupos": pduPos,
|
||||||
log.ErrorKey: err,
|
logrus.ErrorKey: err,
|
||||||
}).Errorf("roomserver output log: write invite failure")
|
}).Errorf("roomserver output log: write invite failure")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -456,9 +456,9 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
|
||||||
// we should not panic if we try to retire it.
|
// we should not panic if we try to retire it.
|
||||||
if err != nil && err != sql.ErrNoRows {
|
if err != nil && err != sql.ErrNoRows {
|
||||||
// panic rather than continue with an inconsistent database
|
// panic rather than continue with an inconsistent database
|
||||||
log.WithFields(log.Fields{
|
logrus.WithFields(logrus.Fields{
|
||||||
"event_id": msg.EventID,
|
"event_id": msg.EventID,
|
||||||
log.ErrorKey: err,
|
logrus.ErrorKey: err,
|
||||||
}).Errorf("roomserver output log: remove invite failure")
|
}).Errorf("roomserver output log: remove invite failure")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -474,19 +474,19 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
|
||||||
s.inviteStream.Advance(pduPos)
|
s.inviteStream.Advance(pduPos)
|
||||||
validRoomID, err := spec.NewRoomID(msg.RoomID)
|
validRoomID, err := spec.NewRoomID(msg.RoomID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithFields(log.Fields{
|
logrus.WithFields(logrus.Fields{
|
||||||
"event_id": msg.EventID,
|
"event_id": msg.EventID,
|
||||||
"room_id": msg.RoomID,
|
"room_id": msg.RoomID,
|
||||||
log.ErrorKey: err,
|
logrus.ErrorKey: err,
|
||||||
}).Errorf("roomID is invalid")
|
}).Errorf("roomID is invalid")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
userID, err := s.rsAPI.QueryUserIDForSender(ctx, *validRoomID, msg.TargetSenderID)
|
userID, err := s.rsAPI.QueryUserIDForSender(ctx, *validRoomID, msg.TargetSenderID)
|
||||||
if err != nil || userID == nil {
|
if err != nil || userID == nil {
|
||||||
log.WithFields(log.Fields{
|
logrus.WithFields(logrus.Fields{
|
||||||
"event_id": msg.EventID,
|
"event_id": msg.EventID,
|
||||||
"sender_id": msg.TargetSenderID,
|
"sender_id": msg.TargetSenderID,
|
||||||
log.ErrorKey: err,
|
logrus.ErrorKey: err,
|
||||||
}).Errorf("failed to find userID for sender")
|
}).Errorf("failed to find userID for sender")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -499,8 +499,8 @@ func (s *OutputRoomEventConsumer) onNewPeek(
|
||||||
sp, err := s.db.AddPeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID)
|
sp, err := s.db.AddPeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// panic rather than continue with an inconsistent database
|
// panic rather than continue with an inconsistent database
|
||||||
log.WithFields(log.Fields{
|
logrus.WithFields(logrus.Fields{
|
||||||
log.ErrorKey: err,
|
logrus.ErrorKey: err,
|
||||||
}).Errorf("roomserver output log: write peek failure")
|
}).Errorf("roomserver output log: write peek failure")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -518,8 +518,8 @@ func (s *OutputRoomEventConsumer) onRetirePeek(
|
||||||
sp, err := s.db.DeletePeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID)
|
sp, err := s.db.DeletePeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// panic rather than continue with an inconsistent database
|
// panic rather than continue with an inconsistent database
|
||||||
log.WithFields(log.Fields{
|
logrus.WithFields(logrus.Fields{
|
||||||
log.ErrorKey: err,
|
logrus.ErrorKey: err,
|
||||||
}).Errorf("roomserver output log: write peek failure")
|
}).Errorf("roomserver output log: write peek failure")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -592,17 +592,10 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event *rstypes.HeaderedEvent)
|
||||||
return event, nil
|
return event, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: don't change this to userID if event_format == federation
|
prev := synctypes.PrevEventRef{
|
||||||
prevEventSender := string(prevEvent.SenderID())
|
|
||||||
prevUser, err := s.rsAPI.QueryUserIDForSender(s.ctx, *validRoomID, prevEvent.SenderID())
|
|
||||||
if err == nil && prevUser != nil {
|
|
||||||
prevEventSender = prevUser.String()
|
|
||||||
}
|
|
||||||
|
|
||||||
prev := types.PrevEventRef{
|
|
||||||
PrevContent: prevEvent.Content(),
|
PrevContent: prevEvent.Content(),
|
||||||
ReplacesState: prevEvent.EventID(),
|
ReplacesState: prevEvent.EventID(),
|
||||||
PrevSenderID: prevEventSender,
|
PrevSenderID: string(prevEvent.SenderID()),
|
||||||
}
|
}
|
||||||
|
|
||||||
event.PDU, err = event.SetUnsigned(prev)
|
event.PDU, err = event.SetUnsigned(prev)
|
||||||
|
|
@ -629,7 +622,7 @@ func (s *OutputRoomEventConsumer) writeFTS(ev *rstypes.HeaderedEvent, pduPositio
|
||||||
case spec.MRoomTopic:
|
case spec.MRoomTopic:
|
||||||
e.Content = gjson.GetBytes(ev.Content(), "topic").String()
|
e.Content = gjson.GetBytes(ev.Content(), "topic").String()
|
||||||
case spec.MRoomRedaction:
|
case spec.MRoomRedaction:
|
||||||
log.Tracef("Redacting event: %s", ev.Redacts())
|
logrus.Tracef("Redacting event: %s", ev.Redacts())
|
||||||
if err := s.fts.Delete(ev.Redacts()); err != nil {
|
if err := s.fts.Delete(ev.Redacts()); err != nil {
|
||||||
return fmt.Errorf("failed to delete entry from fulltext index: %w", err)
|
return fmt.Errorf("failed to delete entry from fulltext index: %w", err)
|
||||||
}
|
}
|
||||||
|
|
@ -638,7 +631,7 @@ func (s *OutputRoomEventConsumer) writeFTS(ev *rstypes.HeaderedEvent, pduPositio
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if e.Content != "" {
|
if e.Content != "" {
|
||||||
log.Tracef("Indexing element: %+v", e)
|
logrus.Tracef("Indexing element: %+v", e)
|
||||||
if err := s.fts.Index(e); err != nil {
|
if err := s.fts.Index(e); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -144,6 +144,6 @@ func GetEvent(
|
||||||
}
|
}
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusOK,
|
Code: http.StatusOK,
|
||||||
JSON: synctypes.ToClientEvent(events[0], synctypes.FormatAll, senderUserID.String(), sk),
|
JSON: synctypes.ToClientEvent(events[0], synctypes.FormatAll, senderUserID.String(), sk, events[0].Unsigned()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -146,7 +146,7 @@ func Relations(
|
||||||
}
|
}
|
||||||
res.Chunk = append(
|
res.Chunk = append(
|
||||||
res.Chunk,
|
res.Chunk,
|
||||||
synctypes.ToClientEvent(event.PDU, synctypes.FormatAll, sender.String(), sk),
|
synctypes.ToClientEvent(event.PDU, synctypes.FormatAll, sender.String(), sk, event.Unsigned()),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -267,7 +267,7 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts
|
||||||
ProfileInfo: profileInfos,
|
ProfileInfo: profileInfos,
|
||||||
},
|
},
|
||||||
Rank: eventScore[event.EventID()].Score,
|
Rank: eventScore[event.EventID()].Score,
|
||||||
Result: synctypes.ToClientEvent(event, synctypes.FormatAll, sender.String(), sk),
|
Result: synctypes.ToClientEvent(event, synctypes.FormatAll, sender.String(), sk, event.Unsigned()),
|
||||||
})
|
})
|
||||||
roomGroup := groups[event.RoomID()]
|
roomGroup := groups[event.RoomID()]
|
||||||
roomGroup.Results = append(roomGroup.Results, event.EventID())
|
roomGroup.Results = append(roomGroup.Results, event.EventID())
|
||||||
|
|
|
||||||
|
|
@ -16,12 +16,21 @@
|
||||||
package synctypes
|
package synctypes
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// PrevEventRef represents a reference to a previous event in a state event upgrade
|
||||||
|
type PrevEventRef struct {
|
||||||
|
PrevContent json.RawMessage `json:"prev_content"`
|
||||||
|
ReplacesState string `json:"replaces_state"`
|
||||||
|
PrevSenderID string `json:"prev_sender"`
|
||||||
|
}
|
||||||
|
|
||||||
type ClientEventFormat int
|
type ClientEventFormat int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|
@ -70,7 +79,7 @@ func ToClientEvents(serverEvs []gomatrixserverlib.PDU, format ClientEventFormat,
|
||||||
continue // TODO: shouldn't happen?
|
continue // TODO: shouldn't happen?
|
||||||
}
|
}
|
||||||
if format == FormatSyncFederation {
|
if format == FormatSyncFederation {
|
||||||
evs = append(evs, ToClientEvent(se, format, string(se.SenderID()), se.StateKey()))
|
evs = append(evs, ToClientEvent(se, format, string(se.SenderID()), se.StateKey(), spec.RawJSON(se.Unsigned())))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -92,19 +101,36 @@ func ToClientEvents(serverEvs []gomatrixserverlib.PDU, format ClientEventFormat,
|
||||||
sk = &skString
|
sk = &skString
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
evs = append(evs, ToClientEvent(se, format, sender.String(), sk))
|
|
||||||
|
unsigned := se.Unsigned()
|
||||||
|
var prev PrevEventRef
|
||||||
|
if err := json.Unmarshal(se.Unsigned(), &prev); err == nil && prev.PrevSenderID != "" {
|
||||||
|
prevUserID, err := userIDForSender(*validRoomID, spec.SenderID(prev.PrevSenderID))
|
||||||
|
if err == nil && userID != nil {
|
||||||
|
prev.PrevSenderID = prevUserID.String()
|
||||||
|
} else {
|
||||||
|
logrus.Warnf("Failed to find userID for prev_sender in ClientEvent")
|
||||||
|
// NOTE: Not much can be done here, so leave the previous value in place.
|
||||||
|
}
|
||||||
|
unsigned, err = json.Marshal(prev)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Errorf("Failed to marshal unsigned content for ClientEvent: %s", err.Error())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
evs = append(evs, ToClientEvent(se, format, sender.String(), sk, spec.RawJSON(unsigned)))
|
||||||
}
|
}
|
||||||
return evs
|
return evs
|
||||||
}
|
}
|
||||||
|
|
||||||
// ToClientEvent converts a single server event to a client event.
|
// ToClientEvent converts a single server event to a client event.
|
||||||
func ToClientEvent(se gomatrixserverlib.PDU, format ClientEventFormat, sender string, stateKey *string) ClientEvent {
|
func ToClientEvent(se gomatrixserverlib.PDU, format ClientEventFormat, sender string, stateKey *string, unsigned spec.RawJSON) ClientEvent {
|
||||||
ce := ClientEvent{
|
ce := ClientEvent{
|
||||||
Content: spec.RawJSON(se.Content()),
|
Content: spec.RawJSON(se.Content()),
|
||||||
Sender: sender,
|
Sender: sender,
|
||||||
Type: se.Type(),
|
Type: se.Type(),
|
||||||
StateKey: stateKey,
|
StateKey: stateKey,
|
||||||
Unsigned: spec.RawJSON(se.Unsigned()),
|
Unsigned: unsigned,
|
||||||
OriginServerTS: se.OriginServerTS(),
|
OriginServerTS: se.OriginServerTS(),
|
||||||
EventID: se.EventID(),
|
EventID: se.EventID(),
|
||||||
Redacts: se.Redacts(),
|
Redacts: se.Redacts(),
|
||||||
|
|
@ -151,7 +177,7 @@ func ToClientEventDefault(userIDQuery spec.UserIDForSender, event gomatrixserver
|
||||||
sk = &skString
|
sk = &skString
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ToClientEvent(event, FormatAll, sender.String(), sk)
|
return ToClientEvent(event, FormatAll, sender.String(), sk, event.Unsigned())
|
||||||
}
|
}
|
||||||
|
|
||||||
// If provided state key is a user ID (state keys beginning with @ are reserved for this purpose)
|
// If provided state key is a user ID (state keys beginning with @ are reserved for this purpose)
|
||||||
|
|
|
||||||
|
|
@ -49,7 +49,7 @@ func TestToClientEvent(t *testing.T) { // nolint: gocyclo
|
||||||
t.Fatalf("failed to create userID: %s", err)
|
t.Fatalf("failed to create userID: %s", err)
|
||||||
}
|
}
|
||||||
sk := ""
|
sk := ""
|
||||||
ce := ToClientEvent(ev, FormatAll, userID.String(), &sk)
|
ce := ToClientEvent(ev, FormatAll, userID.String(), &sk, ev.Unsigned())
|
||||||
if ce.EventID != ev.EventID() {
|
if ce.EventID != ev.EventID() {
|
||||||
t.Errorf("ClientEvent.EventID: wanted %s, got %s", ev.EventID(), ce.EventID)
|
t.Errorf("ClientEvent.EventID: wanted %s, got %s", ev.EventID(), ce.EventID)
|
||||||
}
|
}
|
||||||
|
|
@ -109,7 +109,7 @@ func TestToClientFormatSync(t *testing.T) {
|
||||||
t.Fatalf("failed to create userID: %s", err)
|
t.Fatalf("failed to create userID: %s", err)
|
||||||
}
|
}
|
||||||
sk := ""
|
sk := ""
|
||||||
ce := ToClientEvent(ev, FormatSync, userID.String(), &sk)
|
ce := ToClientEvent(ev, FormatSync, userID.String(), &sk, ev.Unsigned())
|
||||||
if ce.RoomID != "" {
|
if ce.RoomID != "" {
|
||||||
t.Errorf("ClientEvent.RoomID: wanted '', got %s", ce.RoomID)
|
t.Errorf("ClientEvent.RoomID: wanted '', got %s", ce.RoomID)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -339,13 +339,6 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
|
||||||
return token, nil
|
return token, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// PrevEventRef represents a reference to a previous event in a state event upgrade
|
|
||||||
type PrevEventRef struct {
|
|
||||||
PrevContent json.RawMessage `json:"prev_content"`
|
|
||||||
ReplacesState string `json:"replaces_state"`
|
|
||||||
PrevSenderID string `json:"prev_sender"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type DeviceLists struct {
|
type DeviceLists struct {
|
||||||
Changed []string `json:"changed,omitempty"`
|
Changed []string `json:"changed,omitempty"`
|
||||||
Left []string `json:"left,omitempty"`
|
Left []string `json:"left,omitempty"`
|
||||||
|
|
@ -552,7 +545,7 @@ func NewInviteResponse(event *types.HeaderedEvent, userID spec.UserID, stateKey
|
||||||
|
|
||||||
// Then we'll see if we can create a partial of the invite event itself.
|
// Then we'll see if we can create a partial of the invite event itself.
|
||||||
// This is needed for clients to work out *who* sent the invite.
|
// This is needed for clients to work out *who* sent the invite.
|
||||||
inviteEvent := synctypes.ToClientEvent(event.PDU, eventFormat, userID.String(), stateKey)
|
inviteEvent := synctypes.ToClientEvent(event.PDU, eventFormat, userID.String(), stateKey, event.Unsigned())
|
||||||
inviteEvent.Unsigned = nil
|
inviteEvent.Unsigned = nil
|
||||||
if ev, err := json.Marshal(inviteEvent); err == nil {
|
if ev, err := json.Marshal(inviteEvent); err == nil {
|
||||||
res.InviteState.Events = append(res.InviteState.Events, ev)
|
res.InviteState.Events = append(res.InviteState.Events, ev)
|
||||||
|
|
|
||||||
|
|
@ -321,7 +321,7 @@ func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *rst
|
||||||
return fmt.Errorf("queryUserIDForSender: userID unknown for %s", *sk)
|
return fmt.Errorf("queryUserIDForSender: userID unknown for %s", *sk)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
cevent := synctypes.ToClientEvent(event, synctypes.FormatAll, sender.String(), sk)
|
cevent := synctypes.ToClientEvent(event, synctypes.FormatAll, sender.String(), sk, event.Unsigned())
|
||||||
var member *localMembership
|
var member *localMembership
|
||||||
member, err = newLocalMembership(&cevent)
|
member, err = newLocalMembership(&cevent)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -566,7 +566,7 @@ func (s *OutputRoomEventConsumer) notifyLocal(ctx context.Context, event *rstype
|
||||||
// UNSPEC: the spec doesn't say this is a ClientEvent, but the
|
// UNSPEC: the spec doesn't say this is a ClientEvent, but the
|
||||||
// fields seem to match. room_id should be missing, which
|
// fields seem to match. room_id should be missing, which
|
||||||
// matches the behaviour of FormatSync.
|
// matches the behaviour of FormatSync.
|
||||||
Event: synctypes.ToClientEvent(event, synctypes.FormatSync, sender.String(), sk),
|
Event: synctypes.ToClientEvent(event, synctypes.FormatSync, sender.String(), sk, event.Unsigned()),
|
||||||
// TODO: this is per-device, but it's not part of the primary
|
// TODO: this is per-device, but it's not part of the primary
|
||||||
// key. So inserting one notification per profile tag doesn't
|
// key. So inserting one notification per profile tag doesn't
|
||||||
// make sense. What is this supposed to be? Sytests require it
|
// make sense. What is this supposed to be? Sytests require it
|
||||||
|
|
|
||||||
|
|
@ -106,7 +106,7 @@ func TestNotifyUserCountsAsync(t *testing.T) {
|
||||||
}
|
}
|
||||||
sk := ""
|
sk := ""
|
||||||
if err := db.InsertNotification(ctx, aliceLocalpart, serverName, dummyEvent.EventID(), 0, nil, &api.Notification{
|
if err := db.InsertNotification(ctx, aliceLocalpart, serverName, dummyEvent.EventID(), 0, nil, &api.Notification{
|
||||||
Event: synctypes.ToClientEvent(dummyEvent, synctypes.FormatAll, sender.String(), &sk),
|
Event: synctypes.ToClientEvent(dummyEvent, synctypes.FormatAll, sender.String(), &sk, dummyEvent.Unsigned()),
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue