Handle event_format federation in /sync responses

This commit is contained in:
Devon Hudson 2023-08-28 19:02:02 -06:00
parent 1c4ec67bb6
commit 8c4a7f1f2d
No known key found for this signature in database
GPG key ID: CD06B18E77F6A628
15 changed files with 106 additions and 45 deletions

View file

@ -193,7 +193,7 @@ func OnIncomingStateRequest(ctx context.Context, device *userapi.Device, rsAPI a
}
stateEvents = append(
stateEvents,
synctypes.ToClientEvent(ev, synctypes.FormatAll, sender, sk),
synctypes.ToClientEvent(ev, synctypes.FormatAll, sender.String(), sk),
)
}
}

View file

@ -184,7 +184,7 @@ func RedactEvent(ctx context.Context, redactionEvent, redactedEvent gomatrixserv
if err != nil {
return err
}
redactedBecause := synctypes.ToClientEvent(redactionEvent, synctypes.FormatSync, *senderID, redactionEvent.StateKey())
redactedBecause := synctypes.ToClientEvent(redactionEvent, synctypes.FormatSync, senderID.String(), redactionEvent.StateKey())
if err := redactedEvent.SetUnsignedField("redacted_because", redactedBecause); err != nil {
return err
}

View file

@ -592,6 +592,7 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event *rstypes.HeaderedEvent)
return event, nil
}
// TODO: don't change this to userID if event_format == federation
prevEventSender := string(prevEvent.SenderID())
prevUser, err := s.rsAPI.QueryUserIDForSender(s.ctx, *validRoomID, prevEvent.SenderID())
if err == nil && prevUser != nil {

View file

@ -144,6 +144,6 @@ func GetEvent(
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: synctypes.ToClientEvent(events[0], synctypes.FormatAll, *senderUserID, sk),
JSON: synctypes.ToClientEvent(events[0], synctypes.FormatAll, senderUserID.String(), sk),
}
}

View file

@ -146,7 +146,7 @@ func Relations(
}
res.Chunk = append(
res.Chunk,
synctypes.ToClientEvent(event.PDU, synctypes.FormatAll, sender, sk),
synctypes.ToClientEvent(event.PDU, synctypes.FormatAll, sender.String(), sk),
)
}

View file

@ -267,7 +267,7 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts
ProfileInfo: profileInfos,
},
Rank: eventScore[event.EventID()].Score,
Result: synctypes.ToClientEvent(event, synctypes.FormatAll, sender, sk),
Result: synctypes.ToClientEvent(event, synctypes.FormatAll, sender.String(), sk),
})
roomGroup := groups[event.RoomID()]
roomGroup.Results = append(roomGroup.Results, event.EventID())

View file

@ -63,6 +63,11 @@ func (p *InviteStreamProvider) IncrementalSync(
return from
}
eventFormat := synctypes.FormatSync
if req.Filter.EventFormat == synctypes.EventFormatFederation {
eventFormat = synctypes.FormatSyncFederation
}
for roomID, inviteEvent := range invites {
user := spec.UserID{}
validRoomID, err := spec.NewRoomID(inviteEvent.RoomID())
@ -87,7 +92,7 @@ func (p *InviteStreamProvider) IncrementalSync(
if _, ok := req.IgnoredUsers.List[user.String()]; ok {
continue
}
ir := types.NewInviteResponse(inviteEvent, user, sk)
ir := types.NewInviteResponse(inviteEvent, user, sk, eventFormat)
req.Response.Rooms.Invite[roomID] = ir
}

View file

@ -88,6 +88,11 @@ func (p *PDUStreamProvider) CompleteSync(
req.Log.WithError(err).Error("unable to update event filter with ignored users")
}
eventFormat := synctypes.FormatSync
if req.Filter.EventFormat == synctypes.EventFormatFederation {
eventFormat = synctypes.FormatSyncFederation
}
recentEvents, err := snapshot.RecentEvents(ctx, joinedRoomIDs, r, &eventFilter, true, true)
if err != nil {
return from
@ -105,7 +110,7 @@ func (p *PDUStreamProvider) CompleteSync(
// get the join response for each room
jr, jerr := p.getJoinResponseForCompleteSync(
ctx, snapshot, roomID, &stateFilter, req.WantFullState, req.Device, false,
events.Events, events.Limited,
events.Events, events.Limited, eventFormat,
)
if jerr != nil {
req.Log.WithError(jerr).Error("p.getJoinResponseForCompleteSync failed")
@ -142,7 +147,7 @@ func (p *PDUStreamProvider) CompleteSync(
events := recentEvents[roomID]
jr, err = p.getJoinResponseForCompleteSync(
ctx, snapshot, roomID, &stateFilter, req.WantFullState, req.Device, true,
events.Events, events.Limited,
events.Events, events.Limited, eventFormat,
)
if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
@ -346,6 +351,11 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
return r.From, fmt.Errorf("p.DB.GetBackwardTopologyPos: %w", err)
}
eventFormat := synctypes.FormatSync
if req.Filter.EventFormat == synctypes.EventFormatFederation {
eventFormat = synctypes.FormatSyncFederation
}
// Now that we've filtered the timeline, work out which state events are still
// left. Anything that appears in the filtered timeline will be removed from the
// "state" section and kept in "timeline".
@ -359,7 +369,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
continue
}
var newEvent gomatrixserverlib.PDU
newEvent, err = p.updatePowerLevelEvent(ctx, ev)
newEvent, err = p.updatePowerLevelEvent(ctx, ev, eventFormat)
if err != nil {
return r.From, err
}
@ -383,7 +393,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
// update the powerlevel event for state events
if ev.Version() == gomatrixserverlib.RoomVersionPseudoIDs && ev.Type() == spec.MRoomPowerLevels && ev.StateKeyEquals("") {
var newEvent gomatrixserverlib.PDU
newEvent, err = p.updatePowerLevelEvent(ctx, he)
newEvent, err = p.updatePowerLevelEvent(ctx, he, eventFormat)
if err != nil {
return r.From, err
}
@ -413,13 +423,13 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
}
}
jr.Timeline.PrevBatch = &prevBatch
jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), eventFormat, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
// If we are limited by the filter AND the history visibility filter
// didn't "remove" events, return that the response is limited.
jr.Timeline.Limited = (limited && len(events) == len(recentEvents)) || delta.NewlyJoined
jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), eventFormat, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
req.Response.Rooms.Join[delta.RoomID] = jr
@ -428,11 +438,11 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = &prevBatch
// TODO: Apply history visibility on peeked rooms
jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(recentEvents), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(recentEvents), eventFormat, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
jr.Timeline.Limited = limited
jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), eventFormat, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
req.Response.Rooms.Peek[delta.RoomID] = jr
@ -443,13 +453,13 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
case spec.Ban:
lr := types.NewLeaveResponse()
lr.Timeline.PrevBatch = &prevBatch
lr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
lr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), eventFormat, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
// If we are limited by the filter AND the history visibility filter
// didn't "remove" events, return that the response is limited.
lr.Timeline.Limited = limited && len(events) == len(recentEvents)
lr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
lr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), eventFormat, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
req.Response.Rooms.Leave[delta.RoomID] = lr
@ -458,7 +468,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
return latestPosition, nil
}
func (p *PDUStreamProvider) updatePowerLevelEvent(ctx context.Context, ev *rstypes.HeaderedEvent) (gomatrixserverlib.PDU, error) {
func (p *PDUStreamProvider) updatePowerLevelEvent(ctx context.Context, ev *rstypes.HeaderedEvent, eventFormat synctypes.ClientEventFormat) (gomatrixserverlib.PDU, error) {
pls, err := gomatrixserverlib.NewPowerLevelContentFromEvent(ev)
if err != nil {
return nil, err
@ -467,11 +477,14 @@ func (p *PDUStreamProvider) updatePowerLevelEvent(ctx context.Context, ev *rstyp
var userID *spec.UserID
for user, level := range pls.Users {
validRoomID, _ := spec.NewRoomID(ev.RoomID())
if eventFormat != synctypes.FormatSyncFederation {
userID, err = p.rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(user))
if err != nil {
return nil, err
}
newPls[userID.String()] = level
user = userID.String()
}
newPls[user] = level
}
var newPlBytes, newEv []byte
newPlBytes, err = json.Marshal(newPls)
@ -503,11 +516,14 @@ func (p *PDUStreamProvider) updatePowerLevelEvent(ctx context.Context, ev *rstyp
newPls = make(map[string]int64)
for user, level := range pls.Users {
validRoomID, _ := spec.NewRoomID(ev.RoomID())
if eventFormat != synctypes.FormatSyncFederation {
userID, err = p.rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(user))
if err != nil {
return nil, err
}
newPls[userID.String()] = level
user = userID.String()
}
newPls[user] = level
}
newPlBytes, err = json.Marshal(newPls)
if err != nil {
@ -592,6 +608,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
isPeek bool,
recentStreamEvents []types.StreamEvent,
limited bool,
eventFormat synctypes.ClientEventFormat,
) (jr *types.JoinResponse, err error) {
jr = types.NewJoinResponse()
// TODO: When filters are added, we may need to call this multiple times to get enough events.
@ -683,7 +700,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
if ev.Type() != spec.MRoomPowerLevels || !ev.StateKeyEquals("") {
continue
}
newEvent, err := p.updatePowerLevelEvent(ctx, ev)
newEvent, err := p.updatePowerLevelEvent(ctx, ev, eventFormat)
if err != nil {
return nil, err
}
@ -697,7 +714,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
if ev.Type() != spec.MRoomPowerLevels || !ev.StateKeyEquals("") {
continue
}
newEvent, err := p.updatePowerLevelEvent(ctx, ev)
newEvent, err := p.updatePowerLevelEvent(ctx, ev, eventFormat)
if err != nil {
return nil, err
}
@ -705,13 +722,13 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
}
jr.Timeline.PrevBatch = prevBatch
jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), eventFormat, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
// If we are limited by the filter AND the history visibility filter
// didn't "remove" events, return that the response is limited.
jr.Timeline.Limited = limited && len(events) == len(recentEvents)
jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(stateEvents), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(stateEvents), eventFormat, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
return jr, nil

View file

@ -30,8 +30,21 @@ const (
// FormatSync will include only the event keys required by the /sync API. Notably, this
// means the 'room_id' will be missing from the events.
FormatSync
// FormatSyncFederation will include all event keys normally included in federated events.
// This allows clients to request federated formatted events via the /sync API.
FormatSyncFederation
)
// ClientFederationEvent extends a ClientEvent to contain the additional fields present in a
// federation event. Used when the client requests `event_format` of type `federation`.
type ClientFederationEvent struct {
Depth int64 `json:"depth,omitempty"`
PrevEvents []string `json:"prev_events,omitempty"`
AuthEvents []string `json:"auth_events,omitempty"`
Signatures spec.RawJSON `json:"signatures,omitempty"`
Hashes spec.RawJSON `json:"hashes,omitempty"`
}
// ClientEvent is an event which is fit for consumption by clients, in accordance with the specification.
type ClientEvent struct {
Content spec.RawJSON `json:"content"`
@ -44,6 +57,9 @@ type ClientEvent struct {
Type string `json:"type"`
Unsigned spec.RawJSON `json:"unsigned,omitempty"`
Redacts string `json:"redacts,omitempty"`
// Federation Event Fields. Only sent to clients when `event_format` == `federation`.
ClientFederationEvent
}
// ToClientEvents converts server events to client events.
@ -53,6 +69,11 @@ func ToClientEvents(serverEvs []gomatrixserverlib.PDU, format ClientEventFormat,
if se == nil {
continue // TODO: shouldn't happen?
}
if format == FormatSyncFederation {
evs = append(evs, ToClientEvent(se, format, string(se.SenderID()), se.StateKey()))
continue
}
sender := spec.UserID{}
validRoomID, err := spec.NewRoomID(se.RoomID())
if err != nil {
@ -71,16 +92,16 @@ func ToClientEvents(serverEvs []gomatrixserverlib.PDU, format ClientEventFormat,
sk = &skString
}
}
evs = append(evs, ToClientEvent(se, format, sender, sk))
evs = append(evs, ToClientEvent(se, format, sender.String(), sk))
}
return evs
}
// ToClientEvent converts a single server event to a client event.
func ToClientEvent(se gomatrixserverlib.PDU, format ClientEventFormat, sender spec.UserID, stateKey *string) ClientEvent {
func ToClientEvent(se gomatrixserverlib.PDU, format ClientEventFormat, sender string, stateKey *string) ClientEvent {
ce := ClientEvent{
Content: spec.RawJSON(se.Content()),
Sender: sender.String(),
Sender: sender,
Type: se.Type(),
StateKey: stateKey,
Unsigned: spec.RawJSON(se.Unsigned()),
@ -88,12 +109,24 @@ func ToClientEvent(se gomatrixserverlib.PDU, format ClientEventFormat, sender sp
EventID: se.EventID(),
Redacts: se.Redacts(),
}
if format == FormatAll {
switch format {
case FormatAll:
ce.RoomID = se.RoomID()
case FormatSync:
case FormatSyncFederation:
ce.RoomID = se.RoomID()
ce.AuthEvents = se.AuthEventIDs()
ce.PrevEvents = se.PrevEventIDs()
ce.Depth = se.Depth()
// TODO: Set Signatures & Hashes fields
}
if format != FormatSyncFederation {
if se.Version() == gomatrixserverlib.RoomVersionPseudoIDs {
ce.SenderKey = se.SenderID()
}
}
return ce
}
@ -118,7 +151,7 @@ func ToClientEventDefault(userIDQuery spec.UserIDForSender, event gomatrixserver
sk = &skString
}
}
return ToClientEvent(event, FormatAll, sender, sk)
return ToClientEvent(event, FormatAll, sender.String(), sk)
}
// If provided state key is a user ID (state keys beginning with @ are reserved for this purpose)

View file

@ -49,7 +49,7 @@ func TestToClientEvent(t *testing.T) { // nolint: gocyclo
t.Fatalf("failed to create userID: %s", err)
}
sk := ""
ce := ToClientEvent(ev, FormatAll, *userID, &sk)
ce := ToClientEvent(ev, FormatAll, userID.String(), &sk)
if ce.EventID != ev.EventID() {
t.Errorf("ClientEvent.EventID: wanted %s, got %s", ev.EventID(), ce.EventID)
}
@ -109,7 +109,7 @@ func TestToClientFormatSync(t *testing.T) {
t.Fatalf("failed to create userID: %s", err)
}
sk := ""
ce := ToClientEvent(ev, FormatSync, *userID, &sk)
ce := ToClientEvent(ev, FormatSync, userID.String(), &sk)
if ce.RoomID != "" {
t.Errorf("ClientEvent.RoomID: wanted '', got %s", ce.RoomID)
}

View file

@ -78,9 +78,14 @@ type RoomEventFilter struct {
ContainsURL *bool `json:"contains_url,omitempty"`
}
const (
EventFormatClient = "client"
EventFormatFederation = "federation"
)
// Validate checks if the filter contains valid property values
func (filter *Filter) Validate() error {
if filter.EventFormat != "" && filter.EventFormat != "client" && filter.EventFormat != "federation" {
if filter.EventFormat != "" && filter.EventFormat != EventFormatClient && filter.EventFormat != EventFormatFederation {
return errors.New("Bad event_format value. Must be one of [\"client\", \"federation\"]")
}
return nil

View file

@ -539,7 +539,7 @@ type InviteResponse struct {
}
// NewInviteResponse creates an empty response with initialised arrays.
func NewInviteResponse(event *types.HeaderedEvent, userID spec.UserID, stateKey *string) *InviteResponse {
func NewInviteResponse(event *types.HeaderedEvent, userID spec.UserID, stateKey *string, eventFormat synctypes.ClientEventFormat) *InviteResponse {
res := InviteResponse{}
res.InviteState.Events = []json.RawMessage{}
@ -552,7 +552,7 @@ func NewInviteResponse(event *types.HeaderedEvent, userID spec.UserID, stateKey
// Then we'll see if we can create a partial of the invite event itself.
// This is needed for clients to work out *who* sent the invite.
inviteEvent := synctypes.ToClientEvent(event.PDU, synctypes.FormatSync, userID, stateKey)
inviteEvent := synctypes.ToClientEvent(event.PDU, eventFormat, userID.String(), stateKey)
inviteEvent.Unsigned = nil
if ev, err := json.Marshal(inviteEvent); err == nil {
res.InviteState.Events = append(res.InviteState.Events, ev)

View file

@ -72,7 +72,7 @@ func TestNewInviteResponse(t *testing.T) {
skString := skUserID.String()
sk := &skString
res := NewInviteResponse(&types.HeaderedEvent{PDU: ev}, *sender, sk)
res := NewInviteResponse(&types.HeaderedEvent{PDU: ev}, *sender, sk, synctypes.FormatSync)
j, err := json.Marshal(res)
if err != nil {
t.Fatal(err)

View file

@ -321,7 +321,7 @@ func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *rst
return fmt.Errorf("queryUserIDForSender: userID unknown for %s", *sk)
}
}
cevent := synctypes.ToClientEvent(event, synctypes.FormatAll, sender, sk)
cevent := synctypes.ToClientEvent(event, synctypes.FormatAll, sender.String(), sk)
var member *localMembership
member, err = newLocalMembership(&cevent)
if err != nil {
@ -566,7 +566,7 @@ func (s *OutputRoomEventConsumer) notifyLocal(ctx context.Context, event *rstype
// UNSPEC: the spec doesn't say this is a ClientEvent, but the
// fields seem to match. room_id should be missing, which
// matches the behaviour of FormatSync.
Event: synctypes.ToClientEvent(event, synctypes.FormatSync, sender, sk),
Event: synctypes.ToClientEvent(event, synctypes.FormatSync, sender.String(), sk),
// TODO: this is per-device, but it's not part of the primary
// key. So inserting one notification per profile tag doesn't
// make sense. What is this supposed to be? Sytests require it

View file

@ -106,7 +106,7 @@ func TestNotifyUserCountsAsync(t *testing.T) {
}
sk := ""
if err := db.InsertNotification(ctx, aliceLocalpart, serverName, dummyEvent.EventID(), 0, nil, &api.Notification{
Event: synctypes.ToClientEvent(dummyEvent, synctypes.FormatAll, *sender, &sk),
Event: synctypes.ToClientEvent(dummyEvent, synctypes.FormatAll, sender.String(), &sk),
}); err != nil {
t.Error(err)
}