Mangle everything so that room versions are now sent in the kafka events

This commit is contained in:
Neil Alexander 2020-03-11 17:34:06 +00:00
parent 059e45980b
commit 61dffdacf0
13 changed files with 205 additions and 114 deletions

View file

@ -15,8 +15,10 @@
package consumers package consumers
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors"
"github.com/matrix-org/dendrite/appservice/storage" "github.com/matrix-org/dendrite/appservice/storage"
"github.com/matrix-org/dendrite/appservice/types" "github.com/matrix-org/dendrite/appservice/types"
@ -81,6 +83,30 @@ func (s *OutputRoomEventConsumer) Start() error {
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Parse out the event JSON // Parse out the event JSON
var output api.OutputEvent var output api.OutputEvent
// See if the room version is present in the headers. If it isn't
// then we can't process the event as we don't know what the format
// will be
var roomVersion gomatrixserverlib.RoomVersion
for _, header := range msg.Headers {
if bytes.Equal(header.Key, []byte("room_version")) {
roomVersion = gomatrixserverlib.RoomVersion(header.Value)
break
}
}
if roomVersion == "" {
return errors.New("room version was not in sarama headers")
}
// Prepare the room event so that it has the correct field types
// for the room version
if err := output.NewRoomEvent.Event.PrepareAs(roomVersion); err != nil {
log.WithFields(log.Fields{
"room_version": roomVersion,
}).WithError(err).Errorf("can't prepare event to version")
return err
}
if err := json.Unmarshal(msg.Value, &output); err != nil { if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure") log.WithError(err).Errorf("roomserver output log: message parse failure")
@ -94,31 +120,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil return nil
} }
// Get the room version of the room
vQueryReq := api.QueryRoomVersionForRoomIDRequest{RoomID: string(msg.Key)}
vQueryRes := api.QueryRoomVersionForRoomIDResponse{}
if err := s.query.QueryRoomVersionForRoomID(context.Background(), &vQueryReq, &vQueryRes); err != nil {
log.WithFields(log.Fields{
"room_id": string(msg.Key),
}).WithError(err).Errorf("can't query room version")
return err
}
// Prepare the room event so that it has the correct field types
// for the room version
if err := output.NewRoomEvent.Event.PrepareAs(vQueryRes.RoomVersion); err != nil {
log.WithFields(log.Fields{
"room_version": vQueryRes.RoomVersion,
}).WithError(err).Errorf("can't prepare event to version")
return err
}
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil
}
ev := output.NewRoomEvent.Event ev := output.NewRoomEvent.Event
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event_id": ev.EventID(), "event_id": ev.EventID(),

View file

@ -15,8 +15,10 @@
package consumers package consumers
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
@ -74,6 +76,30 @@ func (s *OutputRoomEventConsumer) Start() error {
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Parse out the event JSON // Parse out the event JSON
var output api.OutputEvent var output api.OutputEvent
// See if the room version is present in the headers. If it isn't
// then we can't process the event as we don't know what the format
// will be
var roomVersion gomatrixserverlib.RoomVersion
for _, header := range msg.Headers {
if bytes.Equal(header.Key, []byte("room_version")) {
roomVersion = gomatrixserverlib.RoomVersion(header.Value)
break
}
}
if roomVersion == "" {
return errors.New("room version was not in sarama headers")
}
// Prepare the room event so that it has the correct field types
// for the room version
if err := output.NewRoomEvent.Event.PrepareAs(roomVersion); err != nil {
log.WithFields(log.Fields{
"room_version": roomVersion,
}).WithError(err).Errorf("can't prepare event to version")
return err
}
if err := json.Unmarshal(msg.Value, &output); err != nil { if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure") log.WithError(err).Errorf("roomserver output log: message parse failure")
@ -87,31 +113,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil return nil
} }
// Get the room version of the room
vQueryReq := api.QueryRoomVersionForRoomIDRequest{RoomID: string(msg.Key)}
vQueryRes := api.QueryRoomVersionForRoomIDResponse{}
if err := s.query.QueryRoomVersionForRoomID(context.Background(), &vQueryReq, &vQueryRes); err != nil {
log.WithFields(log.Fields{
"room_id": string(msg.Key),
}).WithError(err).Errorf("can't query room version")
return err
}
// Prepare the room event so that it has the correct field types
// for the room version
if err := output.NewRoomEvent.Event.PrepareAs(vQueryRes.RoomVersion); err != nil {
log.WithFields(log.Fields{
"room_version": vQueryRes.RoomVersion,
}).WithError(err).Errorf("can't prepare event to version")
return err
}
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil
}
ev := output.NewRoomEvent.Event ev := output.NewRoomEvent.Event
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event_id": ev.EventID(), "event_id": ev.EventID(),

View file

@ -15,13 +15,16 @@
package consumers package consumers
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/publicroomsapi/storage" "github.com/matrix-org/dendrite/publicroomsapi/storage"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
sarama "gopkg.in/Shopify/sarama.v1" sarama "gopkg.in/Shopify/sarama.v1"
) )
@ -63,6 +66,30 @@ func (s *OutputRoomEventConsumer) Start() error {
// onMessage is called when the sync server receives a new event from the room server output log. // onMessage is called when the sync server receives a new event from the room server output log.
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
var output api.OutputEvent var output api.OutputEvent
// See if the room version is present in the headers. If it isn't
// then we can't process the event as we don't know what the format
// will be
var roomVersion gomatrixserverlib.RoomVersion
for _, header := range msg.Headers {
if bytes.Equal(header.Key, []byte("room_version")) {
roomVersion = gomatrixserverlib.RoomVersion(header.Value)
break
}
}
if roomVersion == "" {
return errors.New("room version was not in sarama headers")
}
// Prepare the room event so that it has the correct field types
// for the room version
if err := output.NewRoomEvent.Event.PrepareAs(roomVersion); err != nil {
log.WithFields(log.Fields{
"room_version": roomVersion,
}).WithError(err).Errorf("can't prepare event to version")
return err
}
if err := json.Unmarshal(msg.Value, &output); err != nil { if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure") log.WithError(err).Errorf("roomserver output log: message parse failure")
@ -77,32 +104,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil return nil
} }
// Get the room version of the room
vQueryReq := api.QueryRoomVersionForRoomIDRequest{RoomID: string(msg.Key)}
vQueryRes := api.QueryRoomVersionForRoomIDResponse{}
if err := s.query.QueryRoomVersionForRoomID(context.Background(), &vQueryReq, &vQueryRes); err != nil {
log.WithFields(log.Fields{
"room_id": string(msg.Key),
}).WithError(err).Errorf("can't query room version")
return err
}
// Prepare the room event so that it has the correct field types
// for the room version
if err := output.NewRoomEvent.Event.PrepareAs(vQueryRes.RoomVersion); err != nil {
log.WithFields(log.Fields{
"room_version": vQueryRes.RoomVersion,
}).WithError(err).Errorf("can't prepare event to version")
return err
}
// Parse out the event JSON
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil
}
ev := output.NewRoomEvent.Event ev := output.NewRoomEvent.Event
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event_id": ev.EventID(), "event_id": ev.EventID(),

View file

@ -73,7 +73,10 @@ type RoomEventDatabase interface {
sessionID int64, userID string, sessionID int64, userID string,
) (string, error) ) (string, error)
// Look up the room version from the database. // Look up the room version from the database.
GetRoomVersionForRoom( GetRoomVersionForRoomID(
ctx context.Context, roomID string,
) (gomatrixserverlib.RoomVersion, error)
GetRoomVersionForRoomNID(
ctx context.Context, roomNID types.RoomNID, ctx context.Context, roomNID types.RoomNID,
) (gomatrixserverlib.RoomVersion, error) ) (gomatrixserverlib.RoomVersion, error)
} }
@ -81,7 +84,7 @@ type RoomEventDatabase interface {
// OutputRoomEventWriter has the APIs needed to write an event to the output logs. // OutputRoomEventWriter has the APIs needed to write an event to the output logs.
type OutputRoomEventWriter interface { type OutputRoomEventWriter interface {
// Write a list of events for a room // Write a list of events for a room
WriteOutputEvents(roomID string, updates []api.OutputEvent) error WriteOutputEvents(roomID string, roomVersion gomatrixserverlib.RoomVersion, updates []api.OutputEvent) error
} }
// processRoomEvent can only be called once at a time // processRoomEvent can only be called once at a time
@ -156,7 +159,7 @@ func calculateAndSetState(
stateAtEvent *types.StateAtEvent, stateAtEvent *types.StateAtEvent,
event gomatrixserverlib.Event, event gomatrixserverlib.Event,
) error { ) error {
roomVersion, err := db.GetRoomVersionForRoom(ctx, roomNID) roomVersion, err := db.GetRoomVersionForRoomNID(ctx, roomNID)
if err != nil { if err != nil {
return err return err
} }
@ -196,6 +199,7 @@ func processInviteEvent(
} }
roomID := input.Event.RoomID() roomID := input.Event.RoomID()
roomVersion := gomatrixserverlib.RoomVersionV1 // TODO: Feeeeeeex
targetUserID := *input.Event.StateKey() targetUserID := *input.Event.StateKey()
updater, err := db.MembershipUpdater(ctx, roomID, targetUserID) updater, err := db.MembershipUpdater(ctx, roomID, targetUserID)
@ -246,7 +250,7 @@ func processInviteEvent(
return err return err
} }
if err = ow.WriteOutputEvents(roomID, outputUpdates); err != nil { if err = ow.WriteOutputEvents(roomID, roomVersion, outputUpdates); err != nil {
return err return err
} }

View file

@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
sarama "gopkg.in/Shopify/sarama.v1" sarama "gopkg.in/Shopify/sarama.v1"
) )
@ -39,7 +40,9 @@ type RoomserverInputAPI struct {
} }
// WriteOutputEvents implements OutputRoomEventWriter // WriteOutputEvents implements OutputRoomEventWriter
func (r *RoomserverInputAPI) WriteOutputEvents(roomID string, updates []api.OutputEvent) error { func (r *RoomserverInputAPI) WriteOutputEvents(
roomID string, roomVersion gomatrixserverlib.RoomVersion, updates []api.OutputEvent,
) error {
messages := make([]*sarama.ProducerMessage, len(updates)) messages := make([]*sarama.ProducerMessage, len(updates))
for i := range updates { for i := range updates {
value, err := json.Marshal(updates[i]) value, err := json.Marshal(updates[i])
@ -50,6 +53,12 @@ func (r *RoomserverInputAPI) WriteOutputEvents(roomID string, updates []api.Outp
Topic: r.OutputRoomEventTopic, Topic: r.OutputRoomEventTopic,
Key: sarama.StringEncoder(roomID), Key: sarama.StringEncoder(roomID),
Value: sarama.ByteEncoder(value), Value: sarama.ByteEncoder(value),
Headers: []sarama.RecordHeader{
sarama.RecordHeader{
Key: []byte("room_version"),
Value: []byte(roomVersion),
},
},
} }
} }
return r.Producer.SendMessages(messages) return r.Producer.SendMessages(messages)

View file

@ -67,7 +67,7 @@ func updateLatestEvents(
} }
}() }()
roomVersion, err := db.GetRoomVersionForRoom(ctx, roomNID) roomVersion, err := db.GetRoomVersionForRoomNID(ctx, roomNID)
if err != nil { if err != nil {
return err return err
} }
@ -163,6 +163,8 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
} }
updates = append(updates, *update) updates = append(updates, *update)
roomVersion := gomatrixserverlib.RoomVersionV1
// Send the event to the output logs. // Send the event to the output logs.
// We do this inside the database transaction to ensure that we only mark an event as sent if we sent it. // We do this inside the database transaction to ensure that we only mark an event as sent if we sent it.
// (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but // (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but
@ -171,7 +173,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in // send the event asynchronously but we would need to ensure that 1) the events are written to the log in
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the // the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
// necessary bookkeeping we'll keep the event sending synchronous for now. // necessary bookkeeping we'll keep the event sending synchronous for now.
if err = u.ow.WriteOutputEvents(u.event.RoomID(), updates); err != nil { if err = u.ow.WriteOutputEvents(u.event.RoomID(), roomVersion, updates); err != nil {
return err return err
} }
@ -184,7 +186,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
func (u *latestEventsUpdater) latestState() error { func (u *latestEventsUpdater) latestState() error {
var err error var err error
roomVersion, err := u.db.GetRoomVersionForRoom(u.ctx, u.roomNID) roomVersion, err := u.db.GetRoomVersionForRoomNID(u.ctx, u.roomNID)
if err != nil { if err != nil {
return err return err
} }
@ -262,7 +264,7 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
latestEventIDs[i] = u.latest[i].EventID latestEventIDs[i] = u.latest[i].EventID
} }
roomVersion, err := u.db.GetRoomVersionForRoom(context.Background(), u.roomNID) roomVersion, err := u.db.GetRoomVersionForRoomNID(context.Background(), u.roomNID)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -90,7 +90,10 @@ type RoomserverQueryAPIDatabase interface {
context.Context, []types.EventStateKeyNID, context.Context, []types.EventStateKeyNID,
) (map[types.EventStateKeyNID]string, error) ) (map[types.EventStateKeyNID]string, error)
// Look up the room version from the database. // Look up the room version from the database.
GetRoomVersionForRoom( GetRoomVersionForRoomID(
ctx context.Context, roomID string,
) (gomatrixserverlib.RoomVersion, error)
GetRoomVersionForRoomNID(
ctx context.Context, roomNID types.RoomNID, ctx context.Context, roomNID types.RoomNID,
) (gomatrixserverlib.RoomVersion, error) ) (gomatrixserverlib.RoomVersion, error)
// Get the room NID for a given event ID. // Get the room NID for a given event ID.
@ -123,7 +126,7 @@ func (r *RoomserverQueryAPI) QueryLatestEventsAndState(
return nil return nil
} }
response.RoomExists = true response.RoomExists = true
roomVersion, err := r.DB.GetRoomVersionForRoom(ctx, roomNID) roomVersion, err := r.DB.GetRoomVersionForRoomID(ctx, request.RoomID)
if err != nil { if err != nil {
return err return err
} }
@ -170,7 +173,7 @@ func (r *RoomserverQueryAPI) QueryStateAfterEvents(
return nil return nil
} }
response.RoomExists = true response.RoomExists = true
roomVersion, err := r.DB.GetRoomVersionForRoom(ctx, roomNID) roomVersion, err := r.DB.GetRoomVersionForRoomID(ctx, request.RoomID)
if err != nil { if err != nil {
return err return err
} }
@ -350,7 +353,7 @@ func (r *RoomserverQueryAPI) getMembershipsBeforeEventNID(
if err != nil { if err != nil {
return nil, err return nil, err
} }
roomVersion, err := r.DB.GetRoomVersionForRoom(ctx, roomNID) roomVersion, err := r.DB.GetRoomVersionForRoomNID(ctx, roomNID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -464,7 +467,7 @@ func (r *RoomserverQueryAPI) checkServerAllowedToSeeEvent(
if err != nil { if err != nil {
return false, err return false, err
} }
roomVersion, err := r.DB.GetRoomVersionForRoom(ctx, roomNID) roomVersion, err := r.DB.GetRoomVersionForRoomNID(ctx, roomNID)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -631,7 +634,7 @@ func (r *RoomserverQueryAPI) QueryStateAndAuthChain(
if err != nil { if err != nil {
return err return err
} }
roomVersion, err := r.DB.GetRoomVersionForRoom(ctx, roomNID) roomVersion, err := r.DB.GetRoomVersionForRoomID(ctx, request.RoomID)
if err != nil { if err != nil {
return err return err
} }
@ -786,14 +789,8 @@ func (r *RoomserverQueryAPI) QueryRoomVersionForRoomID(
request *api.QueryRoomVersionForRoomIDRequest, request *api.QueryRoomVersionForRoomIDRequest,
response *api.QueryRoomVersionForRoomIDResponse, response *api.QueryRoomVersionForRoomIDResponse,
) error { ) error {
// Get the room NID for the given room ID
roomNID, err := r.DB.RoomNID(ctx, request.RoomID)
if err != nil {
return err
}
// Then look up the room version for that room NID // Then look up the room version for that room NID
roomVersion, err := r.DB.GetRoomVersionForRoom(ctx, roomNID) roomVersion, err := r.DB.GetRoomVersionForRoomID(ctx, request.RoomID)
if err != nil { if err != nil {
return err return err
} }

View file

@ -45,7 +45,8 @@ type Database interface {
GetMembership(ctx context.Context, roomNID types.RoomNID, requestSenderUserID string) (membershipEventNID types.EventNID, stillInRoom bool, err error) GetMembership(ctx context.Context, roomNID types.RoomNID, requestSenderUserID string) (membershipEventNID types.EventNID, stillInRoom bool, err error)
GetMembershipEventNIDsForRoom(ctx context.Context, roomNID types.RoomNID, joinOnly bool) ([]types.EventNID, error) GetMembershipEventNIDsForRoom(ctx context.Context, roomNID types.RoomNID, joinOnly bool) ([]types.EventNID, error)
EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error)
GetRoomVersionForRoom(ctx context.Context, roomNID types.RoomNID) (gomatrixserverlib.RoomVersion, error) GetRoomVersionForRoomID(ctx context.Context, roomID string) (gomatrixserverlib.RoomVersion, error)
GetRoomVersionForRoomNID(ctx context.Context, roomNID types.RoomNID) (gomatrixserverlib.RoomVersion, error)
RoomNIDForEventID(ctx context.Context, eventID string) (types.RoomNID, error) RoomNIDForEventID(ctx context.Context, eventID string) (types.RoomNID, error)
RoomNIDForEventNID(ctx context.Context, eventNID types.EventNID) (types.RoomNID, error) RoomNIDForEventNID(ctx context.Context, eventNID types.EventNID) (types.RoomNID, error)
} }

View file

@ -65,6 +65,9 @@ const selectLatestEventNIDsForUpdateSQL = "" +
const updateLatestEventNIDsSQL = "" + const updateLatestEventNIDsSQL = "" +
"UPDATE roomserver_rooms SET latest_event_nids = $2, last_event_sent_nid = $3, state_snapshot_nid = $4 WHERE room_nid = $1" "UPDATE roomserver_rooms SET latest_event_nids = $2, last_event_sent_nid = $3, state_snapshot_nid = $4 WHERE room_nid = $1"
const selectRoomVersionForRoomIDSQL = "" +
"SELECT room_version FROM roomserver_rooms WHERE room_id = $1"
const selectRoomVersionForRoomNIDSQL = "" + const selectRoomVersionForRoomNIDSQL = "" +
"SELECT room_version FROM roomserver_rooms WHERE room_nid = $1" "SELECT room_version FROM roomserver_rooms WHERE room_nid = $1"
@ -74,6 +77,7 @@ type roomStatements struct {
selectLatestEventNIDsStmt *sql.Stmt selectLatestEventNIDsStmt *sql.Stmt
selectLatestEventNIDsForUpdateStmt *sql.Stmt selectLatestEventNIDsForUpdateStmt *sql.Stmt
updateLatestEventNIDsStmt *sql.Stmt updateLatestEventNIDsStmt *sql.Stmt
selectRoomVersionForRoomIDStmt *sql.Stmt
selectRoomVersionForRoomNIDStmt *sql.Stmt selectRoomVersionForRoomNIDStmt *sql.Stmt
} }
@ -88,6 +92,7 @@ func (s *roomStatements) prepare(db *sql.DB) (err error) {
{&s.selectLatestEventNIDsStmt, selectLatestEventNIDsSQL}, {&s.selectLatestEventNIDsStmt, selectLatestEventNIDsSQL},
{&s.selectLatestEventNIDsForUpdateStmt, selectLatestEventNIDsForUpdateSQL}, {&s.selectLatestEventNIDsForUpdateStmt, selectLatestEventNIDsForUpdateSQL},
{&s.updateLatestEventNIDsStmt, updateLatestEventNIDsSQL}, {&s.updateLatestEventNIDsStmt, updateLatestEventNIDsSQL},
{&s.selectRoomVersionForRoomIDStmt, selectRoomVersionForRoomIDSQL},
{&s.selectRoomVersionForRoomNIDStmt, selectRoomVersionForRoomNIDSQL}, {&s.selectRoomVersionForRoomNIDStmt, selectRoomVersionForRoomNIDSQL},
}.prepare(db) }.prepare(db)
} }
@ -173,3 +178,12 @@ func (s *roomStatements) selectRoomVersionForRoomNID(
err := stmt.QueryRowContext(ctx, roomNID).Scan(&roomVersion) err := stmt.QueryRowContext(ctx, roomNID).Scan(&roomVersion)
return roomVersion, err return roomVersion, err
} }
func (s *roomStatements) selectRoomVersionForRoomID(
ctx context.Context, txn *sql.Tx, roomID string,
) (gomatrixserverlib.RoomVersion, error) {
var roomVersion gomatrixserverlib.RoomVersion
stmt := common.TxStmt(txn, s.selectRoomVersionForRoomIDStmt)
err := stmt.QueryRowContext(ctx, roomID).Scan(&roomVersion)
return roomVersion, err
}

View file

@ -237,7 +237,7 @@ func (d *Database) Events(
if err != nil { if err != nil {
return nil, err return nil, err
} }
roomVersion, err := d.GetRoomVersionForRoom(ctx, roomNID) roomVersion, err := d.GetRoomVersionForRoomNID(ctx, roomNID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -743,7 +743,15 @@ func (d *Database) EventsFromIDs(ctx context.Context, eventIDs []string) ([]type
return d.Events(ctx, nids) return d.Events(ctx, nids)
} }
func (d *Database) GetRoomVersionForRoom( func (d *Database) GetRoomVersionForRoomID(
ctx context.Context, roomID string,
) (gomatrixserverlib.RoomVersion, error) {
return d.statements.selectRoomVersionForRoomID(
ctx, nil, roomID,
)
}
func (d *Database) GetRoomVersionForRoomNID(
ctx context.Context, roomNID types.RoomNID, ctx context.Context, roomNID types.RoomNID,
) (gomatrixserverlib.RoomVersion, error) { ) (gomatrixserverlib.RoomVersion, error) {
return d.statements.selectRoomVersionForRoomNID( return d.statements.selectRoomVersionForRoomNID(

View file

@ -54,6 +54,9 @@ const selectLatestEventNIDsForUpdateSQL = "" +
const updateLatestEventNIDsSQL = "" + const updateLatestEventNIDsSQL = "" +
"UPDATE roomserver_rooms SET latest_event_nids = $1, last_event_sent_nid = $2, state_snapshot_nid = $3 WHERE room_nid = $4" "UPDATE roomserver_rooms SET latest_event_nids = $1, last_event_sent_nid = $2, state_snapshot_nid = $3 WHERE room_nid = $4"
const selectRoomVersionForRoomIDSQL = "" +
"SELECT room_version FROM roomserver_rooms WHERE room_id = $1"
const selectRoomVersionForRoomNIDSQL = "" + const selectRoomVersionForRoomNIDSQL = "" +
"SELECT room_version FROM roomserver_rooms WHERE room_nid = $1" "SELECT room_version FROM roomserver_rooms WHERE room_nid = $1"
@ -63,6 +66,7 @@ type roomStatements struct {
selectLatestEventNIDsStmt *sql.Stmt selectLatestEventNIDsStmt *sql.Stmt
selectLatestEventNIDsForUpdateStmt *sql.Stmt selectLatestEventNIDsForUpdateStmt *sql.Stmt
updateLatestEventNIDsStmt *sql.Stmt updateLatestEventNIDsStmt *sql.Stmt
selectRoomVersionForRoomIDStmt *sql.Stmt
selectRoomVersionForRoomNIDStmt *sql.Stmt selectRoomVersionForRoomNIDStmt *sql.Stmt
} }
@ -77,6 +81,7 @@ func (s *roomStatements) prepare(db *sql.DB) (err error) {
{&s.selectLatestEventNIDsStmt, selectLatestEventNIDsSQL}, {&s.selectLatestEventNIDsStmt, selectLatestEventNIDsSQL},
{&s.selectLatestEventNIDsForUpdateStmt, selectLatestEventNIDsForUpdateSQL}, {&s.selectLatestEventNIDsForUpdateStmt, selectLatestEventNIDsForUpdateSQL},
{&s.updateLatestEventNIDsStmt, updateLatestEventNIDsSQL}, {&s.updateLatestEventNIDsStmt, updateLatestEventNIDsSQL},
{&s.selectRoomVersionForRoomIDStmt, selectRoomVersionForRoomIDSQL},
{&s.selectRoomVersionForRoomNIDStmt, selectRoomVersionForRoomNIDSQL}, {&s.selectRoomVersionForRoomNIDStmt, selectRoomVersionForRoomNIDSQL},
}.prepare(db) }.prepare(db)
} }
@ -157,6 +162,15 @@ func (s *roomStatements) updateLatestEventNIDs(
return err return err
} }
func (s *roomStatements) selectRoomVersionForRoomID(
ctx context.Context, txn *sql.Tx, roomID string,
) (gomatrixserverlib.RoomVersion, error) {
var roomVersion gomatrixserverlib.RoomVersion
stmt := common.TxStmt(txn, s.selectRoomVersionForRoomIDStmt)
err := stmt.QueryRowContext(ctx, roomID).Scan(&roomVersion)
return roomVersion, err
}
func (s *roomStatements) selectRoomVersionForRoomNID( func (s *roomStatements) selectRoomVersionForRoomNID(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) (gomatrixserverlib.RoomVersion, error) { ) (gomatrixserverlib.RoomVersion, error) {

View file

@ -289,7 +289,7 @@ func (d *Database) Events(
if err != nil { if err != nil {
return err return err
} }
roomVersion, err := d.GetRoomVersionForRoom(ctx, roomNID) roomVersion, err := d.GetRoomVersionForRoomNID(ctx, roomNID)
if err != nil { if err != nil {
return err return err
} }
@ -896,7 +896,15 @@ func (d *Database) EventsFromIDs(ctx context.Context, eventIDs []string) ([]type
return d.Events(ctx, nids) return d.Events(ctx, nids)
} }
func (d *Database) GetRoomVersionForRoom( func (d *Database) GetRoomVersionForRoomID(
ctx context.Context, roomID string,
) (gomatrixserverlib.RoomVersion, error) {
return d.statements.selectRoomVersionForRoomID(
ctx, nil, roomID,
)
}
func (d *Database) GetRoomVersionForRoomNID(
ctx context.Context, roomNID types.RoomNID, ctx context.Context, roomNID types.RoomNID,
) (gomatrixserverlib.RoomVersion, error) { ) (gomatrixserverlib.RoomVersion, error) {
return d.statements.selectRoomVersionForRoomNID( return d.statements.selectRoomVersionForRoomNID(

View file

@ -15,8 +15,10 @@
package consumers package consumers
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
@ -74,18 +76,54 @@ func (s *OutputRoomEventConsumer) Start() error {
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Parse out the event JSON // Parse out the event JSON
var output api.OutputEvent var output api.OutputEvent
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream // See if the room version is present in the headers. If it isn't
log.WithError(err).Errorf("roomserver output log: message parse failure") // then we can't process the event as we don't know what the format
return nil // will be
var roomVersion gomatrixserverlib.RoomVersion
for _, header := range msg.Headers {
if bytes.Equal(header.Key, []byte("room_version")) {
roomVersion = gomatrixserverlib.RoomVersion(header.Value)
break
}
}
if roomVersion == "" {
return errors.New("room version was not in sarama headers")
} }
switch output.Type { switch output.Type {
case api.OutputTypeNewRoomEvent: case api.OutputTypeNewRoomEvent:
if err := output.NewRoomEvent.Event.PrepareAs(roomVersion); err != nil {
log.WithFields(log.Fields{
"room_version": roomVersion,
}).WithError(err).Errorf("can't prepare event to version")
return err
}
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil
}
return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent) return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent)
case api.OutputTypeNewInviteEvent: case api.OutputTypeNewInviteEvent:
if err := output.NewInviteEvent.Event.PrepareAs(roomVersion); err != nil {
log.WithFields(log.Fields{
"room_version": roomVersion,
}).WithError(err).Errorf("can't prepare event to version")
return err
}
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil
}
return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent) return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent)
case api.OutputTypeRetireInviteEvent: case api.OutputTypeRetireInviteEvent:
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil
}
return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent) return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent)
default: default:
log.WithField("type", output.Type).Debug( log.WithField("type", output.Type).Debug(
@ -100,13 +138,6 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
) error { ) error {
ev := msg.Event ev := msg.Event
if err := msg.Event.PrepareAs(msg.RoomVersion); err != nil {
log.WithFields(log.Fields{
"room_version": msg.RoomVersion,
}).WithError(err).Errorf("can't prepare event to version")
return err
}
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event_id": ev.EventID(), "event_id": ev.EventID(),
"room_id": ev.RoomID(), "room_id": ev.RoomID(),