From acb505b71751f637531ba9113d4b4daae589bcbc Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 16 Mar 2020 17:29:52 +0000 Subject: [PATCH] Implement gomatrixserverlib.HeaderedEvent in roomserver query API (#912) * Implement gomatrixserverlib.HeaderedEvent, which should allow us to store room version headers along with the event across API boundaries and consumers/producers, and intercept unmarshalling to get the event structure right * Add federationsender to previous --- appservice/consumers/roomserver.go | 4 +- clientapi/consumers/roomserver.go | 4 +- clientapi/routing/getevent.go | 2 +- clientapi/routing/sendevent.go | 2 +- common/events.go | 2 +- federationapi/routing/backfill.go | 4 +- federationapi/routing/events.go | 2 +- federationapi/routing/join.go | 2 +- federationapi/routing/leave.go | 2 +- federationapi/routing/missingevents.go | 4 +- federationapi/routing/send.go | 6 +- federationapi/routing/state.go | 12 ++- federationapi/routing/threepid.go | 2 +- federationsender/consumers/roomserver.go | 5 +- go.mod | 2 +- go.sum | 2 + publicroomsapi/consumers/roomserver.go | 11 ++- roomserver/alias/alias.go | 2 +- roomserver/api/query.go | 14 +-- roomserver/query/query.go | 110 +++++++++++++++++++++-- syncapi/consumers/roomserver.go | 5 +- 21 files changed, 163 insertions(+), 36 deletions(-) diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 6d3ea808f..04eb3d992 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -143,7 +143,9 @@ func (s *OutputRoomEventConsumer) lookupMissingStateEvents( return nil, err } - result = append(result, eventResp.Events...) + for _, headeredEvent := range eventResp.Events { + result = append(result, headeredEvent.Event) + } return result, nil } diff --git a/clientapi/consumers/roomserver.go b/clientapi/consumers/roomserver.go index a65281514..e90d56693 100644 --- a/clientapi/consumers/roomserver.go +++ b/clientapi/consumers/roomserver.go @@ -138,7 +138,9 @@ func (s *OutputRoomEventConsumer) lookupStateEvents( return nil, err } - result = append(result, eventResp.Events...) + for _, headeredEvent := range eventResp.Events { + result = append(result, headeredEvent.Event) + } return result, nil } diff --git a/clientapi/routing/getevent.go b/clientapi/routing/getevent.go index 1c1a7add9..2d3152510 100644 --- a/clientapi/routing/getevent.go +++ b/clientapi/routing/getevent.go @@ -66,7 +66,7 @@ func GetEvent( } } - requestedEvent := eventsResp.Events[0] + requestedEvent := eventsResp.Events[0].Event r := getEventRequest{ req: req, diff --git a/clientapi/routing/sendevent.go b/clientapi/routing/sendevent.go index da96c5c65..dbfea09c5 100644 --- a/clientapi/routing/sendevent.go +++ b/clientapi/routing/sendevent.go @@ -144,7 +144,7 @@ func generateSendEvent( // check to see if this user can perform this operation stateEvents := make([]*gomatrixserverlib.Event, len(queryRes.StateEvents)) for i := range queryRes.StateEvents { - stateEvents[i] = &queryRes.StateEvents[i] + stateEvents[i] = &queryRes.StateEvents[i].Event } provider := gomatrixserverlib.NewAuthEvents(stateEvents) if err = gomatrixserverlib.Allowed(*e, &provider); err != nil { diff --git a/common/events.go b/common/events.go index 3c060ee65..ff46e8e0b 100644 --- a/common/events.go +++ b/common/events.go @@ -89,7 +89,7 @@ func AddPrevEventsToEvent( authEvents := gomatrixserverlib.NewAuthEvents(nil) for i := range queryRes.StateEvents { - err = authEvents.AddEvent(&queryRes.StateEvents[i]) + err = authEvents.AddEvent(&queryRes.StateEvents[i].Event) if err != nil { return err } diff --git a/federationapi/routing/backfill.go b/federationapi/routing/backfill.go index b74bfe3ac..a4bc3c67d 100644 --- a/federationapi/routing/backfill.go +++ b/federationapi/routing/backfill.go @@ -84,10 +84,10 @@ func Backfill( // Filter any event that's not from the requested room out. evs := make([]gomatrixserverlib.Event, 0) - var ev gomatrixserverlib.Event + var ev gomatrixserverlib.HeaderedEvent for _, ev = range res.Events { if ev.RoomID() == roomID { - evs = append(evs, ev) + evs = append(evs, ev.Event) } } diff --git a/federationapi/routing/events.go b/federationapi/routing/events.go index c4248022b..a91528b3d 100644 --- a/federationapi/routing/events.go +++ b/federationapi/routing/events.go @@ -80,5 +80,5 @@ func getEvent( return nil, &util.JSONResponse{Code: http.StatusNotFound, JSON: nil} } - return &eventsResponse.Events[0], nil + return &eventsResponse.Events[0].Event, nil } diff --git a/federationapi/routing/join.go b/federationapi/routing/join.go index 369e19aed..7d48c86dc 100644 --- a/federationapi/routing/join.go +++ b/federationapi/routing/join.go @@ -78,7 +78,7 @@ func MakeJoin( // Check that the join is allowed or not stateEvents := make([]*gomatrixserverlib.Event, len(queryRes.StateEvents)) for i := range queryRes.StateEvents { - stateEvents[i] = &queryRes.StateEvents[i] + stateEvents[i] = &queryRes.StateEvents[i].Event } provider := gomatrixserverlib.NewAuthEvents(stateEvents) if err = gomatrixserverlib.Allowed(*event, &provider); err != nil { diff --git a/federationapi/routing/leave.go b/federationapi/routing/leave.go index f27989ff7..3eceb6f25 100644 --- a/federationapi/routing/leave.go +++ b/federationapi/routing/leave.go @@ -76,7 +76,7 @@ func MakeLeave( // Check that the leave is allowed or not stateEvents := make([]*gomatrixserverlib.Event, len(queryRes.StateEvents)) for i := range queryRes.StateEvents { - stateEvents[i] = &queryRes.StateEvents[i] + stateEvents[i] = &queryRes.StateEvents[i].Event } provider := gomatrixserverlib.NewAuthEvents(stateEvents) if err = gomatrixserverlib.Allowed(*event, &provider); err != nil { diff --git a/federationapi/routing/missingevents.go b/federationapi/routing/missingevents.go index 661ec33a4..069bff3dd 100644 --- a/federationapi/routing/missingevents.go +++ b/federationapi/routing/missingevents.go @@ -68,8 +68,8 @@ func GetMissingEvents( // filterEvents returns only those events with matching roomID and having depth greater than minDepth func filterEvents( - events []gomatrixserverlib.Event, minDepth int64, roomID string, -) []gomatrixserverlib.Event { + events []gomatrixserverlib.HeaderedEvent, minDepth int64, roomID string, +) []gomatrixserverlib.HeaderedEvent { ref := events[:0] for _, ev := range events { if ev.Depth() >= minDepth && ev.RoomID() == roomID { diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 079e121f9..ce9ad8b13 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -162,7 +162,11 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error { } // Check that the event is allowed by the state at the event. - if err := checkAllowedByState(e, stateResp.StateEvents); err != nil { + var events []gomatrixserverlib.Event + for _, headeredEvent := range stateResp.StateEvents { + events = append(events, headeredEvent.Event) + } + if err := checkAllowedByState(e, events); err != nil { return err } diff --git a/federationapi/routing/state.go b/federationapi/routing/state.go index 86cf1cf54..86a1e08d2 100644 --- a/federationapi/routing/state.go +++ b/federationapi/routing/state.go @@ -129,9 +129,17 @@ func getState( return nil, &util.JSONResponse{Code: http.StatusNotFound, JSON: nil} } + var stateEvents, authEvents []gomatrixserverlib.Event + for _, headeredEvent := range response.StateEvents { + stateEvents = append(stateEvents, headeredEvent.Event) + } + for _, headeredEvent := range response.AuthChainEvents { + authEvents = append(authEvents, headeredEvent.Event) + } + return &gomatrixserverlib.RespState{ - StateEvents: response.StateEvents, - AuthEvents: response.AuthChainEvents, + StateEvents: stateEvents, + AuthEvents: authEvents, }, nil } diff --git a/federationapi/routing/threepid.go b/federationapi/routing/threepid.go index 03f3c5bba..18ebc07e1 100644 --- a/federationapi/routing/threepid.go +++ b/federationapi/routing/threepid.go @@ -264,7 +264,7 @@ func buildMembershipEvent( authEvents := gomatrixserverlib.NewAuthEvents(nil) for i := range queryRes.StateEvents { - err = authEvents.AddEvent(&queryRes.StateEvents[i]) + err = authEvents.AddEvent(&queryRes.StateEvents[i].Event) if err != nil { return nil, err } diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index 4568f44dc..39d8d62db 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -325,7 +325,10 @@ func (s *OutputRoomEventConsumer) lookupStateEvents( return nil, err } - result = append(result, eventResp.Events...) + for _, headeredEvent := range eventResp.Events { + result = append(result, headeredEvent.Event) + } + missing = missingEventsFrom(result, addsStateEventIDs) if len(missing) != 0 { diff --git a/go.mod b/go.mod index 0728a03cc..9b5aa1414 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200310180544-7f3fad43b51c github.com/matrix-org/go-sqlite3-js v0.0.0-20200304164012-aa524245b658 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 - github.com/matrix-org/gomatrixserverlib v0.0.0-20200316144058-cc6847798a3f + github.com/matrix-org/gomatrixserverlib v0.0.0-20200316163031-36bb5f40ae9b github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/mattn/go-sqlite3 v2.0.2+incompatible diff --git a/go.sum b/go.sum index af0876489..95d39e5b0 100644 --- a/go.sum +++ b/go.sum @@ -142,6 +142,8 @@ github.com/matrix-org/gomatrixserverlib v0.0.0-20200316100021-ac4a53d49690 h1:aQ github.com/matrix-org/gomatrixserverlib v0.0.0-20200316100021-ac4a53d49690/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= github.com/matrix-org/gomatrixserverlib v0.0.0-20200316144058-cc6847798a3f h1:JXSvzG4movqXT1KcdX+XZ9HM61m1FW4rIMFKUM9j/Dk= github.com/matrix-org/gomatrixserverlib v0.0.0-20200316144058-cc6847798a3f/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200316163031-36bb5f40ae9b h1:DdVa6Ledhi5dqwYMw8e9zG+kZmRlnWMgsYOxb3NaFOU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200316163031-36bb5f40ae9b/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5 h1:W7l5CP4V7wPyPb4tYE11dbmeAOwtFQBTW0rf4OonOS8= diff --git a/publicroomsapi/consumers/roomserver.go b/publicroomsapi/consumers/roomserver.go index 9a817735a..2bbd92b72 100644 --- a/publicroomsapi/consumers/roomserver.go +++ b/publicroomsapi/consumers/roomserver.go @@ -22,6 +22,7 @@ import ( "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/publicroomsapi/storage" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" sarama "gopkg.in/Shopify/sarama.v1" ) @@ -98,5 +99,13 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return err } - return s.db.UpdateRoomFromEvents(context.TODO(), addQueryRes.Events, remQueryRes.Events) + var addQueryEvents, remQueryEvents []gomatrixserverlib.Event + for _, headeredEvent := range addQueryRes.Events { + addQueryEvents = append(addQueryEvents, headeredEvent.Event) + } + for _, headeredEvent := range remQueryRes.Events { + remQueryEvents = append(remQueryEvents, headeredEvent.Event) + } + + return s.db.UpdateRoomFromEvents(context.TODO(), addQueryEvents, remQueryEvents) } diff --git a/roomserver/alias/alias.go b/roomserver/alias/alias.go index aeaf5ae94..59da35b3f 100644 --- a/roomserver/alias/alias.go +++ b/roomserver/alias/alias.go @@ -229,7 +229,7 @@ func (r *RoomserverAliasAPI) sendUpdatedAliasesEvent( // Add auth events authEvents := gomatrixserverlib.NewAuthEvents(nil) for i := range res.StateEvents { - err = authEvents.AddEvent(&res.StateEvents[i]) + err = authEvents.AddEvent(&res.StateEvents[i].Event) if err != nil { return err } diff --git a/roomserver/api/query.go b/roomserver/api/query.go index e1850e723..24cca03ec 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -50,7 +50,7 @@ type QueryLatestEventsAndStateResponse struct { // This list will be in an arbitrary order. // These are used to set the auth_events when sending an event. // These are used to check whether the event is allowed. - StateEvents []gomatrixserverlib.Event `json:"state_events"` + StateEvents []gomatrixserverlib.HeaderedEvent `json:"state_events"` // The depth of the latest events. // This is one greater than the maximum depth of the latest events. // This is used to set the depth when sending an event. @@ -79,7 +79,7 @@ type QueryStateAfterEventsResponse struct { PrevEventsExist bool `json:"prev_events_exist"` // The state events requested. // This list will be in an arbitrary order. - StateEvents []gomatrixserverlib.Event `json:"state_events"` + StateEvents []gomatrixserverlib.HeaderedEvent `json:"state_events"` } // QueryEventsByIDRequest is a request to QueryEventsByID @@ -99,7 +99,7 @@ type QueryEventsByIDResponse struct { // fails to read it from the database then it will fail // the entire request. // This list will be in an arbitrary order. - Events []gomatrixserverlib.Event `json:"events"` + Events []gomatrixserverlib.HeaderedEvent `json:"events"` } // QueryMembershipForUserRequest is a request to QueryMembership @@ -186,7 +186,7 @@ type QueryMissingEventsRequest struct { // QueryMissingEventsResponse is a response to QueryMissingEvents type QueryMissingEventsResponse struct { // Missing events, arbritrary order. - Events []gomatrixserverlib.Event `json:"events"` + Events []gomatrixserverlib.HeaderedEvent `json:"events"` } // QueryStateAndAuthChainRequest is a request to QueryStateAndAuthChain @@ -212,8 +212,8 @@ type QueryStateAndAuthChainResponse struct { PrevEventsExist bool `json:"prev_events_exist"` // The state and auth chain events that were requested. // The lists will be in an arbitrary order. - StateEvents []gomatrixserverlib.Event `json:"state_events"` - AuthChainEvents []gomatrixserverlib.Event `json:"auth_chain_events"` + StateEvents []gomatrixserverlib.HeaderedEvent `json:"state_events"` + AuthChainEvents []gomatrixserverlib.HeaderedEvent `json:"auth_chain_events"` } // QueryBackfillRequest is a request to QueryBackfill. @@ -229,7 +229,7 @@ type QueryBackfillRequest struct { // QueryBackfillResponse is a response to QueryBackfill. type QueryBackfillResponse struct { // Missing events, arbritrary order. - Events []gomatrixserverlib.Event `json:"events"` + Events []gomatrixserverlib.HeaderedEvent `json:"events"` } // QueryServersInRoomAtEventRequest is a request to QueryServersInRoomAtEvent diff --git a/roomserver/query/query.go b/roomserver/query/query.go index 83f89777f..7f69699f7 100644 --- a/roomserver/query/query.go +++ b/roomserver/query/query.go @@ -89,6 +89,10 @@ type RoomserverQueryAPIDatabase interface { EventStateKeys( context.Context, []types.EventStateKeyNID, ) (map[types.EventStateKeyNID]string, error) + // Look up the room version for a given room. + GetRoomVersionForRoom( + ctx context.Context, roomNID types.RoomNID, + ) (gomatrixserverlib.RoomVersion, error) } // RoomserverQueryAPI is an implementation of api.RoomserverQueryAPI @@ -116,6 +120,12 @@ func (r *RoomserverQueryAPI) QueryLatestEventsAndState( return nil } response.RoomExists = true + + roomVersion, err := r.DB.GetRoomVersionForRoom(ctx, roomNID) + if err != nil { + return err + } + var currentStateSnapshotNID types.StateSnapshotNID response.LatestEvents, currentStateSnapshotNID, response.Depth, err = r.DB.LatestEventIDs(ctx, roomNID) @@ -136,7 +146,15 @@ func (r *RoomserverQueryAPI) QueryLatestEventsAndState( return err } - response.StateEvents = stateEvents + for _, event := range stateEvents { + response.StateEvents = append(response.StateEvents, gomatrixserverlib.HeaderedEvent{ + EventHeader: gomatrixserverlib.EventHeader{ + RoomVersion: roomVersion, + }, + Event: event, + }) + } + return nil } @@ -161,6 +179,11 @@ func (r *RoomserverQueryAPI) QueryStateAfterEvents( } response.RoomExists = true + roomVersion, err := r.DB.GetRoomVersionForRoom(ctx, roomNID) + if err != nil { + return err + } + prevStates, err := r.DB.StateAtEventIDs(ctx, request.PrevEventIDs) if err != nil { switch err.(type) { @@ -185,7 +208,15 @@ func (r *RoomserverQueryAPI) QueryStateAfterEvents( return err } - response.StateEvents = stateEvents + for _, event := range stateEvents { + response.StateEvents = append(response.StateEvents, gomatrixserverlib.HeaderedEvent{ + EventHeader: gomatrixserverlib.EventHeader{ + RoomVersion: roomVersion, + }, + Event: event, + }) + } + return nil } @@ -212,7 +243,18 @@ func (r *RoomserverQueryAPI) QueryEventsByID( return err } - response.Events = events + for _, event := range events { + // TODO: Room version here + roomVersion := gomatrixserverlib.RoomVersionV1 + + response.Events = append(response.Events, gomatrixserverlib.HeaderedEvent{ + EventHeader: gomatrixserverlib.EventHeader{ + RoomVersion: roomVersion, + }, + Event: event, + }) + } + return nil } @@ -486,10 +528,18 @@ func (r *RoomserverQueryAPI) QueryMissingEvents( return err } - response.Events = make([]gomatrixserverlib.Event, 0, len(loadedEvents)-len(eventsToFilter)) + response.Events = make([]gomatrixserverlib.HeaderedEvent, 0, len(loadedEvents)-len(eventsToFilter)) for _, event := range loadedEvents { if !eventsToFilter[event.EventID()] { - response.Events = append(response.Events, event) + // TODO: Room version here + roomVersion := gomatrixserverlib.RoomVersionV1 + + response.Events = append(response.Events, gomatrixserverlib.HeaderedEvent{ + EventHeader: gomatrixserverlib.EventHeader{ + RoomVersion: roomVersion, + }, + Event: event, + }) } } @@ -525,7 +575,24 @@ func (r *RoomserverQueryAPI) QueryBackfill( } // Retrieve events from the list that was filled previously. - response.Events, err = r.loadEvents(ctx, resultNIDs) + var loadedEvents []gomatrixserverlib.Event + loadedEvents, err = r.loadEvents(ctx, resultNIDs) + if err != nil { + return err + } + + for _, event := range loadedEvents { + // TODO: Room version here + roomVersion := gomatrixserverlib.RoomVersionV1 + + response.Events = append(response.Events, gomatrixserverlib.HeaderedEvent{ + EventHeader: gomatrixserverlib.EventHeader{ + RoomVersion: roomVersion, + }, + Event: event, + }) + } + return err } @@ -635,8 +702,35 @@ func (r *RoomserverQueryAPI) QueryStateAndAuthChain( return err } - response.StateEvents = stateEvents - response.AuthChainEvents, err = getAuthChain(ctx, r.DB, request.AuthEventIDs) + authEvents, err := getAuthChain(ctx, r.DB, request.AuthEventIDs) + if err != nil { + return err + } + + for _, event := range stateEvents { + // TODO: Room version here + roomVersion := gomatrixserverlib.RoomVersionV1 + + response.StateEvents = append(response.StateEvents, gomatrixserverlib.HeaderedEvent{ + EventHeader: gomatrixserverlib.EventHeader{ + RoomVersion: roomVersion, + }, + Event: event, + }) + } + + for _, event := range authEvents { + // TODO: Room version here + roomVersion := gomatrixserverlib.RoomVersionV1 + + response.AuthChainEvents = append(response.AuthChainEvents, gomatrixserverlib.HeaderedEvent{ + EventHeader: gomatrixserverlib.EventHeader{ + RoomVersion: roomVersion, + }, + Event: event, + }) + } + return err } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 5dbef4b7d..976dd8e8f 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -229,7 +229,10 @@ func (s *OutputRoomEventConsumer) lookupStateEvents( return nil, err } - result = append(result, eventResp.Events...) + for _, headeredEvent := range eventResp.Events { + result = append(result, headeredEvent.Event) + } + missing = missingEventsFrom(result, addsStateEventIDs) if len(missing) != 0 {