Merge branch 'master' into kegan/backfill-again

This commit is contained in:
Kegan Dougal 2020-04-28 11:27:26 +01:00
commit 1d0da512cc
26 changed files with 304 additions and 91 deletions

View file

@ -106,12 +106,15 @@ func (c *RoomserverProducer) SendInputRoomEvents(
func (c *RoomserverProducer) SendInvite( func (c *RoomserverProducer) SendInvite(
ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent, ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent,
inviteRoomState []gomatrixserverlib.InviteV2StrippedState, inviteRoomState []gomatrixserverlib.InviteV2StrippedState,
sendAsServer gomatrixserverlib.ServerName, txnID *api.TransactionID,
) error { ) error {
request := api.InputRoomEventsRequest{ request := api.InputRoomEventsRequest{
InputInviteEvents: []api.InputInviteEvent{{ InputInviteEvents: []api.InputInviteEvent{{
Event: inviteEvent, Event: inviteEvent,
InviteRoomState: inviteRoomState, InviteRoomState: inviteRoomState,
RoomVersion: inviteEvent.RoomVersion, RoomVersion: inviteEvent.RoomVersion,
SendAsServer: string(sendAsServer),
TransactionID: txnID,
}}, }},
} }
var response api.InputRoomEventsResponse var response api.InputRoomEventsResponse

View file

@ -40,6 +40,8 @@ var errMissingUserID = errors.New("'user_id' must be supplied")
// SendMembership implements PUT /rooms/{roomID}/(join|kick|ban|unban|leave|invite) // SendMembership implements PUT /rooms/{roomID}/(join|kick|ban|unban|leave|invite)
// by building a m.room.member event then sending it to the room server // by building a m.room.member event then sending it to the room server
// TODO: Can we improve the cyclo count here? Separate code paths for invites?
// nolint:gocyclo
func SendMembership( func SendMembership(
req *http.Request, accountDB accounts.Database, device *authtypes.Device, req *http.Request, accountDB accounts.Database, device *authtypes.Device,
roomID string, membership string, cfg *config.Dendrite, roomID string, membership string, cfg *config.Dendrite,
@ -104,23 +106,39 @@ func SendMembership(
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
if _, err := producer.SendEvents(
req.Context(),
[]gomatrixserverlib.HeaderedEvent{(*event).Headered(verRes.RoomVersion)},
cfg.Matrix.ServerName,
nil,
); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed")
return jsonerror.InternalServerError()
}
var returnData interface{} = struct{}{} var returnData interface{} = struct{}{}
// The join membership requires the room id to be sent in the response switch membership {
if membership == gomatrixserverlib.Join { case gomatrixserverlib.Invite:
// Invites need to be handled specially
err = producer.SendInvite(
req.Context(),
event.Headered(verRes.RoomVersion),
nil, // ask the roomserver to draw up invite room state for us
cfg.Matrix.ServerName,
nil,
)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("producer.SendInvite failed")
return jsonerror.InternalServerError()
}
case gomatrixserverlib.Join:
// The join membership requires the room id to be sent in the response
returnData = struct { returnData = struct {
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
}{roomID} }{roomID}
fallthrough
default:
_, err = producer.SendEvents(
req.Context(),
[]gomatrixserverlib.HeaderedEvent{event.Headered(verRes.RoomVersion)},
cfg.Matrix.ServerName,
nil,
)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed")
return jsonerror.InternalServerError()
}
} }
return util.JSONResponse{ return util.JSONResponse{

View file

@ -153,7 +153,7 @@ func main() {
asQuery := appservice.SetupAppServiceAPIComponent( asQuery := appservice.SetupAppServiceAPIComponent(
&base.Base, accountDB, deviceDB, federation, alias, query, transactions.New(), &base.Base, accountDB, deviceDB, federation, alias, query, transactions.New(),
) )
fedSenderAPI := federationsender.SetupFederationSenderComponent(&base.Base, federation, query) fedSenderAPI := federationsender.SetupFederationSenderComponent(&base.Base, federation, query, input)
clientapi.SetupClientAPIComponent( clientapi.SetupClientAPIComponent(
&base.Base, deviceDB, accountDB, &base.Base, deviceDB, accountDB,

View file

@ -26,10 +26,10 @@ func main() {
federation := base.CreateFederationClient() federation := base.CreateFederationClient()
_, _, query := base.CreateHTTPRoomserverAPIs() _, input, query := base.CreateHTTPRoomserverAPIs()
federationsender.SetupFederationSenderComponent( federationsender.SetupFederationSenderComponent(
base, federation, query, base, federation, query, input,
) )
base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationSender), string(base.Cfg.Listen.FederationSender)) base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationSender), string(base.Cfg.Listen.FederationSender))

View file

@ -62,7 +62,7 @@ func main() {
asQuery := appservice.SetupAppServiceAPIComponent( asQuery := appservice.SetupAppServiceAPIComponent(
base, accountDB, deviceDB, federation, alias, query, transactions.New(), base, accountDB, deviceDB, federation, alias, query, transactions.New(),
) )
fedSenderAPI := federationsender.SetupFederationSenderComponent(base, federation, query) fedSenderAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input)
clientapi.SetupClientAPIComponent( clientapi.SetupClientAPIComponent(
base, deviceDB, accountDB, base, deviceDB, accountDB,

View file

@ -98,7 +98,10 @@ func Backfill(
} }
var eventJSONs []json.RawMessage var eventJSONs []json.RawMessage
for _, e := range gomatrixserverlib.ReverseTopologicalOrdering(evs, gomatrixserverlib.TopologicalOrderByPrevEvents) { for _, e := range gomatrixserverlib.ReverseTopologicalOrdering(
evs,
gomatrixserverlib.TopologicalOrderByPrevEvents,
) {
eventJSONs = append(eventJSONs, e.JSON()) eventJSONs = append(eventJSONs, e.JSON())
} }

View file

@ -16,11 +16,13 @@ package routing
import ( import (
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/config"
roomserverVersion "github.com/matrix-org/dendrite/roomserver/version"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
) )
@ -44,6 +46,16 @@ func Invite(
} }
event := inviteReq.Event() event := inviteReq.Event()
// Check that we can accept invites for this room version.
if _, err := roomserverVersion.SupportedRoomVersion(inviteReq.RoomVersion()); err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.UnsupportedRoomVersion(
fmt.Sprintf("Room version %q is not supported by this server.", inviteReq.RoomVersion()),
),
}
}
// Check that the room ID is correct. // Check that the room ID is correct.
if event.RoomID() != roomID { if event.RoomID() != roomID {
return util.JSONResponse{ return util.JSONResponse{
@ -90,6 +102,8 @@ func Invite(
httpReq.Context(), httpReq.Context(),
signedEvent.Headered(inviteReq.RoomVersion()), signedEvent.Headered(inviteReq.RoomVersion()),
inviteReq.InviteRoomState(), inviteReq.InviteRoomState(),
event.Origin(),
nil,
); err != nil { ); err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendInvite failed") util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendInvite failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
@ -99,6 +113,6 @@ func Invite(
// the other servers in the room that we have been invited. // the other servers in the room that we have been invited.
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
JSON: gomatrixserverlib.RespInvite{Event: signedEvent}, JSON: gomatrixserverlib.RespInviteV2{Event: signedEvent},
} }
} }

View file

@ -28,6 +28,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
) )
// OutputRoomEventConsumer consumes events that originated in the room server. // OutputRoomEventConsumer consumes events that originated in the room server.
@ -187,49 +188,12 @@ func (s *OutputRoomEventConsumer) processInvite(oie api.OutputNewInviteEvent) er
return nil return nil
} }
// When sending a v2 invite, the inviting server should try and include // Try to extract the room invite state. The roomserver will have stashed
// a "stripped down" version of the room state. This is pretty much just // this for us in invite_room_state if it didn't already exist.
// enough information for the remote side to show something useful to the
// user, like the room name, aliases etc.
strippedState := []gomatrixserverlib.InviteV2StrippedState{} strippedState := []gomatrixserverlib.InviteV2StrippedState{}
stateWanted := []string{ if inviteRoomState := gjson.GetBytes(oie.Event.Unsigned(), "invite_room_state"); inviteRoomState.Exists() {
gomatrixserverlib.MRoomName, gomatrixserverlib.MRoomCanonicalAlias, if err := json.Unmarshal([]byte(inviteRoomState.Raw), &strippedState); err != nil {
gomatrixserverlib.MRoomAliases, gomatrixserverlib.MRoomJoinRules, log.WithError(err).Warn("failed to extract invite_room_state from event unsigned")
}
// For each of the state keys that we want to try and send, ask the
// roomserver if we have a state event for that room that matches the
// state key.
for _, wanted := range stateWanted {
queryReq := api.QueryLatestEventsAndStateRequest{
RoomID: oie.Event.RoomID(),
StateToFetch: []gomatrixserverlib.StateKeyTuple{
gomatrixserverlib.StateKeyTuple{
EventType: wanted,
StateKey: "",
},
},
}
// If this fails then we just move onto the next event - we don't
// actually know at this point whether the room even has that type
// of state.
queryRes := api.QueryLatestEventsAndStateResponse{}
if err := s.query.QueryLatestEventsAndState(context.TODO(), &queryReq, &queryRes); err != nil {
log.WithFields(log.Fields{
"room_id": queryReq.RoomID,
"event_type": wanted,
}).WithError(err).Info("couldn't find state to strip")
continue
}
// Append the stripped down copy of the state to our list.
for _, headeredEvent := range queryRes.StateEvents {
event := headeredEvent.Unwrap()
strippedState = append(strippedState, gomatrixserverlib.NewInviteV2StrippedState(&event))
log.WithFields(log.Fields{
"room_id": queryReq.RoomID,
"event_type": event.Type(),
}).Info("adding stripped state")
} }
} }

View file

@ -20,6 +20,7 @@ import (
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/federationsender/consumers" "github.com/matrix-org/dendrite/federationsender/consumers"
"github.com/matrix-org/dendrite/federationsender/producers"
"github.com/matrix-org/dendrite/federationsender/query" "github.com/matrix-org/dendrite/federationsender/query"
"github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/queue"
"github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/federationsender/storage"
@ -34,13 +35,16 @@ func SetupFederationSenderComponent(
base *basecomponent.BaseDendrite, base *basecomponent.BaseDendrite,
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
rsQueryAPI roomserverAPI.RoomserverQueryAPI, rsQueryAPI roomserverAPI.RoomserverQueryAPI,
rsInputAPI roomserverAPI.RoomserverInputAPI,
) api.FederationSenderQueryAPI { ) api.FederationSenderQueryAPI {
federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender)) federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender))
if err != nil { if err != nil {
logrus.WithError(err).Panic("failed to connect to federation sender db") logrus.WithError(err).Panic("failed to connect to federation sender db")
} }
queues := queue.NewOutgoingQueues(base.Cfg.Matrix.ServerName, federation) roomserverProducer := producers.NewRoomserverProducer(rsInputAPI, base.Cfg.Matrix.ServerName)
queues := queue.NewOutgoingQueues(base.Cfg.Matrix.ServerName, federation, roomserverProducer)
rsConsumer := consumers.NewOutputRoomEventConsumer( rsConsumer := consumers.NewOutputRoomEventConsumer(
base.Cfg, base.KafkaConsumer, queues, base.Cfg, base.KafkaConsumer, queues,

View file

@ -0,0 +1,66 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package producers
import (
"context"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
)
// RoomserverProducer produces events for the roomserver to consume.
type RoomserverProducer struct {
InputAPI api.RoomserverInputAPI
serverName gomatrixserverlib.ServerName
}
// NewRoomserverProducer creates a new RoomserverProducer
func NewRoomserverProducer(
inputAPI api.RoomserverInputAPI, serverName gomatrixserverlib.ServerName,
) *RoomserverProducer {
return &RoomserverProducer{
InputAPI: inputAPI,
serverName: serverName,
}
}
// SendInviteResponse drops an invite response back into the roomserver so that users
// already in the room will be notified of the new invite. The invite response is signed
// by the remote side.
func (c *RoomserverProducer) SendInviteResponse(
ctx context.Context, res gomatrixserverlib.RespInviteV2, roomVersion gomatrixserverlib.RoomVersion,
) (string, error) {
ev := res.Event.Headered(roomVersion)
ire := api.InputRoomEvent{
Kind: api.KindNew,
Event: ev,
AuthEventIDs: ev.AuthEventIDs(),
SendAsServer: string(c.serverName),
TransactionID: nil,
}
return c.SendInputRoomEvents(ctx, []api.InputRoomEvent{ire})
}
// SendInputRoomEvents writes the given input room events to the roomserver input API.
func (c *RoomserverProducer) SendInputRoomEvents(
ctx context.Context, ires []api.InputRoomEvent,
) (eventID string, err error) {
request := api.InputRoomEventsRequest{InputRoomEvents: ires}
var response api.InputRoomEventsResponse
err = c.InputAPI.InputRoomEvents(ctx, &request, &response)
eventID = response.EventID
return
}

View file

@ -21,6 +21,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/matrix-org/dendrite/federationsender/producers"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -32,6 +33,7 @@ import (
// ensures that only one request is in flight to a given destination // ensures that only one request is in flight to a given destination
// at a time. // at a time.
type destinationQueue struct { type destinationQueue struct {
rsProducer *producers.RoomserverProducer
client *gomatrixserverlib.FederationClient client *gomatrixserverlib.FederationClient
origin gomatrixserverlib.ServerName origin gomatrixserverlib.ServerName
destination gomatrixserverlib.ServerName destination gomatrixserverlib.ServerName
@ -165,18 +167,38 @@ func (oq *destinationQueue) nextInvites() bool {
} }
for _, inviteReq := range oq.pendingInvites { for _, inviteReq := range oq.pendingInvites {
ev := inviteReq.Event() ev, roomVersion := inviteReq.Event(), inviteReq.RoomVersion()
if _, err := oq.client.SendInviteV2( log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_version": roomVersion,
"destination": oq.destination,
}).Info("sending invite")
inviteRes, err := oq.client.SendInviteV2(
context.TODO(), context.TODO(),
oq.destination, oq.destination,
*inviteReq, *inviteReq,
); err != nil { )
if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event_id": ev.EventID(), "event_id": ev.EventID(),
"state_key": ev.StateKey(), "state_key": ev.StateKey(),
"destination": oq.destination, "destination": oq.destination,
}).WithError(err).Error("failed to send invite") }).WithError(err).Error("failed to send invite")
continue
}
if _, err = oq.rsProducer.SendInviteResponse(
context.TODO(),
inviteRes,
roomVersion,
); err != nil {
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"state_key": ev.StateKey(),
"destination": oq.destination,
}).WithError(err).Error("failed to return signed invite to roomserver")
} }
} }

View file

@ -18,6 +18,7 @@ import (
"fmt" "fmt"
"sync" "sync"
"github.com/matrix-org/dendrite/federationsender/producers"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -25,19 +26,25 @@ import (
// OutgoingQueues is a collection of queues for sending transactions to other // OutgoingQueues is a collection of queues for sending transactions to other
// matrix servers // matrix servers
type OutgoingQueues struct { type OutgoingQueues struct {
origin gomatrixserverlib.ServerName rsProducer *producers.RoomserverProducer
client *gomatrixserverlib.FederationClient origin gomatrixserverlib.ServerName
client *gomatrixserverlib.FederationClient
// The queuesMutex protects queues // The queuesMutex protects queues
queuesMutex sync.Mutex queuesMutex sync.Mutex
queues map[gomatrixserverlib.ServerName]*destinationQueue queues map[gomatrixserverlib.ServerName]*destinationQueue
} }
// NewOutgoingQueues makes a new OutgoingQueues // NewOutgoingQueues makes a new OutgoingQueues
func NewOutgoingQueues(origin gomatrixserverlib.ServerName, client *gomatrixserverlib.FederationClient) *OutgoingQueues { func NewOutgoingQueues(
origin gomatrixserverlib.ServerName,
client *gomatrixserverlib.FederationClient,
rsProducer *producers.RoomserverProducer,
) *OutgoingQueues {
return &OutgoingQueues{ return &OutgoingQueues{
origin: origin, rsProducer: rsProducer,
client: client, origin: origin,
queues: map[gomatrixserverlib.ServerName]*destinationQueue{}, client: client,
queues: map[gomatrixserverlib.ServerName]*destinationQueue{},
} }
} }
@ -67,6 +74,7 @@ func (oqs *OutgoingQueues) SendEvent(
oq := oqs.queues[destination] oq := oqs.queues[destination]
if oq == nil { if oq == nil {
oq = &destinationQueue{ oq = &destinationQueue{
rsProducer: oqs.rsProducer,
origin: oqs.origin, origin: oqs.origin,
destination: destination, destination: destination,
client: oqs.client, client: oqs.client,
@ -111,6 +119,7 @@ func (oqs *OutgoingQueues) SendInvite(
oq := oqs.queues[destination] oq := oqs.queues[destination]
if oq == nil { if oq == nil {
oq = &destinationQueue{ oq = &destinationQueue{
rsProducer: oqs.rsProducer,
origin: oqs.origin, origin: oqs.origin,
destination: destination, destination: destination,
client: oqs.client, client: oqs.client,
@ -151,6 +160,7 @@ func (oqs *OutgoingQueues) SendEDU(
oq := oqs.queues[destination] oq := oqs.queues[destination]
if oq == nil { if oq == nil {
oq = &destinationQueue{ oq = &destinationQueue{
rsProducer: oqs.rsProducer,
origin: oqs.origin, origin: oqs.origin,
destination: destination, destination: destination,
client: oqs.client, client: oqs.client,

3
go.mod
View file

@ -17,7 +17,7 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f
github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10 github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26
github.com/matrix-org/gomatrixserverlib v0.0.0-20200427152419-6a0535cc473a github.com/matrix-org/gomatrixserverlib v0.0.0-20200428095012-a95e289995b1
github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7
github.com/mattn/go-sqlite3 v2.0.2+incompatible github.com/mattn/go-sqlite3 v2.0.2+incompatible
@ -28,6 +28,7 @@ require (
github.com/prometheus/client_golang v1.4.1 github.com/prometheus/client_golang v1.4.1
github.com/prometheus/common v0.9.1 github.com/prometheus/common v0.9.1
github.com/sirupsen/logrus v1.4.2 github.com/sirupsen/logrus v1.4.2
github.com/tidwall/gjson v1.6.0
github.com/uber/jaeger-client-go v2.15.0+incompatible github.com/uber/jaeger-client-go v2.15.0+incompatible
github.com/uber/jaeger-lib v1.5.0 github.com/uber/jaeger-lib v1.5.0
go.uber.org/atomic v1.4.0 go.uber.org/atomic v1.4.0

2
go.sum
View file

@ -377,6 +377,8 @@ github.com/matrix-org/gomatrixserverlib v0.0.0-20200427134702-21db6d1430e3 h1:aJ
github.com/matrix-org/gomatrixserverlib v0.0.0-20200427134702-21db6d1430e3/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/gomatrixserverlib v0.0.0-20200427134702-21db6d1430e3/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200427152419-6a0535cc473a h1:tlXCVU3eab9kksGYBRA3oyrmIRwD/aPujo5KJCdlCVQ= github.com/matrix-org/gomatrixserverlib v0.0.0-20200427152419-6a0535cc473a h1:tlXCVU3eab9kksGYBRA3oyrmIRwD/aPujo5KJCdlCVQ=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200427152419-6a0535cc473a/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/gomatrixserverlib v0.0.0-20200427152419-6a0535cc473a/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200428095012-a95e289995b1 h1:TB4V69eOtvmHdFp0+BgLNrDCcCwq6QDUOTjmi8fjC/M=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200428095012-a95e289995b1/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A=
github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y=

View file

@ -89,6 +89,8 @@ type InputInviteEvent struct {
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"` RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
Event gomatrixserverlib.HeaderedEvent `json:"event"` Event gomatrixserverlib.HeaderedEvent `json:"event"`
InviteRoomState []gomatrixserverlib.InviteV2StrippedState `json:"invite_room_state"` InviteRoomState []gomatrixserverlib.InviteV2StrippedState `json:"invite_room_state"`
SendAsServer string `json:"send_as_server"`
TransactionID *TransactionID `json:"transaction_id"`
} }
// InputRoomEventsRequest is a request to InputRoomEvents // InputRoomEventsRequest is a request to InputRoomEvents

View file

@ -194,8 +194,23 @@ func processInviteEvent(
event := input.Event.Unwrap() event := input.Event.Unwrap()
if err = event.SetUnsignedField("invite_room_state", input.InviteRoomState); err != nil { if len(input.InviteRoomState) > 0 {
return err // If we were supplied with some invite room state already (which is
// most likely to be if the event came in over federation) then use
// that.
if err = event.SetUnsignedField("invite_room_state", input.InviteRoomState); err != nil {
return err
}
} else {
// There's no invite room state, so let's have a go at building it
// up from local data (which is most likely to be if the event came
// from the CS API). If we know about the room then we can insert
// the invite room state, if we don't then we just fail quietly.
if irs, ierr := buildInviteStrippedState(ctx, db, input); ierr == nil {
if err = event.SetUnsignedField("invite_room_state", irs); err != nil {
return err
}
}
} }
outputUpdates, err := updateToInviteMembership(updater, &event, nil, input.Event.RoomVersion) outputUpdates, err := updateToInviteMembership(updater, &event, nil, input.Event.RoomVersion)
@ -210,3 +225,50 @@ func processInviteEvent(
succeeded = true succeeded = true
return nil return nil
} }
func buildInviteStrippedState(
ctx context.Context,
db storage.Database,
input api.InputInviteEvent,
) ([]gomatrixserverlib.InviteV2StrippedState, error) {
roomNID, err := db.RoomNID(ctx, input.Event.RoomID())
if err != nil || roomNID == 0 {
return nil, fmt.Errorf("room %q unknown", input.Event.RoomID())
}
stateWanted := []gomatrixserverlib.StateKeyTuple{}
for _, t := range []string{
gomatrixserverlib.MRoomName, gomatrixserverlib.MRoomCanonicalAlias,
gomatrixserverlib.MRoomAliases, gomatrixserverlib.MRoomJoinRules,
} {
stateWanted = append(stateWanted, gomatrixserverlib.StateKeyTuple{
EventType: t,
StateKey: "",
})
}
_, currentStateSnapshotNID, _, err := db.LatestEventIDs(ctx, roomNID)
if err != nil {
return nil, err
}
roomState := state.NewStateResolution(db)
stateEntries, err := roomState.LoadStateAtSnapshotForStringTuples(
ctx, currentStateSnapshotNID, stateWanted,
)
if err != nil {
return nil, err
}
stateNIDs := []types.EventNID{}
for _, stateNID := range stateEntries {
stateNIDs = append(stateNIDs, stateNID.EventNID)
}
stateEvents, err := db.Events(ctx, stateNIDs)
if err != nil {
return nil, err
}
inviteState := []gomatrixserverlib.InviteV2StrippedState{
gomatrixserverlib.NewInviteV2StrippedState(&input.Event.Event),
}
for _, event := range stateEvents {
inviteState = append(inviteState, gomatrixserverlib.NewInviteV2StrippedState(&event.Event))
}
return inviteState, nil
}

View file

@ -144,7 +144,7 @@ func updateToInviteMembership(
// consider a single stream of events when determining whether a user // consider a single stream of events when determining whether a user
// is invited, rather than having to combine multiple streams themselves. // is invited, rather than having to combine multiple streams themselves.
onie := api.OutputNewInviteEvent{ onie := api.OutputNewInviteEvent{
Event: (*add).Headered(roomVersion), Event: add.Headered(roomVersion),
RoomVersion: roomVersion, RoomVersion: roomVersion,
} }
updates = append(updates, api.OutputEvent{ updates = append(updates, api.OutputEvent{

View file

@ -59,7 +59,7 @@ func (r *RoomserverQueryAPI) QueryLatestEventsAndState(
roomState := state.NewStateResolution(r.DB) roomState := state.NewStateResolution(r.DB)
response.QueryLatestEventsAndStateRequest = *request response.QueryLatestEventsAndStateRequest = *request
roomNID, err := r.DB.RoomNID(ctx, request.RoomID) roomNID, err := r.DB.RoomNIDExcludingStubs(ctx, request.RoomID)
if err != nil { if err != nil {
return err return err
} }
@ -119,7 +119,7 @@ func (r *RoomserverQueryAPI) QueryStateAfterEvents(
roomState := state.NewStateResolution(r.DB) roomState := state.NewStateResolution(r.DB)
response.QueryStateAfterEventsRequest = *request response.QueryStateAfterEventsRequest = *request
roomNID, err := r.DB.RoomNID(ctx, request.RoomID) roomNID, err := r.DB.RoomNIDExcludingStubs(ctx, request.RoomID)
if err != nil { if err != nil {
return err return err
} }
@ -710,7 +710,7 @@ func (r *RoomserverQueryAPI) QueryStateAndAuthChain(
response *api.QueryStateAndAuthChainResponse, response *api.QueryStateAndAuthChainResponse,
) error { ) error {
response.QueryStateAndAuthChainRequest = *request response.QueryStateAndAuthChainRequest = *request
roomNID, err := r.DB.RoomNID(ctx, request.RoomID) roomNID, err := r.DB.RoomNIDExcludingStubs(ctx, request.RoomID)
if err != nil { if err != nil {
return err return err
} }

View file

@ -72,6 +72,10 @@ type Database interface {
GetLatestEventsForUpdate(ctx context.Context, roomNID types.RoomNID) (types.RoomRecentEventsUpdater, error) GetLatestEventsForUpdate(ctx context.Context, roomNID types.RoomNID) (types.RoomRecentEventsUpdater, error)
GetTransactionEventID(ctx context.Context, transactionID string, sessionID int64, userID string) (string, error) GetTransactionEventID(ctx context.Context, transactionID string, sessionID int64, userID string) (string, error)
RoomNID(ctx context.Context, roomID string) (types.RoomNID, error) RoomNID(ctx context.Context, roomID string) (types.RoomNID, error)
// RoomNIDExcludingStubs is a special variation of RoomNID that will return 0 as if the room
// does not exist if the room has no latest events. This can happen when we've received an
// invite over federation for a room that we don't know anything else about yet.
RoomNIDExcludingStubs(ctx context.Context, roomID string) (types.RoomNID, error)
LatestEventIDs(ctx context.Context, roomNID types.RoomNID) ([]gomatrixserverlib.EventReference, types.StateSnapshotNID, int64, error) LatestEventIDs(ctx context.Context, roomNID types.RoomNID) ([]gomatrixserverlib.EventReference, types.StateSnapshotNID, int64, error)
GetInvitesForUser(ctx context.Context, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID) (senderUserIDs []types.EventStateKeyNID, err error) GetInvitesForUser(ctx context.Context, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID) (senderUserIDs []types.EventStateKeyNID, err error)
SetRoomAlias(ctx context.Context, alias string, roomID string, creatorUserID string) error SetRoomAlias(ctx context.Context, alias string, roomID string, creatorUserID string) error

View file

@ -471,6 +471,23 @@ func (d *Database) RoomNID(ctx context.Context, roomID string) (types.RoomNID, e
return roomNID, err return roomNID, err
} }
// RoomNIDExcludingStubs implements query.RoomserverQueryAPIDB
func (d *Database) RoomNIDExcludingStubs(ctx context.Context, roomID string) (roomNID types.RoomNID, err error) {
roomNID, err = d.RoomNID(ctx, roomID)
if err != nil {
return
}
latestEvents, _, err := d.statements.selectLatestEventNIDs(ctx, roomNID)
if err != nil {
return
}
if len(latestEvents) == 0 {
roomNID = 0
return
}
return
}
// LatestEventIDs implements query.RoomserverQueryAPIDatabase // LatestEventIDs implements query.RoomserverQueryAPIDatabase
func (d *Database) LatestEventIDs( func (d *Database) LatestEventIDs(
ctx context.Context, roomNID types.RoomNID, ctx context.Context, roomNID types.RoomNID,

View file

@ -590,6 +590,23 @@ func (d *Database) RoomNID(ctx context.Context, roomID string) (roomNID types.Ro
return return
} }
// RoomNIDExcludingStubs implements query.RoomserverQueryAPIDB
func (d *Database) RoomNIDExcludingStubs(ctx context.Context, roomID string) (roomNID types.RoomNID, err error) {
roomNID, err = d.RoomNID(ctx, roomID)
if err != nil {
return
}
latestEvents, _, err := d.statements.selectLatestEventNIDs(ctx, nil, roomNID)
if err != nil {
return
}
if len(latestEvents) == 0 {
roomNID = 0
return
}
return
}
// LatestEventIDs implements query.RoomserverQueryAPIDatabase // LatestEventIDs implements query.RoomserverQueryAPIDatabase
func (d *Database) LatestEventIDs( func (d *Database) LatestEventIDs(
ctx context.Context, roomNID types.RoomNID, ctx context.Context, roomNID types.RoomNID,

View file

@ -210,7 +210,10 @@ func (r *messagesReq) retrieveEvents() (
} }
// Sort the events to ensure we send them in the right order. // Sort the events to ensure we send them in the right order.
events = gomatrixserverlib.HeaderedReverseTopologicalOrdering(events, gomatrixserverlib.TopologicalOrderByPrevEvents) events = gomatrixserverlib.HeaderedReverseTopologicalOrdering(
events,
gomatrixserverlib.TopologicalOrderByPrevEvents,
)
if r.backwardOrdering { if r.backwardOrdering {
// This reverses the array from old->new to new->old // This reverses the array from old->new to new->old
sort.SliceStable(events, func(i, j int) bool { sort.SliceStable(events, func(i, j int) bool {

View file

@ -752,11 +752,7 @@ func (d *SyncServerDatasource) addInvitesToResponse(
return err return err
} }
for roomID, inviteEvent := range invites { for roomID, inviteEvent := range invites {
ir := types.NewInviteResponse() ir := types.NewInviteResponse(inviteEvent)
ir.InviteState.Events = gomatrixserverlib.ToClientEvents(
[]gomatrixserverlib.Event{inviteEvent.Event}, gomatrixserverlib.FormatSync,
)
// TODO: add the invite state from the invite event.
res.Rooms.Invite[roomID] = *ir res.Rooms.Invite[roomID] = *ir
} }
return nil return nil

View file

@ -799,11 +799,7 @@ func (d *SyncServerDatasource) addInvitesToResponse(
return err return err
} }
for roomID, inviteEvent := range invites { for roomID, inviteEvent := range invites {
ir := types.NewInviteResponse() ir := types.NewInviteResponse(inviteEvent)
ir.InviteState.Events = gomatrixserverlib.HeaderedToClientEvents(
[]gomatrixserverlib.HeaderedEvent{inviteEvent}, gomatrixserverlib.FormatSync,
)
// TODO: add the invite state from the invite event.
res.Rooms.Invite[roomID] = *ir res.Rooms.Invite[roomID] = *ir
} }
return nil return nil

View file

@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/tidwall/gjson"
) )
var ( var (
@ -247,14 +248,17 @@ func NewJoinResponse() *JoinResponse {
// InviteResponse represents a /sync response for a room which is under the 'invite' key. // InviteResponse represents a /sync response for a room which is under the 'invite' key.
type InviteResponse struct { type InviteResponse struct {
InviteState struct { InviteState struct {
Events []gomatrixserverlib.ClientEvent `json:"events"` Events json.RawMessage `json:"events"`
} `json:"invite_state"` } `json:"invite_state"`
} }
// NewInviteResponse creates an empty response with initialised arrays. // NewInviteResponse creates an empty response with initialised arrays.
func NewInviteResponse() *InviteResponse { func NewInviteResponse(event gomatrixserverlib.HeaderedEvent) *InviteResponse {
res := InviteResponse{} res := InviteResponse{}
res.InviteState.Events = make([]gomatrixserverlib.ClientEvent, 0) res.InviteState.Events = json.RawMessage{'[', ']'}
if inviteRoomState := gjson.GetBytes(event.Unsigned(), "invite_room_state"); inviteRoomState.Exists() {
res.InviteState.Events = json.RawMessage(inviteRoomState.Raw)
}
return &res return &res
} }

View file

@ -253,3 +253,8 @@ User can invite local user to room with version 3
User can invite local user to room with version 4 User can invite local user to room with version 4
A pair of servers can establish a join in a v2 room A pair of servers can establish a join in a v2 room
Can logout all devices Can logout all devices
State from remote users is included in the timeline in an incremental sync
User can invite remote user to room with version 1
User can invite remote user to room with version 2
User can invite remote user to room with version 3
User can invite remote user to room with version 4