Make room server producers use headered events

This commit is contained in:
Neil Alexander 2020-03-24 14:28:38 +00:00
parent a8c23426e7
commit 6fe6fc6704
13 changed files with 141 additions and 46 deletions

View file

@ -37,31 +37,14 @@ func NewRoomserverProducer(inputAPI api.RoomserverInputAPI, queryAPI api.Roomser
// SendEvents writes the given events to the roomserver input log. The events are written with KindNew. // SendEvents writes the given events to the roomserver input log. The events are written with KindNew.
func (c *RoomserverProducer) SendEvents( func (c *RoomserverProducer) SendEvents(
ctx context.Context, events []gomatrixserverlib.Event, sendAsServer gomatrixserverlib.ServerName, ctx context.Context, events []gomatrixserverlib.HeaderedEvent, sendAsServer gomatrixserverlib.ServerName,
txnID *api.TransactionID, txnID *api.TransactionID,
) (string, error) { ) (string, error) {
roomVersions := make(map[string]gomatrixserverlib.RoomVersion)
ires := make([]api.InputRoomEvent, len(events)) ires := make([]api.InputRoomEvent, len(events))
for i, event := range events { for i, event := range events {
var roomVersion gomatrixserverlib.RoomVersion
roomID := event.RoomID()
if v, ok := roomVersions[roomID]; ok {
roomVersion = v
} else {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
verRes := api.QueryRoomVersionForRoomResponse{}
err := c.QueryAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes)
if err != nil {
return "", err
}
roomVersion = verRes.RoomVersion
roomVersions[roomID] = roomVersion
}
ires[i] = api.InputRoomEvent{ ires[i] = api.InputRoomEvent{
Kind: api.KindNew, Kind: api.KindNew,
Event: event.Headered(roomVersion), Event: event,
AuthEventIDs: event.AuthEventIDs(), AuthEventIDs: event.AuthEventIDs(),
SendAsServer: string(sendAsServer), SendAsServer: string(sendAsServer),
TransactionID: txnID, TransactionID: txnID,
@ -73,25 +56,18 @@ func (c *RoomserverProducer) SendEvents(
// SendEventWithState writes an event with KindNew to the roomserver input log // SendEventWithState writes an event with KindNew to the roomserver input log
// with the state at the event as KindOutlier before it. // with the state at the event as KindOutlier before it.
func (c *RoomserverProducer) SendEventWithState( func (c *RoomserverProducer) SendEventWithState(
ctx context.Context, state gomatrixserverlib.RespState, event gomatrixserverlib.Event, ctx context.Context, state gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent,
) error { ) error {
outliers, err := state.Events() outliers, err := state.Events()
if err != nil { if err != nil {
return err return err
} }
verReq := api.QueryRoomVersionForRoomRequest{RoomID: event.RoomID()}
verRes := api.QueryRoomVersionForRoomResponse{}
err = c.QueryAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes)
if err != nil {
return err
}
ires := make([]api.InputRoomEvent, len(outliers)+1) ires := make([]api.InputRoomEvent, len(outliers)+1)
for i, outlier := range outliers { for i, outlier := range outliers {
ires[i] = api.InputRoomEvent{ ires[i] = api.InputRoomEvent{
Kind: api.KindOutlier, Kind: api.KindOutlier,
Event: outlier.Headered(verRes.RoomVersion), Event: outlier.Headered(event.RoomVersion),
AuthEventIDs: outlier.AuthEventIDs(), AuthEventIDs: outlier.AuthEventIDs(),
} }
} }
@ -103,7 +79,7 @@ func (c *RoomserverProducer) SendEventWithState(
ires[len(outliers)] = api.InputRoomEvent{ ires[len(outliers)] = api.InputRoomEvent{
Kind: api.KindNew, Kind: api.KindNew,
Event: event.Headered(verRes.RoomVersion), Event: event,
AuthEventIDs: event.AuthEventIDs(), AuthEventIDs: event.AuthEventIDs(),
HasState: true, HasState: true,
StateEventIDs: stateEventIDs, StateEventIDs: stateEventIDs,

View file

@ -237,7 +237,7 @@ func createRoom(
historyVisibility = historyVisibilityShared historyVisibility = historyVisibilityShared
} }
var builtEvents []gomatrixserverlib.Event var builtEvents []gomatrixserverlib.HeaderedEvent
// send events into the room in order of: // send events into the room in order of:
// 1- m.room.create // 1- m.room.create
@ -311,7 +311,7 @@ func createRoom(
} }
// Add the event to the list of auth events // Add the event to the list of auth events
builtEvents = append(builtEvents, *ev) builtEvents = append(builtEvents, (*ev).Headered(roomVersion))
err = authEvents.AddEvent(ev) err = authEvents.AddEvent(ev)
if err != nil { if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("authEvents.AddEvent failed") util.GetLogger(req.Context()).WithError(err).Error("authEvents.AddEvent failed")

View file

@ -242,7 +242,12 @@ func (r joinRoomReq) joinRoomUsingServers(
queryRes := roomserverAPI.QueryLatestEventsAndStateResponse{} queryRes := roomserverAPI.QueryLatestEventsAndStateResponse{}
event, err := common.BuildEvent(r.req.Context(), &eb, r.cfg, r.evTime, r.queryAPI, &queryRes) event, err := common.BuildEvent(r.req.Context(), &eb, r.cfg, r.evTime, r.queryAPI, &queryRes)
if err == nil { if err == nil {
if _, err = r.producer.SendEvents(r.req.Context(), []gomatrixserverlib.Event{*event}, r.cfg.Matrix.ServerName, nil); err != nil { if _, err = r.producer.SendEvents(
r.req.Context(),
[]gomatrixserverlib.HeaderedEvent{(*event).Headered(queryRes.RoomVersion)},
r.cfg.Matrix.ServerName,
nil,
); err != nil {
util.GetLogger(r.req.Context()).WithError(err).Error("r.producer.SendEvents failed") util.GetLogger(r.req.Context()).WithError(err).Error("r.producer.SendEvents failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
@ -363,7 +368,9 @@ func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib
} }
if err = r.producer.SendEventWithState( if err = r.producer.SendEventWithState(
r.req.Context(), gomatrixserverlib.RespState(respSendJoin.RespState), event, r.req.Context(),
gomatrixserverlib.RespState(respSendJoin.RespState),
event.Headered(respMakeJoin.RoomVersion),
); err != nil { ); err != nil {
util.GetLogger(r.req.Context()).WithError(err).Error("gomatrixserverlib.RespState failed") util.GetLogger(r.req.Context()).WithError(err).Error("gomatrixserverlib.RespState failed")
res := jsonerror.InternalServerError() res := jsonerror.InternalServerError()

View file

@ -29,6 +29,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/threepid" "github.com/matrix-org/dendrite/clientapi/threepid"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -45,6 +46,15 @@ func SendMembership(
queryAPI roomserverAPI.RoomserverQueryAPI, asAPI appserviceAPI.AppServiceQueryAPI, queryAPI roomserverAPI.RoomserverQueryAPI, asAPI appserviceAPI.AppServiceQueryAPI,
producer *producers.RoomserverProducer, producer *producers.RoomserverProducer,
) util.JSONResponse { ) util.JSONResponse {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
verRes := api.QueryRoomVersionForRoomResponse{}
if err := queryAPI.QueryRoomVersionForRoom(req.Context(), &verReq, &verRes); err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.UnsupportedRoomVersion(err.Error()),
}
}
var body threepid.MembershipRequest var body threepid.MembershipRequest
if reqErr := httputil.UnmarshalJSONRequest(req, &body); reqErr != nil { if reqErr := httputil.UnmarshalJSONRequest(req, &body); reqErr != nil {
return *reqErr return *reqErr
@ -95,7 +105,10 @@ func SendMembership(
} }
if _, err := producer.SendEvents( if _, err := producer.SendEvents(
req.Context(), []gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName, nil, req.Context(),
[]gomatrixserverlib.HeaderedEvent{(*event).Headered(verRes.RoomVersion)},
cfg.Matrix.ServerName,
nil,
); err != nil { ); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()

View file

@ -338,10 +338,16 @@ func buildMembershipEvents(
memberships []authtypes.Membership, memberships []authtypes.Membership,
newProfile authtypes.Profile, userID string, cfg *config.Dendrite, newProfile authtypes.Profile, userID string, cfg *config.Dendrite,
evTime time.Time, queryAPI api.RoomserverQueryAPI, evTime time.Time, queryAPI api.RoomserverQueryAPI,
) ([]gomatrixserverlib.Event, error) { ) ([]gomatrixserverlib.HeaderedEvent, error) {
evs := []gomatrixserverlib.Event{} evs := []gomatrixserverlib.HeaderedEvent{}
for _, membership := range memberships { for _, membership := range memberships {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: membership.EventID}
verRes := api.QueryRoomVersionForRoomResponse{}
if err := queryAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes); err != nil {
return []gomatrixserverlib.HeaderedEvent{}, err
}
builder := gomatrixserverlib.EventBuilder{ builder := gomatrixserverlib.EventBuilder{
Sender: userID, Sender: userID,
RoomID: membership.RoomID, RoomID: membership.RoomID,
@ -365,7 +371,7 @@ func buildMembershipEvents(
return nil, err return nil, err
} }
evs = append(evs, *event) evs = append(evs, (*event).Headered(verRes.RoomVersion))
} }
return evs, nil return evs, nil

View file

@ -48,6 +48,15 @@ func SendEvent(
producer *producers.RoomserverProducer, producer *producers.RoomserverProducer,
txnCache *transactions.Cache, txnCache *transactions.Cache,
) util.JSONResponse { ) util.JSONResponse {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
verRes := api.QueryRoomVersionForRoomResponse{}
if err := queryAPI.QueryRoomVersionForRoom(req.Context(), &verReq, &verRes); err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.UnsupportedRoomVersion(err.Error()),
}
}
if txnID != nil { if txnID != nil {
// Try to fetch response from transactionsCache // Try to fetch response from transactionsCache
if res, ok := txnCache.FetchTransaction(device.AccessToken, *txnID); ok { if res, ok := txnCache.FetchTransaction(device.AccessToken, *txnID); ok {
@ -71,7 +80,12 @@ func SendEvent(
// pass the new event to the roomserver and receive the correct event ID // pass the new event to the roomserver and receive the correct event ID
// event ID in case of duplicate transaction is discarded // event ID in case of duplicate transaction is discarded
eventID, err := producer.SendEvents( eventID, err := producer.SendEvents(
req.Context(), []gomatrixserverlib.Event{*e}, cfg.Matrix.ServerName, txnAndSessionID, req.Context(),
[]gomatrixserverlib.HeaderedEvent{
(*e).Headered(verRes.RoomVersion),
},
cfg.Matrix.ServerName,
txnAndSessionID,
) )
if err != nil { if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed")

View file

@ -359,6 +359,13 @@ func emit3PIDInviteEvent(
return err return err
} }
_, err = producer.SendEvents(ctx, []gomatrixserverlib.Event{*event}, cfg.Matrix.ServerName, nil) _, err = producer.SendEvents(
ctx,
[]gomatrixserverlib.HeaderedEvent{
(*event).Headered(queryRes.RoomVersion),
},
cfg.Matrix.ServerName,
nil,
)
return err return err
} }

View file

@ -209,7 +209,12 @@ func SendJoin(
// We are responsible for notifying other servers that the user has joined // We are responsible for notifying other servers that the user has joined
// the room, so set SendAsServer to cfg.Matrix.ServerName // the room, so set SendAsServer to cfg.Matrix.ServerName
_, err = producer.SendEvents( _, err = producer.SendEvents(
httpReq.Context(), []gomatrixserverlib.Event{event}, cfg.Matrix.ServerName, nil, httpReq.Context(),
[]gomatrixserverlib.HeaderedEvent{
event.Headered(stateAndAuthChainResponse.RoomVersion),
},
cfg.Matrix.ServerName,
nil,
) )
if err != nil { if err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed") util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed")

View file

@ -101,6 +101,15 @@ func SendLeave(
keys gomatrixserverlib.KeyRing, keys gomatrixserverlib.KeyRing,
roomID, eventID string, roomID, eventID string,
) util.JSONResponse { ) util.JSONResponse {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
verRes := api.QueryRoomVersionForRoomResponse{}
if err := producer.QueryAPI.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.UnsupportedRoomVersion(err.Error()),
}
}
var event gomatrixserverlib.Event var event gomatrixserverlib.Event
if err := json.Unmarshal(request.Content(), &event); err != nil { if err := json.Unmarshal(request.Content(), &event); err != nil {
return util.JSONResponse{ return util.JSONResponse{
@ -167,7 +176,14 @@ func SendLeave(
// Send the events to the room server. // Send the events to the room server.
// We are responsible for notifying other servers that the user has left // We are responsible for notifying other servers that the user has left
// the room, so set SendAsServer to cfg.Matrix.ServerName // the room, so set SendAsServer to cfg.Matrix.ServerName
_, err = producer.SendEvents(httpReq.Context(), []gomatrixserverlib.Event{event}, cfg.Matrix.ServerName, nil) _, err = producer.SendEvents(
httpReq.Context(),
[]gomatrixserverlib.HeaderedEvent{
event.Headered(verRes.RoomVersion),
},
cfg.Matrix.ServerName,
nil,
)
if err != nil { if err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed") util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()

View file

@ -179,7 +179,12 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error {
// TODO: Check that the event is allowed by its auth_events. // TODO: Check that the event is allowed by its auth_events.
// pass the event to the roomserver // pass the event to the roomserver
_, err := t.producer.SendEvents(t.context, []gomatrixserverlib.Event{e}, api.DoNotSendToOtherServers, nil) _, err := t.producer.SendEvents(
t.context,
stateResp.StateEvents,
api.DoNotSendToOtherServers,
nil,
)
return err return err
} }
@ -240,6 +245,13 @@ retryAllowedState:
} }
return err return err
} }
// pass the event along with the state to the roomserver
return t.producer.SendEventWithState(t.context, state, e) verReq := api.QueryRoomVersionForRoomRequest{RoomID: e.RoomID()}
verRes := api.QueryRoomVersionForRoomResponse{}
if err := t.query.QueryRoomVersionForRoom(context.Background(), &verReq, &verRes); err != nil {
return err
}
// pass the event along with the state to the roomserver
return t.producer.SendEventWithState(t.context, state, e.Headered(verRes.RoomVersion))
} }

View file

@ -28,6 +28,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -68,8 +69,17 @@ func CreateInvitesFrom3PIDInvites(
return *reqErr return *reqErr
} }
evs := []gomatrixserverlib.Event{} evs := []gomatrixserverlib.HeaderedEvent{}
for _, inv := range body.Invites { for _, inv := range body.Invites {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: inv.RoomID}
verRes := api.QueryRoomVersionForRoomResponse{}
if err := queryAPI.QueryRoomVersionForRoom(req.Context(), &verReq, &verRes); err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.UnsupportedRoomVersion(err.Error()),
}
}
event, err := createInviteFrom3PIDInvite( event, err := createInviteFrom3PIDInvite(
req.Context(), queryAPI, asAPI, cfg, inv, federation, accountDB, req.Context(), queryAPI, asAPI, cfg, inv, federation, accountDB,
) )
@ -78,7 +88,7 @@ func CreateInvitesFrom3PIDInvites(
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
if event != nil { if event != nil {
evs = append(evs, *event) evs = append(evs, (*event).Headered(verRes.RoomVersion))
} }
} }
@ -137,6 +147,15 @@ func ExchangeThirdPartyInvite(
} }
} }
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
verRes := api.QueryRoomVersionForRoomResponse{}
if err := queryAPI.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.UnsupportedRoomVersion(err.Error()),
}
}
// Auth and build the event from what the remote server sent us // Auth and build the event from what the remote server sent us
event, err := buildMembershipEvent(httpReq.Context(), &builder, queryAPI, cfg) event, err := buildMembershipEvent(httpReq.Context(), &builder, queryAPI, cfg)
if err == errNotInRoom { if err == errNotInRoom {
@ -159,7 +178,12 @@ func ExchangeThirdPartyInvite(
// Send the event to the roomserver // Send the event to the roomserver
if _, err = producer.SendEvents( if _, err = producer.SendEvents(
httpReq.Context(), []gomatrixserverlib.Event{signedEvent.Event}, cfg.Matrix.ServerName, nil, httpReq.Context(),
[]gomatrixserverlib.HeaderedEvent{
signedEvent.Event.Headered(verRes.RoomVersion),
},
cfg.Matrix.ServerName,
nil,
); err != nil { ); err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed") util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
@ -181,6 +205,12 @@ func createInviteFrom3PIDInvite(
inv invite, federation *gomatrixserverlib.FederationClient, inv invite, federation *gomatrixserverlib.FederationClient,
accountDB accounts.Database, accountDB accounts.Database,
) (*gomatrixserverlib.Event, error) { ) (*gomatrixserverlib.Event, error) {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: inv.RoomID}
verRes := api.QueryRoomVersionForRoomResponse{}
if err := queryAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes); err != nil {
return nil, err
}
_, server, err := gomatrixserverlib.SplitID('@', inv.MXID) _, server, err := gomatrixserverlib.SplitID('@', inv.MXID)
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -97,10 +97,12 @@ func processRoomEvent(
) (eventID string, err error) { ) (eventID string, err error) {
// Parse and validate the event JSON // Parse and validate the event JSON
event := input.Event event := input.Event
fmt.Println("Starting processRoomEvent")
// Check that the event passes authentication checks and work out the numeric IDs for the auth events. // Check that the event passes authentication checks and work out the numeric IDs for the auth events.
authEventNIDs, err := checkAuthEvents(ctx, db, event.Event, input.AuthEventIDs) authEventNIDs, err := checkAuthEvents(ctx, db, event.Event, input.AuthEventIDs)
if err != nil { if err != nil {
fmt.Println("checkAuthEvents:", err)
return return
} }
@ -111,6 +113,7 @@ func processRoomEvent(
) )
// On error OR event with the transaction already processed/processesing // On error OR event with the transaction already processed/processesing
if err != nil || eventID != "" { if err != nil || eventID != "" {
fmt.Println("db.GetTransactionEventID:", err)
return return
} }
} }
@ -118,6 +121,7 @@ func processRoomEvent(
// Store the event // Store the event
roomNID, stateAtEvent, err := db.StoreEvent(ctx, event.Event, input.TransactionID, authEventNIDs) roomNID, stateAtEvent, err := db.StoreEvent(ctx, event.Event, input.TransactionID, authEventNIDs)
if err != nil { if err != nil {
fmt.Println("db.StoreEvent:", err)
return return
} }
@ -133,6 +137,7 @@ func processRoomEvent(
// Lets calculate one. // Lets calculate one.
err = calculateAndSetState(ctx, db, input, roomNID, &stateAtEvent, event.Event) err = calculateAndSetState(ctx, db, input, roomNID, &stateAtEvent, event.Event)
if err != nil { if err != nil {
fmt.Println("calculateAndSetState:", err)
return return
} }
} }

View file

@ -18,6 +18,7 @@ package input
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
"sync" "sync"
@ -61,16 +62,19 @@ func (r *RoomserverInputAPI) InputRoomEvents(
request *api.InputRoomEventsRequest, request *api.InputRoomEventsRequest,
response *api.InputRoomEventsResponse, response *api.InputRoomEventsResponse,
) (err error) { ) (err error) {
fmt.Println("Starting InputRoomEvents")
// We lock as processRoomEvent can only be called once at a time // We lock as processRoomEvent can only be called once at a time
r.mutex.Lock() r.mutex.Lock()
defer r.mutex.Unlock() defer r.mutex.Unlock()
for i := range request.InputRoomEvents { for i := range request.InputRoomEvents {
if response.EventID, err = processRoomEvent(ctx, r.DB, r, request.InputRoomEvents[i]); err != nil { if response.EventID, err = processRoomEvent(ctx, r.DB, r, request.InputRoomEvents[i]); err != nil {
fmt.Println("processRoomEvent:", err)
return err return err
} }
} }
for i := range request.InputInviteEvents { for i := range request.InputInviteEvents {
if err = processInviteEvent(ctx, r.DB, r, request.InputInviteEvents[i]); err != nil { if err = processInviteEvent(ctx, r.DB, r, request.InputInviteEvents[i]); err != nil {
fmt.Println("processInviteEvent:", err)
return err return err
} }
} }