Sending invites sorta happens now

This commit is contained in:
Neil Alexander 2020-04-02 10:27:52 +01:00
parent a0fc798978
commit 2122f9a7cb
10 changed files with 62 additions and 27 deletions

View file

@ -104,18 +104,11 @@ func (c *RoomserverProducer) SendInputRoomEvents(
// This should only be needed for invite events that occur outside of a known room.
// If we are in the room then the event should be sent using the SendEvents method.
func (c *RoomserverProducer) SendInvite(
ctx context.Context, inviteEvent gomatrixserverlib.Event,
ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent,
) error {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: inviteEvent.RoomID()}
verRes := api.QueryRoomVersionForRoomResponse{}
err := c.QueryAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes)
if err != nil {
return err
}
request := api.InputRoomEventsRequest{
InputInviteEvents: []api.InputInviteEvent{{
Event: inviteEvent.Headered(verRes.RoomVersion),
Event: inviteEvent,
}},
}
var response api.InputRoomEventsResponse

View file

@ -42,17 +42,7 @@ func Invite(
JSON: jsonerror.NotJSON("The request body could not be decoded into an invite request. " + err.Error()),
}
}
// In v2, the room_version field is mandatory.
// https://matrix.org/docs/spec/server_server/r0.1.3#put-matrix-federation-v2-invite-roomid-eventid
// Decode the event JSON from the request.
event, err := gomatrixserverlib.NewEventFromUntrustedJSON(inviteReq.Event(), inviteReq.RoomVersion())
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.NotJSON("The request body could not be decoded into valid JSON. " + err.Error()),
}
}
event := inviteReq.Event()
// Check that the room ID is correct.
if event.RoomID() != roomID {
@ -95,7 +85,7 @@ func Invite(
)
// Add the invite event to the roomserver.
if err = producer.SendInvite(httpReq.Context(), signedEvent); err != nil {
if err = producer.SendInvite(httpReq.Context(), signedEvent.Headered(inviteReq.RoomVersion())); err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendInvite failed")
return jsonerror.InternalServerError()
}

View file

@ -114,12 +114,37 @@ func (oq *destinationQueue) next() *gomatrixserverlib.Transaction {
if len(oq.pendingInvites) > 0 {
for _, invite := range oq.pendingInvites {
if _, err := oq.client.SendInvite(context.TODO(), oq.destination, invite.Unwrap()); err != nil {
// TODO: Get the correct stripped state here
inviteReq, err := gomatrixserverlib.NewInviteV2Request(
invite,
[]gomatrixserverlib.InviteV2StrippedState{},
)
if err != nil {
log.WithFields(log.Fields{
"event_id": invite.EventID(),
"state_key": invite.StateKey(),
"destination": oq.destination,
}).Info("failed to send invite")
}).WithError(err).Error("failed to create invite request")
continue
}
ev := inviteReq.Event()
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"state_key": ev.StateKey(),
"room_version": inviteReq.RoomVersion(),
}).WithError(err).Info("created invite request")
if _, err := oq.client.SendInviteV2(
context.TODO(),
oq.destination,
inviteReq,
); err != nil {
log.WithFields(log.Fields{
"event_id": invite.EventID(),
"state_key": invite.StateKey(),
"destination": oq.destination,
}).WithError(err).Error("failed to send invite")
}
}
oq.pendingInvites = oq.pendingInvites[:0]

2
go.mod
View file

@ -9,7 +9,7 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f
github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26
github.com/matrix-org/gomatrixserverlib v0.0.0-20200401161903-a3e8d6416da9
github.com/matrix-org/gomatrixserverlib v0.0.0-20200402091320-7d0d154abbc0
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7
github.com/mattn/go-sqlite3 v2.0.2+incompatible

6
go.sum
View file

@ -134,6 +134,12 @@ github.com/matrix-org/gomatrixserverlib v0.0.0-20200401160213-1316eff9cef1 h1:7U
github.com/matrix-org/gomatrixserverlib v0.0.0-20200401160213-1316eff9cef1/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200401161903-a3e8d6416da9 h1:67k9RvLu4hss1gaBOmW1RDlEa3uVA63ffWdgpCjD4xk=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200401161903-a3e8d6416da9/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200402085213-cc2e2618557a h1:Y62i3CUxY0sc4u0bJ14nJcyre57oyQkhgNJcyYCpgZY=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200402085213-cc2e2618557a/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200402085826-4984d4aff1fe h1:kjWJTx0of+DHkCRF74vdLRKpGexMVas2Wo3lBn2vgmA=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200402085826-4984d4aff1fe/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200402091320-7d0d154abbc0 h1:D6rx1SGsGr76s9JaQ3OkNkA8wvJvSWOwAx3s5iEHnVg=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200402091320-7d0d154abbc0/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A=
github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5 h1:W7l5CP4V7wPyPb4tYE11dbmeAOwtFQBTW0rf4OonOS8=

View file

@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/state/database"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
// A RoomEventDatabase has the storage APIs needed to store a room event.
@ -193,6 +194,12 @@ func processInviteEvent(
roomID := input.Event.RoomID()
targetUserID := *input.Event.StateKey()
log.WithFields(log.Fields{
"event_id": input.Event.EventID(),
"room_id": roomID,
"target_user_id": targetUserID,
}).Info("processing invite event")
updater, err := db.MembershipUpdater(ctx, roomID, targetUserID)
if err != nil {
return err
@ -237,7 +244,7 @@ func processInviteEvent(
}
event := input.Event.Unwrap()
outputUpdates, err := updateToInviteMembership(updater, &event, nil)
outputUpdates, err := updateToInviteMembership(updater, &event, nil, input.Event.RoomVersion)
if err != nil {
return err
}

View file

@ -112,7 +112,7 @@ func updateMembership(
switch newMembership {
case gomatrixserverlib.Invite:
return updateToInviteMembership(mu, add, updates)
return updateToInviteMembership(mu, add, updates, updater.RoomVersion())
case gomatrixserverlib.Join:
return updateToJoinMembership(mu, add, updates)
case gomatrixserverlib.Leave, gomatrixserverlib.Ban:
@ -126,6 +126,7 @@ func updateMembership(
func updateToInviteMembership(
mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent,
roomVersion gomatrixserverlib.RoomVersion,
) ([]api.OutputEvent, error) {
// We may have already sent the invite to the user, either because we are
// reprocessing this event, or because the we received this invite from a
@ -136,7 +137,6 @@ func updateToInviteMembership(
return nil, err
}
if needsSending {
roomVersion := gomatrixserverlib.RoomVersionV1
// We notify the consumers using a special event even though we will
// notify them about the change in current state as part of the normal
// room event stream. This ensures that the consumers only have to

View file

@ -393,6 +393,12 @@ type roomRecentEventsUpdater struct {
currentStateSnapshotNID types.StateSnapshotNID
}
// RoomVersion implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) RoomVersion() (version gomatrixserverlib.RoomVersion) {
version, _ = u.d.GetRoomVersionForRoomNID(u.ctx, u.roomNID)
return
}
// LatestEvents implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) LatestEvents() []types.StateAtEventAndReference {
return u.latestEvents

View file

@ -486,6 +486,12 @@ type roomRecentEventsUpdater struct {
currentStateSnapshotNID types.StateSnapshotNID
}
// RoomVersion implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) RoomVersion() (version gomatrixserverlib.RoomVersion) {
version, _ = u.d.GetRoomVersionForRoomNID(u.ctx, u.roomNID)
return
}
// LatestEvents implements types.RoomRecentEventsUpdater
func (u *roomRecentEventsUpdater) LatestEvents() []types.StateAtEventAndReference {
return u.latestEvents

View file

@ -140,6 +140,8 @@ type StateEntryList struct {
// (On postgresql this wraps a database transaction that holds a "FOR UPDATE"
// lock on the row in the rooms table holding the latest events for the room.)
type RoomRecentEventsUpdater interface {
// The room version of the room.
RoomVersion() gomatrixserverlib.RoomVersion
// The latest event IDs and state in the room.
LatestEvents() []StateAtEventAndReference
// The event ID of the latest event written to the output log in the room.