Roomserver generates input room events for invites automatically now

This commit is contained in:
Neil Alexander 2020-04-27 09:47:52 +01:00
parent 3ab8ebf6b8
commit 4129895a67
4 changed files with 57 additions and 30 deletions

View file

@ -127,18 +127,18 @@ func SendMembership(
returnData = struct {
RoomID string `json:"room_id"`
}{roomID}
fallthrough
default:
}
_, err = producer.SendEvents(
req.Context(),
[]gomatrixserverlib.HeaderedEvent{event.Headered(verRes.RoomVersion)},
cfg.Matrix.ServerName,
nil,
)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed")
return jsonerror.InternalServerError()
_, err = producer.SendEvents(
req.Context(),
[]gomatrixserverlib.HeaderedEvent{event.Headered(verRes.RoomVersion)},
cfg.Matrix.ServerName,
nil,
)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed")
return jsonerror.InternalServerError()
}
}
return util.JSONResponse{

View file

@ -40,7 +40,7 @@ type OutputRoomEventWriter interface {
// TODO(#375): This should be rewritten to allow concurrent calls. The
// difficulty is in ensuring that we correctly annotate events with the correct
// state deltas when sending to kafka streams
func processRoomEvent(
func (r *RoomserverInputAPI) processRoomEvent(
ctx context.Context,
db storage.Database,
ow OutputRoomEventWriter,
@ -131,14 +131,14 @@ func calculateAndSetState(
return db.SetState(ctx, stateAtEvent.EventNID, stateAtEvent.BeforeStateSnapshotNID)
}
func processInviteEvent(
func (r *RoomserverInputAPI) processInviteEvent(
ctx context.Context,
db storage.Database,
ow OutputRoomEventWriter,
input api.InputInviteEvent,
) (err error) {
) (api.InputRoomEvent, error) {
if input.Event.StateKey() == nil {
return fmt.Errorf("invite must be a state event")
return api.InputRoomEvent{}, fmt.Errorf("invite must be a state event")
}
roomID := input.Event.RoomID()
@ -153,7 +153,7 @@ func processInviteEvent(
updater, err := db.MembershipUpdater(ctx, roomID, targetUserID, input.RoomVersion)
if err != nil {
return err
return api.InputRoomEvent{}, err
}
succeeded := false
defer func() {
@ -191,7 +191,7 @@ func processInviteEvent(
// For now we will implement option 2. Since in the abesence of a retry
// mechanism it will be equivalent to option 1, and we don't have a
// signalling mechanism to implement option 3.
return nil
return api.InputRoomEvent{}, nil
}
event := input.Event.Unwrap()
@ -201,7 +201,7 @@ func processInviteEvent(
// most likely to be if the event came in over federation) then use
// that.
if err = event.SetUnsignedField("invite_room_state", input.InviteRoomState); err != nil {
return err
return api.InputRoomEvent{}, err
}
} else {
// There's no invite room state, so let's have a go at building it
@ -210,22 +210,43 @@ func processInviteEvent(
// the invite room state, if we don't then we just fail quietly.
if irs, ierr := buildInviteStrippedState(ctx, db, input); ierr == nil {
if err = event.SetUnsignedField("invite_room_state", irs); err != nil {
return err
return api.InputRoomEvent{}, err
}
}
}
outputUpdates, err := updateToInviteMembership(updater, &event, nil, input.Event.RoomVersion)
if err != nil {
return err
return api.InputRoomEvent{}, err
}
if err = ow.WriteOutputEvents(roomID, outputUpdates); err != nil {
return err
return api.InputRoomEvent{}, err
}
// If the event was originated from ourselves then we will set the
// SendAsServer field, which will instruct the federation sender to
// transmit the event. If the event was originated from somewhere else
// and we received it over federation then we don't want to reflect it
// again.
sendAsServer := api.DoNotSendToOtherServers
if input.SendAsServer == string(r.Cfg.Matrix.ServerName) {
sendAsServer = input.SendAsServer
}
// Generate a loopback event which we will send through the roomserver,
// which will notify existing users in the room that the invite took
// place.
loopbackUpdate := api.InputRoomEvent{
Kind: api.KindNew,
Event: input.Event,
AuthEventIDs: input.Event.AuthEventIDs(),
SendAsServer: sendAsServer,
TransactionID: input.TransactionID,
}
succeeded = true
return nil
return loopbackUpdate, nil
}
func buildInviteStrippedState(

View file

@ -23,6 +23,7 @@ import (
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/util"
@ -31,6 +32,7 @@ import (
// RoomserverInputAPI implements api.RoomserverInputAPI
type RoomserverInputAPI struct {
DB storage.Database
Cfg *config.Dendrite
Producer sarama.SyncProducer
// The kafkaesque topic to output new room events to.
// This is the name used in kafka to identify the stream to write events to.
@ -61,21 +63,24 @@ func (r *RoomserverInputAPI) InputRoomEvents(
ctx context.Context,
request *api.InputRoomEventsRequest,
response *api.InputRoomEventsResponse,
) (err error) {
) error {
// We lock as processRoomEvent can only be called once at a time
r.mutex.Lock()
defer r.mutex.Unlock()
for i := range request.InputRoomEvents {
if response.EventID, err = processRoomEvent(ctx, r.DB, r, request.InputRoomEvents[i]); err != nil {
return err
}
}
for i := range request.InputInviteEvents {
if err = processInviteEvent(ctx, r.DB, r, request.InputInviteEvents[i]); err != nil {
roomEvent, err := r.processInviteEvent(ctx, r.DB, r, request.InputInviteEvents[i])
if err != nil {
return err
}
request.InputRoomEvents = append(request.InputRoomEvents, roomEvent)
}
return nil
var err error
for i := range request.InputRoomEvents {
if response.EventID, err = r.processRoomEvent(ctx, r.DB, r, request.InputRoomEvents[i]); err != nil {
break
}
}
return err
}
// SetupHTTP adds the RoomserverInputAPI handlers to the http.ServeMux.

View file

@ -42,6 +42,7 @@ func SetupRoomServerComponent(
inputAPI := input.RoomserverInputAPI{
DB: roomserverDB,
Cfg: base.Cfg,
Producer: base.KafkaProducer,
OutputRoomEventTopic: string(base.Cfg.Kafka.Topics.OutputRoomEvent),
}