diff --git a/clientapi/routing/membership.go b/clientapi/routing/membership.go index c597dd27d..dff194dd3 100644 --- a/clientapi/routing/membership.go +++ b/clientapi/routing/membership.go @@ -127,18 +127,18 @@ func SendMembership( returnData = struct { RoomID string `json:"room_id"` }{roomID} + fallthrough default: - } - - _, err = producer.SendEvents( - req.Context(), - []gomatrixserverlib.HeaderedEvent{event.Headered(verRes.RoomVersion)}, - cfg.Matrix.ServerName, - nil, - ) - if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") - return jsonerror.InternalServerError() + _, err = producer.SendEvents( + req.Context(), + []gomatrixserverlib.HeaderedEvent{event.Headered(verRes.RoomVersion)}, + cfg.Matrix.ServerName, + nil, + ) + if err != nil { + util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") + return jsonerror.InternalServerError() + } } return util.JSONResponse{ diff --git a/roomserver/input/events.go b/roomserver/input/events.go index 205035d98..9ec176426 100644 --- a/roomserver/input/events.go +++ b/roomserver/input/events.go @@ -40,7 +40,7 @@ type OutputRoomEventWriter interface { // TODO(#375): This should be rewritten to allow concurrent calls. The // difficulty is in ensuring that we correctly annotate events with the correct // state deltas when sending to kafka streams -func processRoomEvent( +func (r *RoomserverInputAPI) processRoomEvent( ctx context.Context, db storage.Database, ow OutputRoomEventWriter, @@ -131,14 +131,14 @@ func calculateAndSetState( return db.SetState(ctx, stateAtEvent.EventNID, stateAtEvent.BeforeStateSnapshotNID) } -func processInviteEvent( +func (r *RoomserverInputAPI) processInviteEvent( ctx context.Context, db storage.Database, ow OutputRoomEventWriter, input api.InputInviteEvent, -) (err error) { +) (api.InputRoomEvent, error) { if input.Event.StateKey() == nil { - return fmt.Errorf("invite must be a state event") + return api.InputRoomEvent{}, fmt.Errorf("invite must be a state event") } roomID := input.Event.RoomID() @@ -153,7 +153,7 @@ func processInviteEvent( updater, err := db.MembershipUpdater(ctx, roomID, targetUserID, input.RoomVersion) if err != nil { - return err + return api.InputRoomEvent{}, err } succeeded := false defer func() { @@ -191,7 +191,7 @@ func processInviteEvent( // For now we will implement option 2. Since in the abesence of a retry // mechanism it will be equivalent to option 1, and we don't have a // signalling mechanism to implement option 3. - return nil + return api.InputRoomEvent{}, nil } event := input.Event.Unwrap() @@ -201,7 +201,7 @@ func processInviteEvent( // most likely to be if the event came in over federation) then use // that. if err = event.SetUnsignedField("invite_room_state", input.InviteRoomState); err != nil { - return err + return api.InputRoomEvent{}, err } } else { // There's no invite room state, so let's have a go at building it @@ -210,22 +210,43 @@ func processInviteEvent( // the invite room state, if we don't then we just fail quietly. if irs, ierr := buildInviteStrippedState(ctx, db, input); ierr == nil { if err = event.SetUnsignedField("invite_room_state", irs); err != nil { - return err + return api.InputRoomEvent{}, err } } } outputUpdates, err := updateToInviteMembership(updater, &event, nil, input.Event.RoomVersion) if err != nil { - return err + return api.InputRoomEvent{}, err } if err = ow.WriteOutputEvents(roomID, outputUpdates); err != nil { - return err + return api.InputRoomEvent{}, err + } + + // If the event was originated from ourselves then we will set the + // SendAsServer field, which will instruct the federation sender to + // transmit the event. If the event was originated from somewhere else + // and we received it over federation then we don't want to reflect it + // again. + sendAsServer := api.DoNotSendToOtherServers + if input.SendAsServer == string(r.Cfg.Matrix.ServerName) { + sendAsServer = input.SendAsServer + } + + // Generate a loopback event which we will send through the roomserver, + // which will notify existing users in the room that the invite took + // place. + loopbackUpdate := api.InputRoomEvent{ + Kind: api.KindNew, + Event: input.Event, + AuthEventIDs: input.Event.AuthEventIDs(), + SendAsServer: sendAsServer, + TransactionID: input.TransactionID, } succeeded = true - return nil + return loopbackUpdate, nil } func buildInviteStrippedState( diff --git a/roomserver/input/input.go b/roomserver/input/input.go index cb588380a..5c7090265 100644 --- a/roomserver/input/input.go +++ b/roomserver/input/input.go @@ -23,6 +23,7 @@ import ( "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/util" @@ -31,6 +32,7 @@ import ( // RoomserverInputAPI implements api.RoomserverInputAPI type RoomserverInputAPI struct { DB storage.Database + Cfg *config.Dendrite Producer sarama.SyncProducer // The kafkaesque topic to output new room events to. // This is the name used in kafka to identify the stream to write events to. @@ -61,21 +63,24 @@ func (r *RoomserverInputAPI) InputRoomEvents( ctx context.Context, request *api.InputRoomEventsRequest, response *api.InputRoomEventsResponse, -) (err error) { +) error { // We lock as processRoomEvent can only be called once at a time r.mutex.Lock() defer r.mutex.Unlock() - for i := range request.InputRoomEvents { - if response.EventID, err = processRoomEvent(ctx, r.DB, r, request.InputRoomEvents[i]); err != nil { - return err - } - } for i := range request.InputInviteEvents { - if err = processInviteEvent(ctx, r.DB, r, request.InputInviteEvents[i]); err != nil { + roomEvent, err := r.processInviteEvent(ctx, r.DB, r, request.InputInviteEvents[i]) + if err != nil { return err } + request.InputRoomEvents = append(request.InputRoomEvents, roomEvent) } - return nil + var err error + for i := range request.InputRoomEvents { + if response.EventID, err = r.processRoomEvent(ctx, r.DB, r, request.InputRoomEvents[i]); err != nil { + break + } + } + return err } // SetupHTTP adds the RoomserverInputAPI handlers to the http.ServeMux. diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index fa4f20626..b7e94ffb1 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -42,6 +42,7 @@ func SetupRoomServerComponent( inputAPI := input.RoomserverInputAPI{ DB: roomserverDB, + Cfg: base.Cfg, Producer: base.KafkaProducer, OutputRoomEventTopic: string(base.Cfg.Kafka.Topics.OutputRoomEvent), }