Invites v2 endpoint (#952)

* Start converting v1 invite endpoint to v2

* Update gomatrixserverlib

* Early federationsender code for sending invites

* Sending invites sorta happens now

* Populate invite request with stripped state

* Remodel a bit, don't reflect received invites

* Handle invite_room_state

* Handle room versions a bit better

* Update gomatrixserverlib

* Tweak order in destinationQueue.next

* Revert check in processMessage

* Tweak federation sender destination queue code a bit

* Add comments
This commit is contained in:
Neil Alexander 2020-04-03 14:29:06 +01:00 committed by GitHub
parent 955244c092
commit 067b875063
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 286 additions and 95 deletions

View file

@ -104,18 +104,14 @@ func (c *RoomserverProducer) SendInputRoomEvents(
// This should only be needed for invite events that occur outside of a known room. // This should only be needed for invite events that occur outside of a known room.
// If we are in the room then the event should be sent using the SendEvents method. // If we are in the room then the event should be sent using the SendEvents method.
func (c *RoomserverProducer) SendInvite( func (c *RoomserverProducer) SendInvite(
ctx context.Context, inviteEvent gomatrixserverlib.Event, ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent,
inviteRoomState []gomatrixserverlib.InviteV2StrippedState,
) error { ) error {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: inviteEvent.RoomID()}
verRes := api.QueryRoomVersionForRoomResponse{}
err := c.QueryAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes)
if err != nil {
return err
}
request := api.InputRoomEventsRequest{ request := api.InputRoomEventsRequest{
InputInviteEvents: []api.InputInviteEvent{{ InputInviteEvents: []api.InputInviteEvent{{
Event: inviteEvent.Headered(verRes.RoomVersion), Event: inviteEvent,
InviteRoomState: inviteRoomState,
RoomVersion: inviteEvent.RoomVersion,
}}, }},
} }
var response api.InputRoomEventsResponse var response api.InputRoomEventsResponse

View file

@ -260,6 +260,9 @@ func (r joinRoomReq) joinRoomUsingServers(
}{roomID}, }{roomID},
} }
} }
// TODO: This needs to be re-thought, as in the case of an invite, the room
// will exist in the database in roomserver_rooms but won't have any state
// events, therefore this below check fails.
if err != common.ErrRoomNoExists { if err != common.ErrRoomNoExists {
util.GetLogger(r.req.Context()).WithError(err).Error("common.BuildEvent failed") util.GetLogger(r.req.Context()).WithError(err).Error("common.BuildEvent failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()

View file

@ -15,18 +15,17 @@
package routing package routing
import ( import (
"context" "encoding/json"
"net/http" "net/http"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
) )
// Invite implements /_matrix/federation/v1/invite/{roomID}/{eventID} // Invite implements /_matrix/federation/v2/invite/{roomID}/{eventID}
func Invite( func Invite(
httpReq *http.Request, httpReq *http.Request,
request *gomatrixserverlib.FederationRequest, request *gomatrixserverlib.FederationRequest,
@ -36,24 +35,14 @@ func Invite(
producer *producers.RoomserverProducer, producer *producers.RoomserverProducer,
keys gomatrixserverlib.KeyRing, keys gomatrixserverlib.KeyRing,
) util.JSONResponse { ) util.JSONResponse {
// Look up the room version for the room. inviteReq := gomatrixserverlib.InviteV2Request{}
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} if err := json.Unmarshal(request.Content(), &inviteReq); err != nil {
verRes := api.QueryRoomVersionForRoomResponse{}
if err := producer.QueryAPI.QueryRoomVersionForRoom(context.Background(), &verReq, &verRes); err != nil {
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusBadRequest, Code: http.StatusBadRequest,
JSON: jsonerror.UnsupportedRoomVersion(err.Error()), JSON: jsonerror.NotJSON("The request body could not be decoded into an invite request. " + err.Error()),
}
}
// Decode the event JSON from the request.
event, err := gomatrixserverlib.NewEventFromUntrustedJSON(request.Content(), verRes.RoomVersion)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.NotJSON("The request body could not be decoded into valid JSON. " + err.Error()),
} }
} }
event := inviteReq.Event()
// Check that the room ID is correct. // Check that the room ID is correct.
if event.RoomID() != roomID { if event.RoomID() != roomID {
@ -71,14 +60,6 @@ func Invite(
} }
} }
// Check that the event is from the server sending the request.
if event.Origin() != request.Origin() {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("The invite must be sent by the server it originated on"),
}
}
// Check that the event is signed by the server sending the request. // Check that the event is signed by the server sending the request.
redacted := event.Redact() redacted := event.Redact()
verifyRequests := []gomatrixserverlib.VerifyJSONRequest{{ verifyRequests := []gomatrixserverlib.VerifyJSONRequest{{
@ -104,7 +85,11 @@ func Invite(
) )
// Add the invite event to the roomserver. // Add the invite event to the roomserver.
if err = producer.SendInvite(httpReq.Context(), signedEvent); err != nil { if err = producer.SendInvite(
httpReq.Context(),
signedEvent.Headered(inviteReq.RoomVersion()),
inviteReq.InviteRoomState(),
); err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendInvite failed") util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendInvite failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }

View file

@ -85,7 +85,7 @@ func Setup(
}, },
)).Methods(http.MethodPut, http.MethodOptions) )).Methods(http.MethodPut, http.MethodOptions)
v1fedmux.Handle("/invite/{roomID}/{eventID}", common.MakeFedAPI( v2fedmux.Handle("/invite/{roomID}/{eventID}", common.MakeFedAPI(
"federation_invite", cfg.Matrix.ServerName, keys, "federation_invite", cfg.Matrix.ServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse { func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse {
vars, err := common.URLDecodeMapValues(mux.Vars(httpReq)) vars, err := common.URLDecodeMapValues(mux.Vars(httpReq))

View file

@ -32,6 +32,7 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server. // OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct { type OutputRoomEventConsumer struct {
cfg *config.Dendrite
roomServerConsumer *common.ContinualConsumer roomServerConsumer *common.ContinualConsumer
db storage.Database db storage.Database
queues *queue.OutgoingQueues queues *queue.OutgoingQueues
@ -52,6 +53,7 @@ func NewOutputRoomEventConsumer(
PartitionStore: store, PartitionStore: store,
} }
s := &OutputRoomEventConsumer{ s := &OutputRoomEventConsumer{
cfg: cfg,
roomServerConsumer: &consumer, roomServerConsumer: &consumer,
db: store, db: store,
queues: queues, queues: queues,
@ -79,29 +81,48 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
log.WithError(err).Errorf("roomserver output log: message parse failure") log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil return nil
} }
if output.Type != api.OutputTypeNewRoomEvent {
switch output.Type {
case api.OutputTypeNewRoomEvent:
ev := &output.NewRoomEvent.Event
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_id": ev.RoomID(),
"send_as_server": output.NewRoomEvent.SendAsServer,
}).Info("received room event from roomserver")
if err := s.processMessage(*output.NewRoomEvent); err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event": string(ev.JSON()),
"add": output.NewRoomEvent.AddsStateEventIDs,
"del": output.NewRoomEvent.RemovesStateEventIDs,
log.ErrorKey: err,
}).Panicf("roomserver output log: write room event failure")
return nil
}
case api.OutputTypeNewInviteEvent:
ev := &output.NewInviteEvent.Event
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_id": ev.RoomID(),
"state_key": ev.StateKey(),
}).Info("received invite event from roomserver")
if err := s.processInvite(*output.NewInviteEvent); err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event": string(ev.JSON()),
log.ErrorKey: err,
}).Panicf("roomserver output log: write invite event failure")
return nil
}
default:
log.WithField("type", output.Type).Debug( log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type", "roomserver output log: ignoring unknown output type",
) )
return nil return nil
} }
ev := &output.NewRoomEvent.Event
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_id": ev.RoomID(),
"send_as_server": output.NewRoomEvent.SendAsServer,
}).Info("received event from roomserver")
if err := s.processMessage(*output.NewRoomEvent); err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event": string(ev.JSON()),
log.ErrorKey: err,
"add": output.NewRoomEvent.AddsStateEventIDs,
"del": output.NewRoomEvent.RemovesStateEventIDs,
}).Panicf("roomserver output log: write event failure")
return nil
}
return nil return nil
} }
@ -159,6 +180,69 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
) )
} }
// processInvite handles an invite event for sending over federation.
func (s *OutputRoomEventConsumer) processInvite(oie api.OutputNewInviteEvent) error {
// Don't try to reflect and resend invites that didn't originate from us.
if s.cfg.Matrix.ServerName != oie.Event.Origin() {
return nil
}
// When sending a v2 invite, the inviting server should try and include
// a "stripped down" version of the room state. This is pretty much just
// enough information for the remote side to show something useful to the
// user, like the room name, aliases etc.
strippedState := []gomatrixserverlib.InviteV2StrippedState{}
stateWanted := []string{
gomatrixserverlib.MRoomName, gomatrixserverlib.MRoomCanonicalAlias,
gomatrixserverlib.MRoomAliases, gomatrixserverlib.MRoomJoinRules,
}
// For each of the state keys that we want to try and send, ask the
// roomserver if we have a state event for that room that matches the
// state key.
for _, wanted := range stateWanted {
queryReq := api.QueryLatestEventsAndStateRequest{
RoomID: oie.Event.RoomID(),
StateToFetch: []gomatrixserverlib.StateKeyTuple{
gomatrixserverlib.StateKeyTuple{
EventType: wanted,
StateKey: "",
},
},
}
// If this fails then we just move onto the next event - we don't
// actually know at this point whether the room even has that type
// of state.
queryRes := api.QueryLatestEventsAndStateResponse{}
if err := s.query.QueryLatestEventsAndState(context.TODO(), &queryReq, &queryRes); err != nil {
log.WithFields(log.Fields{
"room_id": queryReq.RoomID,
"event_type": wanted,
}).WithError(err).Info("couldn't find state to strip")
continue
}
// Append the stripped down copy of the state to our list.
for _, headeredEvent := range queryRes.StateEvents {
event := headeredEvent.Unwrap()
strippedState = append(strippedState, gomatrixserverlib.NewInviteV2StrippedState(&event))
log.WithFields(log.Fields{
"room_id": queryReq.RoomID,
"event_type": event.Type(),
}).Info("adding stripped state")
}
}
// Build the invite request with the info we've got.
inviteReq, err := gomatrixserverlib.NewInviteV2Request(&oie.Event, strippedState)
if err != nil {
return fmt.Errorf("gomatrixserverlib.NewInviteV2Request: %w", err)
}
// Send the event.
return s.queues.SendInvite(&inviteReq)
}
// joinedHostsAtEvent works out a list of matrix servers that were joined to // joinedHostsAtEvent works out a list of matrix servers that were joined to
// the room at the event. // the room at the event.
// It is important to use the state at the event for sending messages because: // It is important to use the state at the event for sending messages because:

View file

@ -24,6 +24,7 @@ import (
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"go.uber.org/atomic"
) )
// destinationQueue is a queue of events for a single destination. // destinationQueue is a queue of events for a single destination.
@ -34,14 +35,15 @@ type destinationQueue struct {
client *gomatrixserverlib.FederationClient client *gomatrixserverlib.FederationClient
origin gomatrixserverlib.ServerName origin gomatrixserverlib.ServerName
destination gomatrixserverlib.ServerName destination gomatrixserverlib.ServerName
// The running mutex protects running, sentCounter, lastTransactionIDs and running atomic.Bool
// The running mutex protects sentCounter, lastTransactionIDs and
// pendingEvents, pendingEDUs. // pendingEvents, pendingEDUs.
runningMutex sync.Mutex runningMutex sync.Mutex
running bool
sentCounter int sentCounter int
lastTransactionIDs []gomatrixserverlib.TransactionID lastTransactionIDs []gomatrixserverlib.TransactionID
pendingEvents []*gomatrixserverlib.HeaderedEvent pendingEvents []*gomatrixserverlib.HeaderedEvent
pendingEDUs []*gomatrixserverlib.EDU pendingEDUs []*gomatrixserverlib.EDU
pendingInvites []*gomatrixserverlib.InviteV2Request
} }
// Send event adds the event to the pending queue for the destination. // Send event adds the event to the pending queue for the destination.
@ -51,29 +53,43 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.HeaderedEvent) {
oq.runningMutex.Lock() oq.runningMutex.Lock()
defer oq.runningMutex.Unlock() defer oq.runningMutex.Unlock()
oq.pendingEvents = append(oq.pendingEvents, ev) oq.pendingEvents = append(oq.pendingEvents, ev)
if !oq.running { if !oq.running.Load() {
oq.running = true
go oq.backgroundSend() go oq.backgroundSend()
} }
} }
// sendEDU adds the EDU event to the pending queue for the destination. // sendEDU adds the EDU event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to // If the queue is empty then it starts a background goroutine to
// start sending event to that destination. // start sending events to that destination.
func (oq *destinationQueue) sendEDU(e *gomatrixserverlib.EDU) { func (oq *destinationQueue) sendEDU(e *gomatrixserverlib.EDU) {
oq.runningMutex.Lock() oq.runningMutex.Lock()
defer oq.runningMutex.Unlock() defer oq.runningMutex.Unlock()
oq.pendingEDUs = append(oq.pendingEDUs, e) oq.pendingEDUs = append(oq.pendingEDUs, e)
if !oq.running { if !oq.running.Load() {
oq.running = true
go oq.backgroundSend() go oq.backgroundSend()
} }
} }
// sendInvite adds the invite event to the pending queue for the
// destination. If the queue is empty then it starts a background
// goroutine to start sending events to that destination.
func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.InviteV2Request) {
oq.runningMutex.Lock()
defer oq.runningMutex.Unlock()
oq.pendingInvites = append(oq.pendingInvites, ev)
if !oq.running.Load() {
go oq.backgroundSend()
}
}
// backgroundSend is the worker goroutine for sending events.
func (oq *destinationQueue) backgroundSend() { func (oq *destinationQueue) backgroundSend() {
oq.running.Store(true)
defer oq.running.Store(false)
for { for {
t := oq.next() transaction, invites := oq.nextTransaction(), oq.nextInvites()
if t == nil { if !transaction && !invites {
// If the queue is empty then stop processing for this destination. // If the queue is empty then stop processing for this destination.
// TODO: Remove this destination from the queue map. // TODO: Remove this destination from the queue map.
return return
@ -81,29 +97,18 @@ func (oq *destinationQueue) backgroundSend() {
// TODO: handle retries. // TODO: handle retries.
// TODO: blacklist uncooperative servers. // TODO: blacklist uncooperative servers.
util.GetLogger(context.TODO()).Infof("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))
_, err := oq.client.SendTransaction(context.TODO(), *t)
if err != nil {
log.WithFields(log.Fields{
"destination": oq.destination,
log.ErrorKey: err,
}).Info("problem sending transaction")
}
} }
} }
// next creates a new transaction from the pending event queue // nextTransaction creates a new transaction from the pending event
// and flushes the queue. // queue and sends it. Returns true if a transaction was sent or
// Returns nil if the queue was empty. // false otherwise.
func (oq *destinationQueue) next() *gomatrixserverlib.Transaction { func (oq *destinationQueue) nextTransaction() bool {
oq.runningMutex.Lock() oq.runningMutex.Lock()
defer oq.runningMutex.Unlock() defer oq.runningMutex.Unlock()
if len(oq.pendingEvents) == 0 && len(oq.pendingEDUs) == 0 { if len(oq.pendingEvents) == 0 && len(oq.pendingEDUs) == 0 {
oq.running = false return false
return nil
} }
t := gomatrixserverlib.Transaction{ t := gomatrixserverlib.Transaction{
@ -136,5 +141,46 @@ func (oq *destinationQueue) next() *gomatrixserverlib.Transaction {
oq.pendingEDUs = nil oq.pendingEDUs = nil
oq.sentCounter += len(t.EDUs) oq.sentCounter += len(t.EDUs)
return &t util.GetLogger(context.TODO()).Infof("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))
_, err := oq.client.SendTransaction(context.TODO(), t)
if err != nil {
log.WithFields(log.Fields{
"destination": oq.destination,
log.ErrorKey: err,
}).Info("problem sending transaction")
}
return true
}
// nextInvite takes pending invite events from the queue and sends
// them. Returns true if a transaction was sent or false otherwise.
func (oq *destinationQueue) nextInvites() bool {
oq.runningMutex.Lock()
defer oq.runningMutex.Unlock()
if len(oq.pendingInvites) == 0 {
return false
}
for _, inviteReq := range oq.pendingInvites {
ev := inviteReq.Event()
if _, err := oq.client.SendInviteV2(
context.TODO(),
oq.destination,
*inviteReq,
); err != nil {
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"state_key": ev.StateKey(),
"destination": oq.destination,
}).WithError(err).Error("failed to send invite")
}
}
oq.pendingInvites = nil
return true
} }

View file

@ -80,6 +80,49 @@ func (oqs *OutgoingQueues) SendEvent(
return nil return nil
} }
// SendEvent sends an event to the destinations
func (oqs *OutgoingQueues) SendInvite(
inviteReq *gomatrixserverlib.InviteV2Request,
) error {
ev := inviteReq.Event()
stateKey := ev.StateKey()
if stateKey == nil {
log.WithFields(log.Fields{
"event_id": ev.EventID(),
}).Info("invite had no state key, dropping")
return nil
}
_, destination, err := gomatrixserverlib.SplitID('@', *stateKey)
if err != nil {
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"state_key": stateKey,
}).Info("failed to split destination from state key")
return nil
}
log.WithFields(log.Fields{
"event_id": ev.EventID(),
}).Info("Sending invite")
oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock()
oq := oqs.queues[destination]
if oq == nil {
oq = &destinationQueue{
origin: oqs.origin,
destination: destination,
client: oqs.client,
}
oqs.queues[destination] = oq
}
oq.sendInvite(inviteReq)
return nil
}
// SendEDU sends an EDU event to the destinations // SendEDU sends an EDU event to the destinations
func (oqs *OutgoingQueues) SendEDU( func (oqs *OutgoingQueues) SendEDU(
e *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName, e *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName,

2
go.mod
View file

@ -9,7 +9,7 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f
github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10 github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26
github.com/matrix-org/gomatrixserverlib v0.0.0-20200327155501-33fb4c7049dc github.com/matrix-org/gomatrixserverlib v0.0.0-20200402141635-4a6e1ade46f8
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7
github.com/mattn/go-sqlite3 v2.0.2+incompatible github.com/mattn/go-sqlite3 v2.0.2+incompatible

4
go.sum
View file

@ -130,8 +130,8 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5 h1:kmRjpmFOenVpOaV/DRlo9p6z/IbOKlUC+hhKsAAh8Qg= github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5 h1:kmRjpmFOenVpOaV/DRlo9p6z/IbOKlUC+hhKsAAh8Qg=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200327155501-33fb4c7049dc h1:qrRu4/AlulnldLiyGpYYm+ELIkrP51XCRlA3txWpN30= github.com/matrix-org/gomatrixserverlib v0.0.0-20200402141635-4a6e1ade46f8 h1:VZ7xGklSuzU9geMekuxKO4FvUBUaPjP+8IkcwzQtqOI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200327155501-33fb4c7049dc/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= github.com/matrix-org/gomatrixserverlib v0.0.0-20200402141635-4a6e1ade46f8/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A=
github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5 h1:W7l5CP4V7wPyPb4tYE11dbmeAOwtFQBTW0rf4OonOS8= github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5 h1:W7l5CP4V7wPyPb4tYE11dbmeAOwtFQBTW0rf4OonOS8=

View file

@ -86,7 +86,9 @@ type TransactionID struct {
// the usual context a matrix room event would have. We usually do not have // the usual context a matrix room event would have. We usually do not have
// access to the events needed to check the event auth rules for the invite. // access to the events needed to check the event auth rules for the invite.
type InputInviteEvent struct { type InputInviteEvent struct {
Event gomatrixserverlib.HeaderedEvent `json:"event"` RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
Event gomatrixserverlib.HeaderedEvent `json:"event"`
InviteRoomState []gomatrixserverlib.InviteV2StrippedState `json:"invite_room_state"`
} }
// InputRoomEventsRequest is a request to InputRoomEvents // InputRoomEventsRequest is a request to InputRoomEvents

View file

@ -116,6 +116,8 @@ type OutputNewRoomEvent struct {
// Invite events can be received outside of an existing room so have to be // Invite events can be received outside of an existing room so have to be
// tracked separately from the room events themselves. // tracked separately from the room events themselves.
type OutputNewInviteEvent struct { type OutputNewInviteEvent struct {
// The room version of the invited room.
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
// The "m.room.member" invite event. // The "m.room.member" invite event.
Event gomatrixserverlib.HeaderedEvent `json:"event"` Event gomatrixserverlib.HeaderedEvent `json:"event"`
} }

View file

@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/state/database" "github.com/matrix-org/dendrite/roomserver/state/database"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
) )
// A RoomEventDatabase has the storage APIs needed to store a room event. // A RoomEventDatabase has the storage APIs needed to store a room event.
@ -64,6 +65,7 @@ type RoomEventDatabase interface {
// Build a membership updater for the target user in a room. // Build a membership updater for the target user in a room.
MembershipUpdater( MembershipUpdater(
ctx context.Context, roomID, targerUserID string, ctx context.Context, roomID, targerUserID string,
roomVersion gomatrixserverlib.RoomVersion,
) (types.MembershipUpdater, error) ) (types.MembershipUpdater, error)
// Look up event ID by transaction's info. // Look up event ID by transaction's info.
// This is used to determine if the room event is processed/processing already. // This is used to determine if the room event is processed/processing already.
@ -193,7 +195,14 @@ func processInviteEvent(
roomID := input.Event.RoomID() roomID := input.Event.RoomID()
targetUserID := *input.Event.StateKey() targetUserID := *input.Event.StateKey()
updater, err := db.MembershipUpdater(ctx, roomID, targetUserID) log.WithFields(log.Fields{
"event_id": input.Event.EventID(),
"room_id": roomID,
"room_version": input.RoomVersion,
"target_user_id": targetUserID,
}).Info("processing invite event")
updater, err := db.MembershipUpdater(ctx, roomID, targetUserID, input.RoomVersion)
if err != nil { if err != nil {
return err return err
} }
@ -237,7 +246,12 @@ func processInviteEvent(
} }
event := input.Event.Unwrap() event := input.Event.Unwrap()
outputUpdates, err := updateToInviteMembership(updater, &event, nil)
if err = event.SetUnsignedField("invite_room_state", input.InviteRoomState); err != nil {
return err
}
outputUpdates, err := updateToInviteMembership(updater, &event, nil, input.Event.RoomVersion)
if err != nil { if err != nil {
return err return err
} }

View file

@ -112,7 +112,7 @@ func updateMembership(
switch newMembership { switch newMembership {
case gomatrixserverlib.Invite: case gomatrixserverlib.Invite:
return updateToInviteMembership(mu, add, updates) return updateToInviteMembership(mu, add, updates, updater.RoomVersion())
case gomatrixserverlib.Join: case gomatrixserverlib.Join:
return updateToJoinMembership(mu, add, updates) return updateToJoinMembership(mu, add, updates)
case gomatrixserverlib.Leave, gomatrixserverlib.Ban: case gomatrixserverlib.Leave, gomatrixserverlib.Ban:
@ -126,6 +126,7 @@ func updateMembership(
func updateToInviteMembership( func updateToInviteMembership(
mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent, mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent,
roomVersion gomatrixserverlib.RoomVersion,
) ([]api.OutputEvent, error) { ) ([]api.OutputEvent, error) {
// We may have already sent the invite to the user, either because we are // We may have already sent the invite to the user, either because we are
// reprocessing this event, or because the we received this invite from a // reprocessing this event, or because the we received this invite from a
@ -136,14 +137,14 @@ func updateToInviteMembership(
return nil, err return nil, err
} }
if needsSending { if needsSending {
roomVersion := gomatrixserverlib.RoomVersionV1
// We notify the consumers using a special event even though we will // We notify the consumers using a special event even though we will
// notify them about the change in current state as part of the normal // notify them about the change in current state as part of the normal
// room event stream. This ensures that the consumers only have to // room event stream. This ensures that the consumers only have to
// consider a single stream of events when determining whether a user // consider a single stream of events when determining whether a user
// is invited, rather than having to combine multiple streams themselves. // is invited, rather than having to combine multiple streams themselves.
onie := api.OutputNewInviteEvent{ onie := api.OutputNewInviteEvent{
Event: (*add).Headered(roomVersion), Event: (*add).Headered(roomVersion),
RoomVersion: roomVersion,
} }
updates = append(updates, api.OutputEvent{ updates = append(updates, api.OutputEvent{
Type: api.OutputTypeNewInviteEvent, Type: api.OutputTypeNewInviteEvent,

View file

@ -41,7 +41,7 @@ type Database interface {
GetAliasesForRoomID(ctx context.Context, roomID string) ([]string, error) GetAliasesForRoomID(ctx context.Context, roomID string) ([]string, error)
GetCreatorIDForAlias(ctx context.Context, alias string) (string, error) GetCreatorIDForAlias(ctx context.Context, alias string) (string, error)
RemoveRoomAlias(ctx context.Context, alias string) error RemoveRoomAlias(ctx context.Context, alias string) error
MembershipUpdater(ctx context.Context, roomID, targetUserID string) (types.MembershipUpdater, error) MembershipUpdater(ctx context.Context, roomID, targetUserID string, roomVersion gomatrixserverlib.RoomVersion) (types.MembershipUpdater, error)
GetMembership(ctx context.Context, roomNID types.RoomNID, requestSenderUserID string) (membershipEventNID types.EventNID, stillInRoom bool, err error) GetMembership(ctx context.Context, roomNID types.RoomNID, requestSenderUserID string) (membershipEventNID types.EventNID, stillInRoom bool, err error)
GetMembershipEventNIDsForRoom(ctx context.Context, roomNID types.RoomNID, joinOnly bool) ([]types.EventNID, error) GetMembershipEventNIDsForRoom(ctx context.Context, roomNID types.RoomNID, joinOnly bool) ([]types.EventNID, error)
EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error)

View file

@ -393,6 +393,12 @@ type roomRecentEventsUpdater struct {
currentStateSnapshotNID types.StateSnapshotNID currentStateSnapshotNID types.StateSnapshotNID
} }
// RoomVersion implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) RoomVersion() (version gomatrixserverlib.RoomVersion) {
version, _ = u.d.GetRoomVersionForRoomNID(u.ctx, u.roomNID)
return
}
// LatestEvents implements types.RoomRecentEventsUpdater // LatestEvents implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) LatestEvents() []types.StateAtEventAndReference { func (u *roomRecentEventsUpdater) LatestEvents() []types.StateAtEventAndReference {
return u.latestEvents return u.latestEvents
@ -534,6 +540,7 @@ func (d *Database) StateEntriesForTuples(
// MembershipUpdater implements input.RoomEventDatabase // MembershipUpdater implements input.RoomEventDatabase
func (d *Database) MembershipUpdater( func (d *Database) MembershipUpdater(
ctx context.Context, roomID, targetUserID string, ctx context.Context, roomID, targetUserID string,
roomVersion gomatrixserverlib.RoomVersion,
) (types.MembershipUpdater, error) { ) (types.MembershipUpdater, error) {
txn, err := d.db.Begin() txn, err := d.db.Begin()
if err != nil { if err != nil {
@ -546,8 +553,7 @@ func (d *Database) MembershipUpdater(
} }
}() }()
// TODO: Room version here roomNID, err := d.assignRoomNID(ctx, txn, roomID, roomVersion)
roomNID, err := d.assignRoomNID(ctx, txn, roomID, "1")
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -486,6 +486,12 @@ type roomRecentEventsUpdater struct {
currentStateSnapshotNID types.StateSnapshotNID currentStateSnapshotNID types.StateSnapshotNID
} }
// RoomVersion implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) RoomVersion() (version gomatrixserverlib.RoomVersion) {
version, _ = u.d.GetRoomVersionForRoomNID(u.ctx, u.roomNID)
return
}
// LatestEvents implements types.RoomRecentEventsUpdater // LatestEvents implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) LatestEvents() []types.StateAtEventAndReference { func (u *roomRecentEventsUpdater) LatestEvents() []types.StateAtEventAndReference {
return u.latestEvents return u.latestEvents
@ -657,6 +663,7 @@ func (d *Database) StateEntriesForTuples(
// MembershipUpdater implements input.RoomEventDatabase // MembershipUpdater implements input.RoomEventDatabase
func (d *Database) MembershipUpdater( func (d *Database) MembershipUpdater(
ctx context.Context, roomID, targetUserID string, ctx context.Context, roomID, targetUserID string,
roomVersion gomatrixserverlib.RoomVersion,
) (updater types.MembershipUpdater, err error) { ) (updater types.MembershipUpdater, err error) {
var txn *sql.Tx var txn *sql.Tx
txn, err = d.db.Begin() txn, err = d.db.Begin()
@ -682,8 +689,7 @@ func (d *Database) MembershipUpdater(
} }
}() }()
// TODO: Room version here roomNID, err := d.assignRoomNID(ctx, txn, roomID, roomVersion)
roomNID, err := d.assignRoomNID(ctx, txn, roomID, "1")
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -140,6 +140,8 @@ type StateEntryList struct {
// (On postgresql this wraps a database transaction that holds a "FOR UPDATE" // (On postgresql this wraps a database transaction that holds a "FOR UPDATE"
// lock on the row in the rooms table holding the latest events for the room.) // lock on the row in the rooms table holding the latest events for the room.)
type RoomRecentEventsUpdater interface { type RoomRecentEventsUpdater interface {
// The room version of the room.
RoomVersion() gomatrixserverlib.RoomVersion
// The latest event IDs and state in the room. // The latest event IDs and state in the room.
LatestEvents() []StateAtEventAndReference LatestEvents() []StateAtEventAndReference
// The event ID of the latest event written to the output log in the room. // The event ID of the latest event written to the output log in the room.

View file

@ -231,3 +231,4 @@ remote user can join room with version 2
User can invite local user to room with version 2 User can invite local user to room with version 2
Remote user can backfill in a room with version 2 Remote user can backfill in a room with version 2
Inbound federation accepts attempts to join v2 rooms from servers with support Inbound federation accepts attempts to join v2 rooms from servers with support
Outbound federation can send invites via v2 API