diff --git a/clientapi/producers/roomserver.go b/clientapi/producers/roomserver.go index 553b590eb..69961cda1 100644 --- a/clientapi/producers/roomserver.go +++ b/clientapi/producers/roomserver.go @@ -37,31 +37,14 @@ func NewRoomserverProducer(inputAPI api.RoomserverInputAPI, queryAPI api.Roomser // SendEvents writes the given events to the roomserver input log. The events are written with KindNew. func (c *RoomserverProducer) SendEvents( - ctx context.Context, events []gomatrixserverlib.Event, sendAsServer gomatrixserverlib.ServerName, + ctx context.Context, events []gomatrixserverlib.HeaderedEvent, sendAsServer gomatrixserverlib.ServerName, txnID *api.TransactionID, ) (string, error) { - roomVersions := make(map[string]gomatrixserverlib.RoomVersion) - ires := make([]api.InputRoomEvent, len(events)) for i, event := range events { - var roomVersion gomatrixserverlib.RoomVersion - roomID := event.RoomID() - if v, ok := roomVersions[roomID]; ok { - roomVersion = v - } else { - verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} - verRes := api.QueryRoomVersionForRoomResponse{} - err := c.QueryAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes) - if err != nil { - return "", err - } - roomVersion = verRes.RoomVersion - roomVersions[roomID] = roomVersion - } - ires[i] = api.InputRoomEvent{ Kind: api.KindNew, - Event: event.Headered(roomVersion), + Event: event, AuthEventIDs: event.AuthEventIDs(), SendAsServer: string(sendAsServer), TransactionID: txnID, @@ -73,25 +56,18 @@ func (c *RoomserverProducer) SendEvents( // SendEventWithState writes an event with KindNew to the roomserver input log // with the state at the event as KindOutlier before it. func (c *RoomserverProducer) SendEventWithState( - ctx context.Context, state gomatrixserverlib.RespState, event gomatrixserverlib.Event, + ctx context.Context, state gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent, ) error { outliers, err := state.Events() if err != nil { return err } - verReq := api.QueryRoomVersionForRoomRequest{RoomID: event.RoomID()} - verRes := api.QueryRoomVersionForRoomResponse{} - err = c.QueryAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes) - if err != nil { - return err - } - ires := make([]api.InputRoomEvent, len(outliers)+1) for i, outlier := range outliers { ires[i] = api.InputRoomEvent{ Kind: api.KindOutlier, - Event: outlier.Headered(verRes.RoomVersion), + Event: outlier.Headered(event.RoomVersion), AuthEventIDs: outlier.AuthEventIDs(), } } @@ -103,7 +79,7 @@ func (c *RoomserverProducer) SendEventWithState( ires[len(outliers)] = api.InputRoomEvent{ Kind: api.KindNew, - Event: event.Headered(verRes.RoomVersion), + Event: event, AuthEventIDs: event.AuthEventIDs(), HasState: true, StateEventIDs: stateEventIDs, diff --git a/clientapi/routing/createroom.go b/clientapi/routing/createroom.go index f8a7bfa2a..fb96fe01b 100644 --- a/clientapi/routing/createroom.go +++ b/clientapi/routing/createroom.go @@ -237,7 +237,7 @@ func createRoom( historyVisibility = historyVisibilityShared } - var builtEvents []gomatrixserverlib.Event + var builtEvents []gomatrixserverlib.HeaderedEvent // send events into the room in order of: // 1- m.room.create @@ -311,7 +311,7 @@ func createRoom( } // Add the event to the list of auth events - builtEvents = append(builtEvents, *ev) + builtEvents = append(builtEvents, (*ev).Headered(roomVersion)) err = authEvents.AddEvent(ev) if err != nil { util.GetLogger(req.Context()).WithError(err).Error("authEvents.AddEvent failed") diff --git a/clientapi/routing/joinroom.go b/clientapi/routing/joinroom.go index 3af34a668..432fabe41 100644 --- a/clientapi/routing/joinroom.go +++ b/clientapi/routing/joinroom.go @@ -242,7 +242,12 @@ func (r joinRoomReq) joinRoomUsingServers( queryRes := roomserverAPI.QueryLatestEventsAndStateResponse{} event, err := common.BuildEvent(r.req.Context(), &eb, r.cfg, r.evTime, r.queryAPI, &queryRes) if err == nil { - if _, err = r.producer.SendEvents(r.req.Context(), []gomatrixserverlib.Event{*event}, r.cfg.Matrix.ServerName, nil); err != nil { + if _, err = r.producer.SendEvents( + r.req.Context(), + []gomatrixserverlib.HeaderedEvent{(*event).Headered(queryRes.RoomVersion)}, + r.cfg.Matrix.ServerName, + nil, + ); err != nil { util.GetLogger(r.req.Context()).WithError(err).Error("r.producer.SendEvents failed") return jsonerror.InternalServerError() } @@ -363,7 +368,9 @@ func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib } if err = r.producer.SendEventWithState( - r.req.Context(), gomatrixserverlib.RespState(respSendJoin.RespState), event, + r.req.Context(), + gomatrixserverlib.RespState(respSendJoin.RespState), + event.Headered(respMakeJoin.RoomVersion), ); err != nil { util.GetLogger(r.req.Context()).WithError(err).Error("gomatrixserverlib.RespState failed") res := jsonerror.InternalServerError() diff --git a/clientapi/routing/membership.go b/clientapi/routing/membership.go index 61d9e8e39..9f386b718 100644 --- a/clientapi/routing/membership.go +++ b/clientapi/routing/membership.go @@ -29,6 +29,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/threepid" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" @@ -45,6 +46,15 @@ func SendMembership( queryAPI roomserverAPI.RoomserverQueryAPI, asAPI appserviceAPI.AppServiceQueryAPI, producer *producers.RoomserverProducer, ) util.JSONResponse { + verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} + verRes := api.QueryRoomVersionForRoomResponse{} + if err := queryAPI.QueryRoomVersionForRoom(req.Context(), &verReq, &verRes); err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.UnsupportedRoomVersion(err.Error()), + } + } + var body threepid.MembershipRequest if reqErr := httputil.UnmarshalJSONRequest(req, &body); reqErr != nil { return *reqErr @@ -95,7 +105,10 @@ func SendMembership( } if _, err := producer.SendEvents( - req.Context(), []gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName, nil, + req.Context(), + []gomatrixserverlib.HeaderedEvent{(*event).Headered(verRes.RoomVersion)}, + cfg.Matrix.ServerName, + nil, ); err != nil { util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") return jsonerror.InternalServerError() diff --git a/clientapi/routing/profile.go b/clientapi/routing/profile.go index b1909e7ab..d32fe268f 100644 --- a/clientapi/routing/profile.go +++ b/clientapi/routing/profile.go @@ -338,10 +338,16 @@ func buildMembershipEvents( memberships []authtypes.Membership, newProfile authtypes.Profile, userID string, cfg *config.Dendrite, evTime time.Time, queryAPI api.RoomserverQueryAPI, -) ([]gomatrixserverlib.Event, error) { - evs := []gomatrixserverlib.Event{} +) ([]gomatrixserverlib.HeaderedEvent, error) { + evs := []gomatrixserverlib.HeaderedEvent{} for _, membership := range memberships { + verReq := api.QueryRoomVersionForRoomRequest{RoomID: membership.EventID} + verRes := api.QueryRoomVersionForRoomResponse{} + if err := queryAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes); err != nil { + return []gomatrixserverlib.HeaderedEvent{}, err + } + builder := gomatrixserverlib.EventBuilder{ Sender: userID, RoomID: membership.RoomID, @@ -365,7 +371,7 @@ func buildMembershipEvents( return nil, err } - evs = append(evs, *event) + evs = append(evs, (*event).Headered(verRes.RoomVersion)) } return evs, nil diff --git a/clientapi/routing/sendevent.go b/clientapi/routing/sendevent.go index dbfea09c5..a031c5790 100644 --- a/clientapi/routing/sendevent.go +++ b/clientapi/routing/sendevent.go @@ -48,6 +48,15 @@ func SendEvent( producer *producers.RoomserverProducer, txnCache *transactions.Cache, ) util.JSONResponse { + verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} + verRes := api.QueryRoomVersionForRoomResponse{} + if err := queryAPI.QueryRoomVersionForRoom(req.Context(), &verReq, &verRes); err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.UnsupportedRoomVersion(err.Error()), + } + } + if txnID != nil { // Try to fetch response from transactionsCache if res, ok := txnCache.FetchTransaction(device.AccessToken, *txnID); ok { @@ -71,7 +80,12 @@ func SendEvent( // pass the new event to the roomserver and receive the correct event ID // event ID in case of duplicate transaction is discarded eventID, err := producer.SendEvents( - req.Context(), []gomatrixserverlib.Event{*e}, cfg.Matrix.ServerName, txnAndSessionID, + req.Context(), + []gomatrixserverlib.HeaderedEvent{ + (*e).Headered(verRes.RoomVersion), + }, + cfg.Matrix.ServerName, + txnAndSessionID, ) if err != nil { util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") diff --git a/clientapi/threepid/invites.go b/clientapi/threepid/invites.go index a21aa2b7b..e34e91b56 100644 --- a/clientapi/threepid/invites.go +++ b/clientapi/threepid/invites.go @@ -359,6 +359,13 @@ func emit3PIDInviteEvent( return err } - _, err = producer.SendEvents(ctx, []gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName, nil) + _, err = producer.SendEvents( + ctx, + []gomatrixserverlib.HeaderedEvent{ + (*event).Headered(queryRes.RoomVersion), + }, + cfg.Matrix.ServerName, + nil, + ) return err } diff --git a/federationapi/routing/join.go b/federationapi/routing/join.go index 70e2c7273..73273552d 100644 --- a/federationapi/routing/join.go +++ b/federationapi/routing/join.go @@ -209,7 +209,12 @@ func SendJoin( // We are responsible for notifying other servers that the user has joined // the room, so set SendAsServer to cfg.Matrix.ServerName _, err = producer.SendEvents( - httpReq.Context(), []gomatrixserverlib.Event{event}, cfg.Matrix.ServerName, nil, + httpReq.Context(), + []gomatrixserverlib.HeaderedEvent{ + event.Headered(stateAndAuthChainResponse.RoomVersion), + }, + cfg.Matrix.ServerName, + nil, ) if err != nil { util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed") diff --git a/federationapi/routing/leave.go b/federationapi/routing/leave.go index 00d71866d..888683dd9 100644 --- a/federationapi/routing/leave.go +++ b/federationapi/routing/leave.go @@ -101,6 +101,15 @@ func SendLeave( keys gomatrixserverlib.KeyRing, roomID, eventID string, ) util.JSONResponse { + verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} + verRes := api.QueryRoomVersionForRoomResponse{} + if err := producer.QueryAPI.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.UnsupportedRoomVersion(err.Error()), + } + } + var event gomatrixserverlib.Event if err := json.Unmarshal(request.Content(), &event); err != nil { return util.JSONResponse{ @@ -167,7 +176,14 @@ func SendLeave( // Send the events to the room server. // We are responsible for notifying other servers that the user has left // the room, so set SendAsServer to cfg.Matrix.ServerName - _, err = producer.SendEvents(httpReq.Context(), []gomatrixserverlib.Event{event}, cfg.Matrix.ServerName, nil) + _, err = producer.SendEvents( + httpReq.Context(), + []gomatrixserverlib.HeaderedEvent{ + event.Headered(verRes.RoomVersion), + }, + cfg.Matrix.ServerName, + nil, + ) if err != nil { util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed") return jsonerror.InternalServerError() diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index b903a2d09..e829a38d7 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -179,7 +179,12 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error { // TODO: Check that the event is allowed by its auth_events. // pass the event to the roomserver - _, err := t.producer.SendEvents(t.context, []gomatrixserverlib.Event{e}, api.DoNotSendToOtherServers, nil) + _, err := t.producer.SendEvents( + t.context, + stateResp.StateEvents, + api.DoNotSendToOtherServers, + nil, + ) return err } @@ -240,6 +245,13 @@ retryAllowedState: } return err } + + verReq := api.QueryRoomVersionForRoomRequest{RoomID: e.RoomID()} + verRes := api.QueryRoomVersionForRoomResponse{} + if err := t.query.QueryRoomVersionForRoom(context.Background(), &verReq, &verRes); err != nil { + return err + } + // pass the event along with the state to the roomserver - return t.producer.SendEventWithState(t.context, state, e) + return t.producer.SendEventWithState(t.context, state, e.Headered(verRes.RoomVersion)) } diff --git a/federationapi/routing/threepid.go b/federationapi/routing/threepid.go index f06f76fc0..d17470eae 100644 --- a/federationapi/routing/threepid.go +++ b/federationapi/routing/threepid.go @@ -28,6 +28,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" @@ -68,8 +69,17 @@ func CreateInvitesFrom3PIDInvites( return *reqErr } - evs := []gomatrixserverlib.Event{} + evs := []gomatrixserverlib.HeaderedEvent{} for _, inv := range body.Invites { + verReq := api.QueryRoomVersionForRoomRequest{RoomID: inv.RoomID} + verRes := api.QueryRoomVersionForRoomResponse{} + if err := queryAPI.QueryRoomVersionForRoom(req.Context(), &verReq, &verRes); err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.UnsupportedRoomVersion(err.Error()), + } + } + event, err := createInviteFrom3PIDInvite( req.Context(), queryAPI, asAPI, cfg, inv, federation, accountDB, ) @@ -78,7 +88,7 @@ func CreateInvitesFrom3PIDInvites( return jsonerror.InternalServerError() } if event != nil { - evs = append(evs, *event) + evs = append(evs, (*event).Headered(verRes.RoomVersion)) } } @@ -137,6 +147,15 @@ func ExchangeThirdPartyInvite( } } + verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} + verRes := api.QueryRoomVersionForRoomResponse{} + if err := queryAPI.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.UnsupportedRoomVersion(err.Error()), + } + } + // Auth and build the event from what the remote server sent us event, err := buildMembershipEvent(httpReq.Context(), &builder, queryAPI, cfg) if err == errNotInRoom { @@ -159,7 +178,12 @@ func ExchangeThirdPartyInvite( // Send the event to the roomserver if _, err = producer.SendEvents( - httpReq.Context(), []gomatrixserverlib.Event{signedEvent.Event}, cfg.Matrix.ServerName, nil, + httpReq.Context(), + []gomatrixserverlib.HeaderedEvent{ + signedEvent.Event.Headered(verRes.RoomVersion), + }, + cfg.Matrix.ServerName, + nil, ); err != nil { util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed") return jsonerror.InternalServerError() @@ -181,6 +205,12 @@ func createInviteFrom3PIDInvite( inv invite, federation *gomatrixserverlib.FederationClient, accountDB accounts.Database, ) (*gomatrixserverlib.Event, error) { + verReq := api.QueryRoomVersionForRoomRequest{RoomID: inv.RoomID} + verRes := api.QueryRoomVersionForRoomResponse{} + if err := queryAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes); err != nil { + return nil, err + } + _, server, err := gomatrixserverlib.SplitID('@', inv.MXID) if err != nil { return nil, err diff --git a/roomserver/input/events.go b/roomserver/input/events.go index ab2bbe1cd..52b411574 100644 --- a/roomserver/input/events.go +++ b/roomserver/input/events.go @@ -97,10 +97,12 @@ func processRoomEvent( ) (eventID string, err error) { // Parse and validate the event JSON event := input.Event + fmt.Println("Starting processRoomEvent") // Check that the event passes authentication checks and work out the numeric IDs for the auth events. authEventNIDs, err := checkAuthEvents(ctx, db, event.Event, input.AuthEventIDs) if err != nil { + fmt.Println("checkAuthEvents:", err) return } @@ -111,6 +113,7 @@ func processRoomEvent( ) // On error OR event with the transaction already processed/processesing if err != nil || eventID != "" { + fmt.Println("db.GetTransactionEventID:", err) return } } @@ -118,6 +121,7 @@ func processRoomEvent( // Store the event roomNID, stateAtEvent, err := db.StoreEvent(ctx, event.Event, input.TransactionID, authEventNIDs) if err != nil { + fmt.Println("db.StoreEvent:", err) return } @@ -133,6 +137,7 @@ func processRoomEvent( // Lets calculate one. err = calculateAndSetState(ctx, db, input, roomNID, &stateAtEvent, event.Event) if err != nil { + fmt.Println("calculateAndSetState:", err) return } } diff --git a/roomserver/input/input.go b/roomserver/input/input.go index bd029d8df..956afc640 100644 --- a/roomserver/input/input.go +++ b/roomserver/input/input.go @@ -18,6 +18,7 @@ package input import ( "context" "encoding/json" + "fmt" "net/http" "sync" @@ -61,16 +62,19 @@ func (r *RoomserverInputAPI) InputRoomEvents( request *api.InputRoomEventsRequest, response *api.InputRoomEventsResponse, ) (err error) { + fmt.Println("Starting InputRoomEvents") // We lock as processRoomEvent can only be called once at a time r.mutex.Lock() defer r.mutex.Unlock() for i := range request.InputRoomEvents { if response.EventID, err = processRoomEvent(ctx, r.DB, r, request.InputRoomEvents[i]); err != nil { + fmt.Println("processRoomEvent:", err) return err } } for i := range request.InputInviteEvents { if err = processInviteEvent(ctx, r.DB, r, request.InputInviteEvents[i]); err != nil { + fmt.Println("processInviteEvent:", err) return err } }