From bb2ab62cbf02abb6f600e6eb39fde67aa2ff3215 Mon Sep 17 00:00:00 2001 From: devonh Date: Thu, 31 Aug 2023 15:33:38 +0000 Subject: [PATCH] Handle event_format federation in /sync responses (#3192) --- clientapi/routing/state.go | 2 +- internal/eventutil/events.go | 2 +- syncapi/consumers/roomserver.go | 11 +- syncapi/routing/getevent.go | 2 +- syncapi/routing/relations.go | 2 +- syncapi/routing/search.go | 2 +- syncapi/streams/stream_invite.go | 7 +- syncapi/streams/stream_pdu.go | 67 ++-- syncapi/syncapi_test.go | 150 ++++++++ syncapi/synctypes/clientevent.go | 79 ++++- syncapi/synctypes/clientevent_test.go | 473 ++++++++++++++++++++++++-- syncapi/synctypes/filter.go | 7 +- syncapi/types/types.go | 11 +- syncapi/types/types_test.go | 2 +- userapi/consumers/roomserver.go | 4 +- userapi/util/notify_test.go | 2 +- 16 files changed, 739 insertions(+), 84 deletions(-) diff --git a/clientapi/routing/state.go b/clientapi/routing/state.go index 7648dc474..d7f0b40f8 100644 --- a/clientapi/routing/state.go +++ b/clientapi/routing/state.go @@ -193,7 +193,7 @@ func OnIncomingStateRequest(ctx context.Context, device *userapi.Device, rsAPI a } stateEvents = append( stateEvents, - synctypes.ToClientEvent(ev, synctypes.FormatAll, sender, sk), + synctypes.ToClientEvent(ev, synctypes.FormatAll, sender.String(), sk, ev.Unsigned()), ) } } diff --git a/internal/eventutil/events.go b/internal/eventutil/events.go index 56ee576a0..aa99e5860 100644 --- a/internal/eventutil/events.go +++ b/internal/eventutil/events.go @@ -184,7 +184,7 @@ func RedactEvent(ctx context.Context, redactionEvent, redactedEvent gomatrixserv if err != nil { return err } - redactedBecause := synctypes.ToClientEvent(redactionEvent, synctypes.FormatSync, *senderID, redactionEvent.StateKey()) + redactedBecause := synctypes.ToClientEvent(redactionEvent, synctypes.FormatSync, senderID.String(), redactionEvent.StateKey(), redactionEvent.Unsigned()) if err := redactedEvent.SetUnsignedField("redacted_because", redactedBecause); err != nil { return err } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 1e87aee99..9df5e0f9c 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -33,6 +33,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/streams" + "github.com/matrix-org/dendrite/syncapi/synctypes" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib/spec" "github.com/nats-io/nats.go" @@ -592,16 +593,10 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event *rstypes.HeaderedEvent) return event, nil } - prevEventSender := string(prevEvent.SenderID()) - prevUser, err := s.rsAPI.QueryUserIDForSender(s.ctx, *validRoomID, prevEvent.SenderID()) - if err == nil && prevUser != nil { - prevEventSender = prevUser.String() - } - - prev := types.PrevEventRef{ + prev := synctypes.PrevEventRef{ PrevContent: prevEvent.Content(), ReplacesState: prevEvent.EventID(), - PrevSenderID: prevEventSender, + PrevSenderID: string(prevEvent.SenderID()), } event.PDU, err = event.SetUnsigned(prev) diff --git a/syncapi/routing/getevent.go b/syncapi/routing/getevent.go index 4fa282f3b..bf0f9bf8c 100644 --- a/syncapi/routing/getevent.go +++ b/syncapi/routing/getevent.go @@ -144,6 +144,6 @@ func GetEvent( } return util.JSONResponse{ Code: http.StatusOK, - JSON: synctypes.ToClientEvent(events[0], synctypes.FormatAll, *senderUserID, sk), + JSON: synctypes.ToClientEvent(events[0], synctypes.FormatAll, senderUserID.String(), sk, events[0].Unsigned()), } } diff --git a/syncapi/routing/relations.go b/syncapi/routing/relations.go index e3d1069a0..b451a7e2e 100644 --- a/syncapi/routing/relations.go +++ b/syncapi/routing/relations.go @@ -146,7 +146,7 @@ func Relations( } res.Chunk = append( res.Chunk, - synctypes.ToClientEvent(event.PDU, synctypes.FormatAll, sender, sk), + synctypes.ToClientEvent(event.PDU, synctypes.FormatAll, sender.String(), sk, event.Unsigned()), ) } diff --git a/syncapi/routing/search.go b/syncapi/routing/search.go index d892b604a..7d5c061b7 100644 --- a/syncapi/routing/search.go +++ b/syncapi/routing/search.go @@ -267,7 +267,7 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts ProfileInfo: profileInfos, }, Rank: eventScore[event.EventID()].Score, - Result: synctypes.ToClientEvent(event, synctypes.FormatAll, sender, sk), + Result: synctypes.ToClientEvent(event, synctypes.FormatAll, sender.String(), sk, event.Unsigned()), }) roomGroup := groups[event.RoomID()] roomGroup.Results = append(roomGroup.Results, event.EventID()) diff --git a/syncapi/streams/stream_invite.go b/syncapi/streams/stream_invite.go index 7c29d84ae..1ce3346f4 100644 --- a/syncapi/streams/stream_invite.go +++ b/syncapi/streams/stream_invite.go @@ -63,6 +63,11 @@ func (p *InviteStreamProvider) IncrementalSync( return from } + eventFormat := synctypes.FormatSync + if req.Filter.EventFormat == synctypes.EventFormatFederation { + eventFormat = synctypes.FormatSyncFederation + } + for roomID, inviteEvent := range invites { user := spec.UserID{} validRoomID, err := spec.NewRoomID(inviteEvent.RoomID()) @@ -87,7 +92,7 @@ func (p *InviteStreamProvider) IncrementalSync( if _, ok := req.IgnoredUsers.List[user.String()]; ok { continue } - ir := types.NewInviteResponse(inviteEvent, user, sk) + ir := types.NewInviteResponse(inviteEvent, user, sk, eventFormat) req.Response.Rooms.Invite[roomID] = ir } diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 4622c21ad..ee524f726 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -88,6 +88,11 @@ func (p *PDUStreamProvider) CompleteSync( req.Log.WithError(err).Error("unable to update event filter with ignored users") } + eventFormat := synctypes.FormatSync + if req.Filter.EventFormat == synctypes.EventFormatFederation { + eventFormat = synctypes.FormatSyncFederation + } + recentEvents, err := snapshot.RecentEvents(ctx, joinedRoomIDs, r, &eventFilter, true, true) if err != nil { return from @@ -105,7 +110,7 @@ func (p *PDUStreamProvider) CompleteSync( // get the join response for each room jr, jerr := p.getJoinResponseForCompleteSync( ctx, snapshot, roomID, &stateFilter, req.WantFullState, req.Device, false, - events.Events, events.Limited, + events.Events, events.Limited, eventFormat, ) if jerr != nil { req.Log.WithError(jerr).Error("p.getJoinResponseForCompleteSync failed") @@ -142,7 +147,7 @@ func (p *PDUStreamProvider) CompleteSync( events := recentEvents[roomID] jr, err = p.getJoinResponseForCompleteSync( ctx, snapshot, roomID, &stateFilter, req.WantFullState, req.Device, true, - events.Events, events.Limited, + events.Events, events.Limited, eventFormat, ) if err != nil { req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") @@ -346,6 +351,11 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( return r.From, fmt.Errorf("p.DB.GetBackwardTopologyPos: %w", err) } + eventFormat := synctypes.FormatSync + if req.Filter.EventFormat == synctypes.EventFormatFederation { + eventFormat = synctypes.FormatSyncFederation + } + // Now that we've filtered the timeline, work out which state events are still // left. Anything that appears in the filtered timeline will be removed from the // "state" section and kept in "timeline". @@ -359,7 +369,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( continue } var newEvent gomatrixserverlib.PDU - newEvent, err = p.updatePowerLevelEvent(ctx, ev) + newEvent, err = p.updatePowerLevelEvent(ctx, ev, eventFormat) if err != nil { return r.From, err } @@ -383,7 +393,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( // update the powerlevel event for state events if ev.Version() == gomatrixserverlib.RoomVersionPseudoIDs && ev.Type() == spec.MRoomPowerLevels && ev.StateKeyEquals("") { var newEvent gomatrixserverlib.PDU - newEvent, err = p.updatePowerLevelEvent(ctx, he) + newEvent, err = p.updatePowerLevelEvent(ctx, he, eventFormat) if err != nil { return r.From, err } @@ -413,13 +423,13 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( } } jr.Timeline.PrevBatch = &prevBatch - jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) { + jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), eventFormat, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) { return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID) }) // If we are limited by the filter AND the history visibility filter // didn't "remove" events, return that the response is limited. jr.Timeline.Limited = (limited && len(events) == len(recentEvents)) || delta.NewlyJoined - jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) { + jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), eventFormat, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) { return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID) }) req.Response.Rooms.Join[delta.RoomID] = jr @@ -428,11 +438,11 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( jr := types.NewJoinResponse() jr.Timeline.PrevBatch = &prevBatch // TODO: Apply history visibility on peeked rooms - jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(recentEvents), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) { + jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(recentEvents), eventFormat, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) { return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID) }) jr.Timeline.Limited = limited - jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) { + jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), eventFormat, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) { return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID) }) req.Response.Rooms.Peek[delta.RoomID] = jr @@ -443,13 +453,13 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( case spec.Ban: lr := types.NewLeaveResponse() lr.Timeline.PrevBatch = &prevBatch - lr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) { + lr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), eventFormat, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) { return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID) }) // If we are limited by the filter AND the history visibility filter // didn't "remove" events, return that the response is limited. lr.Timeline.Limited = limited && len(events) == len(recentEvents) - lr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) { + lr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), eventFormat, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) { return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID) }) req.Response.Rooms.Leave[delta.RoomID] = lr @@ -458,7 +468,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( return latestPosition, nil } -func (p *PDUStreamProvider) updatePowerLevelEvent(ctx context.Context, ev *rstypes.HeaderedEvent) (gomatrixserverlib.PDU, error) { +func (p *PDUStreamProvider) updatePowerLevelEvent(ctx context.Context, ev *rstypes.HeaderedEvent, eventFormat synctypes.ClientEventFormat) (gomatrixserverlib.PDU, error) { pls, err := gomatrixserverlib.NewPowerLevelContentFromEvent(ev) if err != nil { return nil, err @@ -467,11 +477,14 @@ func (p *PDUStreamProvider) updatePowerLevelEvent(ctx context.Context, ev *rstyp var userID *spec.UserID for user, level := range pls.Users { validRoomID, _ := spec.NewRoomID(ev.RoomID()) - userID, err = p.rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(user)) - if err != nil { - return nil, err + if eventFormat != synctypes.FormatSyncFederation { + userID, err = p.rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(user)) + if err != nil { + return nil, err + } + user = userID.String() } - newPls[userID.String()] = level + newPls[user] = level } var newPlBytes, newEv []byte newPlBytes, err = json.Marshal(newPls) @@ -487,7 +500,7 @@ func (p *PDUStreamProvider) updatePowerLevelEvent(ctx context.Context, ev *rstyp prevContent := gjson.GetBytes(ev.JSON(), "unsigned.prev_content") if !prevContent.Exists() { var evNew gomatrixserverlib.PDU - evNew, err = gomatrixserverlib.MustGetRoomVersion(gomatrixserverlib.RoomVersionPseudoIDs).NewEventFromTrustedJSON(newEv, false) + evNew, err = gomatrixserverlib.MustGetRoomVersion(ev.Version()).NewEventFromTrustedJSON(newEv, false) if err != nil { return nil, err } @@ -503,11 +516,14 @@ func (p *PDUStreamProvider) updatePowerLevelEvent(ctx context.Context, ev *rstyp newPls = make(map[string]int64) for user, level := range pls.Users { validRoomID, _ := spec.NewRoomID(ev.RoomID()) - userID, err = p.rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(user)) - if err != nil { - return nil, err + if eventFormat != synctypes.FormatSyncFederation { + userID, err = p.rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(user)) + if err != nil { + return nil, err + } + user = userID.String() } - newPls[userID.String()] = level + newPls[user] = level } newPlBytes, err = json.Marshal(newPls) if err != nil { @@ -519,7 +535,7 @@ func (p *PDUStreamProvider) updatePowerLevelEvent(ctx context.Context, ev *rstyp } var evNew gomatrixserverlib.PDU - evNew, err = gomatrixserverlib.MustGetRoomVersion(gomatrixserverlib.RoomVersionPseudoIDs).NewEventFromTrustedJSONWithEventID(ev.EventID(), newEv, false) + evNew, err = gomatrixserverlib.MustGetRoomVersion(ev.Version()).NewEventFromTrustedJSONWithEventID(ev.EventID(), newEv, false) if err != nil { return nil, err } @@ -592,6 +608,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( isPeek bool, recentStreamEvents []types.StreamEvent, limited bool, + eventFormat synctypes.ClientEventFormat, ) (jr *types.JoinResponse, err error) { jr = types.NewJoinResponse() // TODO: When filters are added, we may need to call this multiple times to get enough events. @@ -683,7 +700,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( if ev.Type() != spec.MRoomPowerLevels || !ev.StateKeyEquals("") { continue } - newEvent, err := p.updatePowerLevelEvent(ctx, ev) + newEvent, err := p.updatePowerLevelEvent(ctx, ev, eventFormat) if err != nil { return nil, err } @@ -697,7 +714,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( if ev.Type() != spec.MRoomPowerLevels || !ev.StateKeyEquals("") { continue } - newEvent, err := p.updatePowerLevelEvent(ctx, ev) + newEvent, err := p.updatePowerLevelEvent(ctx, ev, eventFormat) if err != nil { return nil, err } @@ -705,13 +722,13 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( } jr.Timeline.PrevBatch = prevBatch - jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) { + jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), eventFormat, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) { return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID) }) // If we are limited by the filter AND the history visibility filter // didn't "remove" events, return that the response is limited. jr.Timeline.Limited = limited && len(events) == len(recentEvents) - jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(stateEvents), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) { + jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(stateEvents), eventFormat, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) { return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID) }) return jr, nil diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go index ea1183cd2..f29719953 100644 --- a/syncapi/syncapi_test.go +++ b/syncapi/syncapi_test.go @@ -209,6 +209,156 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) { } } +func TestSyncAPIEventFormatPowerLevels(t *testing.T) { + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + testSyncEventFormatPowerLevels(t, dbType) + }) +} + +func testSyncEventFormatPowerLevels(t *testing.T, dbType test.DBType) { + user := test.NewUser(t) + setRoomVersion := func(t *testing.T, r *test.Room) { r.Version = gomatrixserverlib.RoomVersionPseudoIDs } + room := test.NewRoom(t, user, setRoomVersion) + alice := userapi.Device{ + ID: "ALICEID", + UserID: user.ID, + AccessToken: "ALICE_BEARER_TOKEN", + DisplayName: "Alice", + AccountType: userapi.AccountTypeUser, + } + + room.CreateAndInsert(t, user, spec.MRoomPowerLevels, gomatrixserverlib.PowerLevelContent{ + Users: map[string]int64{ + user.ID: 100, + }, + }, test.WithStateKey("")) + + cfg, processCtx, close := testrig.CreateConfig(t, dbType) + routers := httputil.NewRouters() + cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions) + caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) + natsInstance := jetstream.NATSInstance{} + defer close() + + jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream) + defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream) + msgs := toNATSMsgs(t, cfg, room.Events()...) + AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, caching.DisableMetrics) + testrig.MustPublishMsgs(t, jsctx, msgs...) + + testCases := []struct { + name string + wantCode int + wantJoinedRooms []string + eventFormat synctypes.ClientEventFormat + }{ + { + name: "Client format", + wantCode: 200, + wantJoinedRooms: []string{room.ID}, + eventFormat: synctypes.FormatSync, + }, + { + name: "Federation format", + wantCode: 200, + wantJoinedRooms: []string{room.ID}, + eventFormat: synctypes.FormatSyncFederation, + }, + } + + syncUntil(t, routers, alice.AccessToken, false, func(syncBody string) bool { + // wait for the last sent eventID to come down sync + path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID()) + return gjson.Get(syncBody, path).Exists() + }) + + for _, tc := range testCases { + format := "" + if tc.eventFormat == synctypes.FormatSyncFederation { + format = "federation" + } + + w := httptest.NewRecorder() + routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ + "access_token": alice.AccessToken, + "timeout": "0", + "filter": fmt.Sprintf(`{"event_format":"%s"}`, format), + }))) + if w.Code != tc.wantCode { + t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode) + } + if tc.wantJoinedRooms != nil { + var res types.Response + if err := json.NewDecoder(w.Body).Decode(&res); err != nil { + t.Fatalf("%s: failed to decode response body: %s", tc.name, err) + } + if len(res.Rooms.Join) != len(tc.wantJoinedRooms) { + t.Errorf("%s: got %v joined rooms, want %v.\nResponse: %+v", tc.name, len(res.Rooms.Join), len(tc.wantJoinedRooms), res) + } + t.Logf("res: %+v", res.Rooms.Join[room.ID]) + + gotEventIDs := make([]string, len(res.Rooms.Join[room.ID].Timeline.Events)) + for i, ev := range res.Rooms.Join[room.ID].Timeline.Events { + gotEventIDs[i] = ev.EventID + } + test.AssertEventIDsEqual(t, gotEventIDs, room.Events()) + + event := room.CreateAndInsert(t, user, spec.MRoomPowerLevels, gomatrixserverlib.PowerLevelContent{ + Users: map[string]int64{ + user.ID: 100, + "@otheruser:localhost": 50, + }, + }, test.WithStateKey("")) + + msgs := toNATSMsgs(t, cfg, event) + testrig.MustPublishMsgs(t, jsctx, msgs...) + + syncUntil(t, routers, alice.AccessToken, false, func(syncBody string) bool { + // wait for the last sent eventID to come down sync + path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID()) + return gjson.Get(syncBody, path).Exists() + }) + + since := res.NextBatch.String() + w := httptest.NewRecorder() + routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ + "access_token": alice.AccessToken, + "timeout": "0", + "filter": fmt.Sprintf(`{"event_format":"%s"}`, format), + "since": since, + }))) + if w.Code != 200 { + t.Errorf("since=%s got HTTP %d want 200", since, w.Code) + } + + res = *types.NewResponse() + if err := json.NewDecoder(w.Body).Decode(&res); err != nil { + t.Errorf("failed to decode response body: %s", err) + } + if len(res.Rooms.Join) != 1 { + t.Fatalf("since=%s got %d joined rooms, want 1", since, len(res.Rooms.Join)) + } + gotEventIDs = make([]string, len(res.Rooms.Join[room.ID].Timeline.Events)) + for j, ev := range res.Rooms.Join[room.ID].Timeline.Events { + gotEventIDs[j] = ev.EventID + if ev.Type == spec.MRoomPowerLevels { + content := gomatrixserverlib.PowerLevelContent{} + err := json.Unmarshal(ev.Content, &content) + if err != nil { + t.Errorf("failed to unmarshal power level content: %s", err) + } + otherUserLevel := content.UserLevel("@otheruser:localhost") + if otherUserLevel != 50 { + t.Errorf("Expected user PL of %d but got %d", 50, otherUserLevel) + } + } + } + events := []*rstypes.HeaderedEvent{room.Events()[len(room.Events())-1]} + test.AssertEventIDsEqual(t, gotEventIDs, events) + } + } +} + // Tests what happens when we create a room and then /sync before all events from /createRoom have // been sent to the syncapi func TestSyncAPICreateRoomSyncEarly(t *testing.T) { diff --git a/syncapi/synctypes/clientevent.go b/syncapi/synctypes/clientevent.go index a78aea1c6..7e5b1c1bc 100644 --- a/syncapi/synctypes/clientevent.go +++ b/syncapi/synctypes/clientevent.go @@ -16,12 +16,21 @@ package synctypes import ( + "encoding/json" "fmt" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib/spec" + "github.com/sirupsen/logrus" ) +// PrevEventRef represents a reference to a previous event in a state event upgrade +type PrevEventRef struct { + PrevContent json.RawMessage `json:"prev_content"` + ReplacesState string `json:"replaces_state"` + PrevSenderID string `json:"prev_sender"` +} + type ClientEventFormat int const ( @@ -30,8 +39,21 @@ const ( // FormatSync will include only the event keys required by the /sync API. Notably, this // means the 'room_id' will be missing from the events. FormatSync + // FormatSyncFederation will include all event keys normally included in federated events. + // This allows clients to request federated formatted events via the /sync API. + FormatSyncFederation ) +// ClientFederationFields extends a ClientEvent to contain the additional fields present in a +// federation event. Used when the client requests `event_format` of type `federation`. +type ClientFederationFields struct { + Depth int64 `json:"depth,omitempty"` + PrevEvents []string `json:"prev_events,omitempty"` + AuthEvents []string `json:"auth_events,omitempty"` + Signatures spec.RawJSON `json:"signatures,omitempty"` + Hashes spec.RawJSON `json:"hashes,omitempty"` +} + // ClientEvent is an event which is fit for consumption by clients, in accordance with the specification. type ClientEvent struct { Content spec.RawJSON `json:"content"` @@ -44,6 +66,9 @@ type ClientEvent struct { Type string `json:"type"` Unsigned spec.RawJSON `json:"unsigned,omitempty"` Redacts string `json:"redacts,omitempty"` + + // Only sent to clients when `event_format` == `federation`. + ClientFederationFields } // ToClientEvents converts server events to client events. @@ -53,6 +78,11 @@ func ToClientEvents(serverEvs []gomatrixserverlib.PDU, format ClientEventFormat, if se == nil { continue // TODO: shouldn't happen? } + if format == FormatSyncFederation { + evs = append(evs, ToClientEvent(se, format, string(se.SenderID()), se.StateKey(), spec.RawJSON(se.Unsigned()))) + continue + } + sender := spec.UserID{} validRoomID, err := spec.NewRoomID(se.RoomID()) if err != nil { @@ -71,28 +101,61 @@ func ToClientEvents(serverEvs []gomatrixserverlib.PDU, format ClientEventFormat, sk = &skString } } - evs = append(evs, ToClientEvent(se, format, sender, sk)) + + unsigned := se.Unsigned() + var prev PrevEventRef + if err := json.Unmarshal(se.Unsigned(), &prev); err == nil && prev.PrevSenderID != "" { + prevUserID, err := userIDForSender(*validRoomID, spec.SenderID(prev.PrevSenderID)) + if err == nil && userID != nil { + prev.PrevSenderID = prevUserID.String() + } else { + errString := "userID unknown" + if err != nil { + errString = err.Error() + } + logrus.Warnf("Failed to find userID for prev_sender in ClientEvent: %s", errString) + // NOTE: Not much can be done here, so leave the previous value in place. + } + unsigned, err = json.Marshal(prev) + if err != nil { + logrus.Errorf("Failed to marshal unsigned content for ClientEvent: %s", err.Error()) + continue + } + } + evs = append(evs, ToClientEvent(se, format, sender.String(), sk, spec.RawJSON(unsigned))) } return evs } // ToClientEvent converts a single server event to a client event. -func ToClientEvent(se gomatrixserverlib.PDU, format ClientEventFormat, sender spec.UserID, stateKey *string) ClientEvent { +func ToClientEvent(se gomatrixserverlib.PDU, format ClientEventFormat, sender string, stateKey *string, unsigned spec.RawJSON) ClientEvent { ce := ClientEvent{ Content: spec.RawJSON(se.Content()), - Sender: sender.String(), + Sender: sender, Type: se.Type(), StateKey: stateKey, - Unsigned: spec.RawJSON(se.Unsigned()), + Unsigned: unsigned, OriginServerTS: se.OriginServerTS(), EventID: se.EventID(), Redacts: se.Redacts(), } - if format == FormatAll { + + switch format { + case FormatAll: ce.RoomID = se.RoomID() + case FormatSync: + case FormatSyncFederation: + ce.RoomID = se.RoomID() + ce.AuthEvents = se.AuthEventIDs() + ce.PrevEvents = se.PrevEventIDs() + ce.Depth = se.Depth() + // TODO: Set Signatures & Hashes fields } - if se.Version() == gomatrixserverlib.RoomVersionPseudoIDs { - ce.SenderKey = se.SenderID() + + if format != FormatSyncFederation { + if se.Version() == gomatrixserverlib.RoomVersionPseudoIDs { + ce.SenderKey = se.SenderID() + } } return ce } @@ -118,7 +181,7 @@ func ToClientEventDefault(userIDQuery spec.UserIDForSender, event gomatrixserver sk = &skString } } - return ToClientEvent(event, FormatAll, sender, sk) + return ToClientEvent(event, FormatAll, sender.String(), sk, event.Unsigned()) } // If provided state key is a user ID (state keys beginning with @ are reserved for this purpose) diff --git a/syncapi/synctypes/clientevent_test.go b/syncapi/synctypes/clientevent_test.go index 63c65b2af..202c185f1 100644 --- a/syncapi/synctypes/clientevent_test.go +++ b/syncapi/synctypes/clientevent_test.go @@ -18,12 +18,69 @@ package synctypes import ( "bytes" "encoding/json" + "fmt" + "reflect" "testing" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib/spec" ) +const testSenderID = "testSenderID" +const testUserID = "@test:localhost" + +type EventFieldsToVerify struct { + EventID string + Type string + OriginServerTS spec.Timestamp + StateKey *string + Content spec.RawJSON + Unsigned spec.RawJSON + Sender string + Depth int64 + PrevEvents []string + AuthEvents []string +} + +func verifyEventFields(t *testing.T, got EventFieldsToVerify, want EventFieldsToVerify) { + if got.EventID != want.EventID { + t.Errorf("ClientEvent.EventID: wanted %s, got %s", want.EventID, got.EventID) + } + if got.OriginServerTS != want.OriginServerTS { + t.Errorf("ClientEvent.OriginServerTS: wanted %d, got %d", want.OriginServerTS, got.OriginServerTS) + } + if got.StateKey == nil && want.StateKey != nil { + t.Errorf("ClientEvent.StateKey: no state key present when one was wanted: %s", *want.StateKey) + } + if got.StateKey != nil && want.StateKey == nil { + t.Errorf("ClientEvent.StateKey: state key present when one was not wanted: %s", *got.StateKey) + } + if got.StateKey != nil && want.StateKey != nil && *got.StateKey != *want.StateKey { + t.Errorf("ClientEvent.StateKey: wanted %s, got %s", *want.StateKey, *got.StateKey) + } + if got.Type != want.Type { + t.Errorf("ClientEvent.Type: wanted %s, got %s", want.Type, got.Type) + } + if !bytes.Equal(got.Content, want.Content) { + t.Errorf("ClientEvent.Content: wanted %s, got %s", string(want.Content), string(got.Content)) + } + if !bytes.Equal(got.Unsigned, want.Unsigned) { + t.Errorf("ClientEvent.Unsigned: wanted %s, got %s", string(want.Unsigned), string(got.Unsigned)) + } + if got.Sender != want.Sender { + t.Errorf("ClientEvent.Sender: wanted %s, got %s", want.Sender, got.Sender) + } + if got.Depth != want.Depth { + t.Errorf("ClientEvent.Depth: wanted %d, got %d", want.Depth, got.Depth) + } + if !reflect.DeepEqual(got.PrevEvents, want.PrevEvents) { + t.Errorf("ClientEvent.PrevEvents: wanted %v, got %v", want.PrevEvents, got.PrevEvents) + } + if !reflect.DeepEqual(got.AuthEvents, want.AuthEvents) { + t.Errorf("ClientEvent.AuthEvents: wanted %v, got %v", want.AuthEvents, got.AuthEvents) + } +} + func TestToClientEvent(t *testing.T) { // nolint: gocyclo ev, err := gomatrixserverlib.MustGetRoomVersion(gomatrixserverlib.RoomVersionV1).NewEventFromTrustedJSON([]byte(`{ "type": "m.room.name", @@ -49,28 +106,28 @@ func TestToClientEvent(t *testing.T) { // nolint: gocyclo t.Fatalf("failed to create userID: %s", err) } sk := "" - ce := ToClientEvent(ev, FormatAll, *userID, &sk) - if ce.EventID != ev.EventID() { - t.Errorf("ClientEvent.EventID: wanted %s, got %s", ev.EventID(), ce.EventID) - } - if ce.OriginServerTS != ev.OriginServerTS() { - t.Errorf("ClientEvent.OriginServerTS: wanted %d, got %d", ev.OriginServerTS(), ce.OriginServerTS) - } - if ce.StateKey == nil || *ce.StateKey != "" { - t.Errorf("ClientEvent.StateKey: wanted '', got %v", ce.StateKey) - } - if ce.Type != ev.Type() { - t.Errorf("ClientEvent.Type: wanted %s, got %s", ev.Type(), ce.Type) - } - if !bytes.Equal(ce.Content, ev.Content()) { - t.Errorf("ClientEvent.Content: wanted %s, got %s", string(ev.Content()), string(ce.Content)) - } - if !bytes.Equal(ce.Unsigned, ev.Unsigned()) { - t.Errorf("ClientEvent.Unsigned: wanted %s, got %s", string(ev.Unsigned()), string(ce.Unsigned)) - } - if ce.Sender != userID.String() { - t.Errorf("ClientEvent.Sender: wanted %s, got %s", userID.String(), ce.Sender) - } + ce := ToClientEvent(ev, FormatAll, userID.String(), &sk, ev.Unsigned()) + + verifyEventFields(t, + EventFieldsToVerify{ + EventID: ce.EventID, + Type: ce.Type, + OriginServerTS: ce.OriginServerTS, + StateKey: ce.StateKey, + Content: ce.Content, + Unsigned: ce.Unsigned, + Sender: ce.Sender, + }, + EventFieldsToVerify{ + EventID: ev.EventID(), + Type: ev.Type(), + OriginServerTS: ev.OriginServerTS(), + StateKey: &sk, + Content: ev.Content(), + Unsigned: ev.Unsigned(), + Sender: userID.String(), + }) + j, err := json.Marshal(ce) if err != nil { t.Fatalf("failed to Marshal ClientEvent: %s", err) @@ -109,8 +166,378 @@ func TestToClientFormatSync(t *testing.T) { t.Fatalf("failed to create userID: %s", err) } sk := "" - ce := ToClientEvent(ev, FormatSync, *userID, &sk) + ce := ToClientEvent(ev, FormatSync, userID.String(), &sk, ev.Unsigned()) if ce.RoomID != "" { t.Errorf("ClientEvent.RoomID: wanted '', got %s", ce.RoomID) } } + +func TestToClientEventFormatSyncFederation(t *testing.T) { // nolint: gocyclo + ev, err := gomatrixserverlib.MustGetRoomVersion(gomatrixserverlib.RoomVersionV10).NewEventFromTrustedJSON([]byte(`{ + "type": "m.room.name", + "state_key": "", + "event_id": "$test:localhost", + "room_id": "!test:localhost", + "sender": "@test:localhost", + "content": { + "name": "Hello World" + }, + "origin_server_ts": 123456, + "unsigned": { + "prev_content": { + "name": "Goodbye World" + } + }, + "depth": 8, + "prev_events": [ + "$f597Tp0Mm1PPxEgiprzJc2cZAjVhxCxACOGuwJb33Oo" + ], + "auth_events": [ + "$Bj0ZGgX6VTqAQdqKH4ZG3l6rlbxY3rZlC5D3MeuK1OQ", + "$QsMs6A1PUVUhgSvmHBfpqEYJPgv4DXt96r8P2AK7iXQ", + "$tBteKtlnFiwlmPJsv0wkKTMEuUVWpQH89H7Xskxve1Q" + ] + }`), false) + if err != nil { + t.Fatalf("failed to create Event: %s", err) + } + userID, err := spec.NewUserID("@test:localhost", true) + if err != nil { + t.Fatalf("failed to create userID: %s", err) + } + sk := "" + ce := ToClientEvent(ev, FormatSyncFederation, userID.String(), &sk, ev.Unsigned()) + + verifyEventFields(t, + EventFieldsToVerify{ + EventID: ce.EventID, + Type: ce.Type, + OriginServerTS: ce.OriginServerTS, + StateKey: ce.StateKey, + Content: ce.Content, + Unsigned: ce.Unsigned, + Sender: ce.Sender, + Depth: ce.Depth, + PrevEvents: ce.PrevEvents, + AuthEvents: ce.AuthEvents, + }, + EventFieldsToVerify{ + EventID: ev.EventID(), + Type: ev.Type(), + OriginServerTS: ev.OriginServerTS(), + StateKey: &sk, + Content: ev.Content(), + Unsigned: ev.Unsigned(), + Sender: userID.String(), + Depth: ev.Depth(), + PrevEvents: ev.PrevEventIDs(), + AuthEvents: ev.AuthEventIDs(), + }) +} + +func userIDForSender(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) { + if senderID == "unknownSenderID" { + return nil, fmt.Errorf("Cannot find userID") + } + return spec.NewUserID(testUserID, true) +} + +func TestToClientEventsFormatSyncFederation(t *testing.T) { // nolint: gocyclo + ev, err := gomatrixserverlib.MustGetRoomVersion(gomatrixserverlib.RoomVersionPseudoIDs).NewEventFromTrustedJSON([]byte(`{ + "type": "m.room.name", + "state_key": "testSenderID", + "event_id": "$test:localhost", + "room_id": "!test:localhost", + "sender": "testSenderID", + "content": { + "name": "Hello World" + }, + "origin_server_ts": 123456, + "unsigned": { + "prev_content": { + "name": "Goodbye World" + } + }, + "depth": 8, + "prev_events": [ + "$f597Tp0Mm1PPxEgiprzJc2cZAjVhxCxACOGuwJb33Oo" + ], + "auth_events": [ + "$Bj0ZGgX6VTqAQdqKH4ZG3l6rlbxY3rZlC5D3MeuK1OQ", + "$QsMs6A1PUVUhgSvmHBfpqEYJPgv4DXt96r8P2AK7iXQ", + "$tBteKtlnFiwlmPJsv0wkKTMEuUVWpQH89H7Xskxve1Q" + ] + }`), false) + if err != nil { + t.Fatalf("failed to create Event: %s", err) + } + ev2, err := gomatrixserverlib.MustGetRoomVersion(gomatrixserverlib.RoomVersionPseudoIDs).NewEventFromTrustedJSON([]byte(`{ + "type": "m.room.name", + "state_key": "testSenderID", + "event_id": "$test2:localhost", + "room_id": "!test:localhost", + "sender": "testSenderID", + "content": { + "name": "Hello World 2" + }, + "origin_server_ts": 1234567, + "unsigned": { + "prev_content": { + "name": "Goodbye World 2" + }, + "prev_sender": "testSenderID" + }, + "depth": 9, + "prev_events": [ + "$f597Tp0Mm1PPxEgiprzJc2cZAjVhxCxACOGuwJb33Oo" + ], + "auth_events": [ + "$Bj0ZGgX6VTqAQdqKH4ZG3l6rlbxY3rZlC5D3MeuK1OQ", + "$QsMs6A1PUVUhgSvmHBfpqEYJPgv4DXt96r8P2AK7iXQ", + "$tBteKtlnFiwlmPJsv0wkKTMEuUVWpQH89H7Xskxve1Q" + ] + }`), false) + if err != nil { + t.Fatalf("failed to create Event: %s", err) + } + + clientEvents := ToClientEvents([]gomatrixserverlib.PDU{ev, ev2}, FormatSyncFederation, userIDForSender) + ce := clientEvents[0] + sk := testSenderID + verifyEventFields(t, + EventFieldsToVerify{ + EventID: ce.EventID, + Type: ce.Type, + OriginServerTS: ce.OriginServerTS, + StateKey: ce.StateKey, + Content: ce.Content, + Unsigned: ce.Unsigned, + Sender: ce.Sender, + Depth: ce.Depth, + PrevEvents: ce.PrevEvents, + AuthEvents: ce.AuthEvents, + }, + EventFieldsToVerify{ + EventID: ev.EventID(), + Type: ev.Type(), + OriginServerTS: ev.OriginServerTS(), + StateKey: &sk, + Content: ev.Content(), + Unsigned: ev.Unsigned(), + Sender: testSenderID, + Depth: ev.Depth(), + PrevEvents: ev.PrevEventIDs(), + AuthEvents: ev.AuthEventIDs(), + }) + + ce2 := clientEvents[1] + verifyEventFields(t, + EventFieldsToVerify{ + EventID: ce2.EventID, + Type: ce2.Type, + OriginServerTS: ce2.OriginServerTS, + StateKey: ce2.StateKey, + Content: ce2.Content, + Unsigned: ce2.Unsigned, + Sender: ce2.Sender, + Depth: ce2.Depth, + PrevEvents: ce2.PrevEvents, + AuthEvents: ce2.AuthEvents, + }, + EventFieldsToVerify{ + EventID: ev2.EventID(), + Type: ev2.Type(), + OriginServerTS: ev2.OriginServerTS(), + StateKey: &sk, + Content: ev2.Content(), + Unsigned: ev2.Unsigned(), + Sender: testSenderID, + Depth: ev2.Depth(), + PrevEvents: ev2.PrevEventIDs(), + AuthEvents: ev2.AuthEventIDs(), + }) +} + +func TestToClientEventsFormatSync(t *testing.T) { // nolint: gocyclo + ev, err := gomatrixserverlib.MustGetRoomVersion(gomatrixserverlib.RoomVersionPseudoIDs).NewEventFromTrustedJSON([]byte(`{ + "type": "m.room.name", + "state_key": "testSenderID", + "event_id": "$test:localhost", + "room_id": "!test:localhost", + "sender": "testSenderID", + "content": { + "name": "Hello World" + }, + "origin_server_ts": 123456, + "unsigned": { + "prev_content": { + "name": "Goodbye World" + } + } + }`), false) + if err != nil { + t.Fatalf("failed to create Event: %s", err) + } + ev2, err := gomatrixserverlib.MustGetRoomVersion(gomatrixserverlib.RoomVersionPseudoIDs).NewEventFromTrustedJSON([]byte(`{ + "type": "m.room.name", + "state_key": "testSenderID", + "event_id": "$test2:localhost", + "room_id": "!test:localhost", + "sender": "testSenderID", + "content": { + "name": "Hello World 2" + }, + "origin_server_ts": 1234567, + "unsigned": { + "prev_content": { + "name": "Goodbye World 2" + }, + "prev_sender": "testSenderID" + }, + "depth": 9 + }`), false) + if err != nil { + t.Fatalf("failed to create Event: %s", err) + } + + clientEvents := ToClientEvents([]gomatrixserverlib.PDU{ev, ev2}, FormatSync, userIDForSender) + ce := clientEvents[0] + sk := testUserID + verifyEventFields(t, + EventFieldsToVerify{ + EventID: ce.EventID, + Type: ce.Type, + OriginServerTS: ce.OriginServerTS, + StateKey: ce.StateKey, + Content: ce.Content, + Unsigned: ce.Unsigned, + Sender: ce.Sender, + }, + EventFieldsToVerify{ + EventID: ev.EventID(), + Type: ev.Type(), + OriginServerTS: ev.OriginServerTS(), + StateKey: &sk, + Content: ev.Content(), + Unsigned: ev.Unsigned(), + Sender: testUserID, + }) + + var prev PrevEventRef + prev.PrevContent = []byte(`{"name": "Goodbye World 2"}`) + prev.PrevSenderID = testUserID + expectedUnsigned, _ := json.Marshal(prev) + + ce2 := clientEvents[1] + verifyEventFields(t, + EventFieldsToVerify{ + EventID: ce2.EventID, + Type: ce2.Type, + OriginServerTS: ce2.OriginServerTS, + StateKey: ce2.StateKey, + Content: ce2.Content, + Unsigned: ce2.Unsigned, + Sender: ce2.Sender, + }, + EventFieldsToVerify{ + EventID: ev2.EventID(), + Type: ev2.Type(), + OriginServerTS: ev2.OriginServerTS(), + StateKey: &sk, + Content: ev2.Content(), + Unsigned: expectedUnsigned, + Sender: testUserID, + }) +} + +func TestToClientEventsFormatSyncUnknownPrevSender(t *testing.T) { // nolint: gocyclo + ev, err := gomatrixserverlib.MustGetRoomVersion(gomatrixserverlib.RoomVersionPseudoIDs).NewEventFromTrustedJSON([]byte(`{ + "type": "m.room.name", + "state_key": "testSenderID", + "event_id": "$test:localhost", + "room_id": "!test:localhost", + "sender": "testSenderID", + "content": { + "name": "Hello World" + }, + "origin_server_ts": 123456, + "unsigned": { + "prev_content": { + "name": "Goodbye World" + } + } + }`), false) + if err != nil { + t.Fatalf("failed to create Event: %s", err) + } + ev2, err := gomatrixserverlib.MustGetRoomVersion(gomatrixserverlib.RoomVersionPseudoIDs).NewEventFromTrustedJSON([]byte(`{ + "type": "m.room.name", + "state_key": "testSenderID", + "event_id": "$test2:localhost", + "room_id": "!test:localhost", + "sender": "testSenderID", + "content": { + "name": "Hello World 2" + }, + "origin_server_ts": 1234567, + "unsigned": { + "prev_content": { + "name": "Goodbye World 2" + }, + "prev_sender": "unknownSenderID" + }, + "depth": 9 + }`), false) + if err != nil { + t.Fatalf("failed to create Event: %s", err) + } + + clientEvents := ToClientEvents([]gomatrixserverlib.PDU{ev, ev2}, FormatSync, userIDForSender) + ce := clientEvents[0] + sk := testUserID + verifyEventFields(t, + EventFieldsToVerify{ + EventID: ce.EventID, + Type: ce.Type, + OriginServerTS: ce.OriginServerTS, + StateKey: ce.StateKey, + Content: ce.Content, + Unsigned: ce.Unsigned, + Sender: ce.Sender, + }, + EventFieldsToVerify{ + EventID: ev.EventID(), + Type: ev.Type(), + OriginServerTS: ev.OriginServerTS(), + StateKey: &sk, + Content: ev.Content(), + Unsigned: ev.Unsigned(), + Sender: testUserID, + }) + + var prev PrevEventRef + prev.PrevContent = []byte(`{"name": "Goodbye World 2"}`) + prev.PrevSenderID = "unknownSenderID" + expectedUnsigned, _ := json.Marshal(prev) + + ce2 := clientEvents[1] + verifyEventFields(t, + EventFieldsToVerify{ + EventID: ce2.EventID, + Type: ce2.Type, + OriginServerTS: ce2.OriginServerTS, + StateKey: ce2.StateKey, + Content: ce2.Content, + Unsigned: ce2.Unsigned, + Sender: ce2.Sender, + }, + EventFieldsToVerify{ + EventID: ev2.EventID(), + Type: ev2.Type(), + OriginServerTS: ev2.OriginServerTS(), + StateKey: &sk, + Content: ev2.Content(), + Unsigned: expectedUnsigned, + Sender: testUserID, + }) +} diff --git a/syncapi/synctypes/filter.go b/syncapi/synctypes/filter.go index c994ddb96..8998d4433 100644 --- a/syncapi/synctypes/filter.go +++ b/syncapi/synctypes/filter.go @@ -78,9 +78,14 @@ type RoomEventFilter struct { ContainsURL *bool `json:"contains_url,omitempty"` } +const ( + EventFormatClient = "client" + EventFormatFederation = "federation" +) + // Validate checks if the filter contains valid property values func (filter *Filter) Validate() error { - if filter.EventFormat != "" && filter.EventFormat != "client" && filter.EventFormat != "federation" { + if filter.EventFormat != "" && filter.EventFormat != EventFormatClient && filter.EventFormat != EventFormatFederation { return errors.New("Bad event_format value. Must be one of [\"client\", \"federation\"]") } return nil diff --git a/syncapi/types/types.go b/syncapi/types/types.go index cb3c362d5..b90c128c3 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -339,13 +339,6 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) { return token, nil } -// PrevEventRef represents a reference to a previous event in a state event upgrade -type PrevEventRef struct { - PrevContent json.RawMessage `json:"prev_content"` - ReplacesState string `json:"replaces_state"` - PrevSenderID string `json:"prev_sender"` -} - type DeviceLists struct { Changed []string `json:"changed,omitempty"` Left []string `json:"left,omitempty"` @@ -539,7 +532,7 @@ type InviteResponse struct { } // NewInviteResponse creates an empty response with initialised arrays. -func NewInviteResponse(event *types.HeaderedEvent, userID spec.UserID, stateKey *string) *InviteResponse { +func NewInviteResponse(event *types.HeaderedEvent, userID spec.UserID, stateKey *string, eventFormat synctypes.ClientEventFormat) *InviteResponse { res := InviteResponse{} res.InviteState.Events = []json.RawMessage{} @@ -552,7 +545,7 @@ func NewInviteResponse(event *types.HeaderedEvent, userID spec.UserID, stateKey // Then we'll see if we can create a partial of the invite event itself. // This is needed for clients to work out *who* sent the invite. - inviteEvent := synctypes.ToClientEvent(event.PDU, synctypes.FormatSync, userID, stateKey) + inviteEvent := synctypes.ToClientEvent(event.PDU, eventFormat, userID.String(), stateKey, event.Unsigned()) inviteEvent.Unsigned = nil if ev, err := json.Marshal(inviteEvent); err == nil { res.InviteState.Events = append(res.InviteState.Events, ev) diff --git a/syncapi/types/types_test.go b/syncapi/types/types_test.go index c1b7f70bd..a79b9fc5d 100644 --- a/syncapi/types/types_test.go +++ b/syncapi/types/types_test.go @@ -72,7 +72,7 @@ func TestNewInviteResponse(t *testing.T) { skString := skUserID.String() sk := &skString - res := NewInviteResponse(&types.HeaderedEvent{PDU: ev}, *sender, sk) + res := NewInviteResponse(&types.HeaderedEvent{PDU: ev}, *sender, sk, synctypes.FormatSync) j, err := json.Marshal(res) if err != nil { t.Fatal(err) diff --git a/userapi/consumers/roomserver.go b/userapi/consumers/roomserver.go index a88b2129d..8863d258a 100644 --- a/userapi/consumers/roomserver.go +++ b/userapi/consumers/roomserver.go @@ -321,7 +321,7 @@ func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *rst return fmt.Errorf("queryUserIDForSender: userID unknown for %s", *sk) } } - cevent := synctypes.ToClientEvent(event, synctypes.FormatAll, sender, sk) + cevent := synctypes.ToClientEvent(event, synctypes.FormatAll, sender.String(), sk, event.Unsigned()) var member *localMembership member, err = newLocalMembership(&cevent) if err != nil { @@ -566,7 +566,7 @@ func (s *OutputRoomEventConsumer) notifyLocal(ctx context.Context, event *rstype // UNSPEC: the spec doesn't say this is a ClientEvent, but the // fields seem to match. room_id should be missing, which // matches the behaviour of FormatSync. - Event: synctypes.ToClientEvent(event, synctypes.FormatSync, sender, sk), + Event: synctypes.ToClientEvent(event, synctypes.FormatSync, sender.String(), sk, event.Unsigned()), // TODO: this is per-device, but it's not part of the primary // key. So inserting one notification per profile tag doesn't // make sense. What is this supposed to be? Sytests require it diff --git a/userapi/util/notify_test.go b/userapi/util/notify_test.go index 3017069bc..27e77cf7a 100644 --- a/userapi/util/notify_test.go +++ b/userapi/util/notify_test.go @@ -106,7 +106,7 @@ func TestNotifyUserCountsAsync(t *testing.T) { } sk := "" if err := db.InsertNotification(ctx, aliceLocalpart, serverName, dummyEvent.EventID(), 0, nil, &api.Notification{ - Event: synctypes.ToClientEvent(dummyEvent, synctypes.FormatAll, *sender, &sk), + Event: synctypes.ToClientEvent(dummyEvent, synctypes.FormatAll, sender.String(), &sk, dummyEvent.Unsigned()), }); err != nil { t.Error(err) }