try to be btter behaved when updating room_version in syncapi

This commit is contained in:
Neil Alexander 2020-03-10 17:31:53 +00:00
parent 6ba953efb8
commit 9016bbd44b
5 changed files with 40 additions and 7 deletions

View file

@ -27,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
sarama "gopkg.in/Shopify/sarama.v1" sarama "gopkg.in/Shopify/sarama.v1"
) )
@ -98,8 +99,13 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
func (s *OutputRoomEventConsumer) onNewRoomEvent( func (s *OutputRoomEventConsumer) onNewRoomEvent(
ctx context.Context, msg api.OutputNewRoomEvent, ctx context.Context, msg api.OutputNewRoomEvent,
) error { ) error {
roomVersion := gomatrixserverlib.RoomVersionV1
if rv := gjson.Get(string(msg.Event), "content.room_version"); rv.Exists() {
roomVersion = gomatrixserverlib.RoomVersion(rv.String())
}
// TODO: Is this trusted here? // TODO: Is this trusted here?
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(msg.Event, false, msg.RoomVersion) ev, err := gomatrixserverlib.NewEventFromTrustedJSON(msg.Event, false, roomVersion)
if err != nil { if err != nil {
log.WithError(err).WithField("roomversion", msg.RoomVersion).Errorf( log.WithError(err).WithField("roomversion", msg.RoomVersion).Errorf(
"roomserver output log: couldn't create event from trusted JSON (%d bytes)", "roomserver output log: couldn't create event from trusted JSON (%d bytes)",

View file

@ -50,7 +50,7 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state (
-- part of the current state of the room. -- part of the current state of the room.
added_at BIGINT, added_at BIGINT,
-- The version of the room -- The version of the room
room_version TEXT NOT NULL, room_version TEXT,
-- Clobber based on 3-uple of room_id, type and state_key -- Clobber based on 3-uple of room_id, type and state_key
CONSTRAINT syncapi_room_state_unique UNIQUE (room_id, type, state_key) CONSTRAINT syncapi_room_state_unique UNIQUE (room_id, type, state_key)
); );
@ -61,8 +61,8 @@ CREATE INDEX IF NOT EXISTS syncapi_membership_idx ON syncapi_current_room_state(
` `
const upsertRoomStateSQL = "" + const upsertRoomStateSQL = "" +
"INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, event_json, membership, added_at)" + "INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, event_json, membership, added_at, room_version)" +
" VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)" + " VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)" +
" ON CONFLICT ON CONSTRAINT syncapi_room_state_unique" + " ON CONFLICT ON CONSTRAINT syncapi_room_state_unique" +
" DO UPDATE SET event_id = $2, sender=$4, contains_url=$5, event_json = $7, membership = $8, added_at = $9" " DO UPDATE SET event_id = $2, sender=$4, contains_url=$5, event_json = $7, membership = $8, added_at = $9"
@ -229,6 +229,8 @@ func (s *currentRoomStateStatements) upsertRoomState(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
event gomatrixserverlib.Event, membership *string, addedAt types.StreamPosition, event gomatrixserverlib.Event, membership *string, addedAt types.StreamPosition,
) error { ) error {
roomVersionString := ""
// Parse content as JSON and search for an "url" key // Parse content as JSON and search for an "url" key
containsURL := false containsURL := false
var content map[string]interface{} var content map[string]interface{}
@ -237,6 +239,12 @@ func (s *currentRoomStateStatements) upsertRoomState(
_, containsURL = content["url"] _, containsURL = content["url"]
} }
if event.Type() == gomatrixserverlib.MRoomCreate {
if rv, hasRv := content["room_version"]; hasRv {
roomVersionString = rv.(string)
}
}
// upsert state event // upsert state event
stmt := common.TxStmt(txn, s.upsertRoomStateStmt) stmt := common.TxStmt(txn, s.upsertRoomStateStmt)
_, err := stmt.ExecContext( _, err := stmt.ExecContext(
@ -250,6 +258,7 @@ func (s *currentRoomStateStatements) upsertRoomState(
event.JSON(), event.JSON(),
membership, membership,
addedAt, addedAt,
roomVersionString,
) )
return err return err
} }

View file

@ -170,23 +170,28 @@ func (d *SyncServerDatasource) WriteEvent(
ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync,
) )
if err != nil { if err != nil {
fmt.Println("d.events.insertEvent:", err)
return err return err
} }
pduPosition = pos pduPosition = pos
if err = d.topology.insertEventInTopology(ctx, ev); err != nil { if err = d.topology.insertEventInTopology(ctx, ev); err != nil {
fmt.Println("d.topology.insertEventInTopology:", err)
return err return err
} }
if err = d.handleBackwardExtremities(ctx, ev); err != nil { if err = d.handleBackwardExtremities(ctx, ev); err != nil {
fmt.Println("d.handleBackwardExtremities:", err)
return err return err
} }
if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 { if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 {
// Nothing to do, the event may have just been a message event. // Nothing to do, the event may have just been a message event.
fmt.Println("not a state event")
return nil return nil
} }
fmt.Println("d.updateRoomState")
return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition) return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition)
}) })

View file

@ -38,7 +38,7 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state (
event_json TEXT NOT NULL, event_json TEXT NOT NULL,
membership TEXT, membership TEXT,
added_at BIGINT, added_at BIGINT,
room_version TEXT NOT NULL, room_version TEXT, -- only set for m.room.create
UNIQUE (room_id, type, state_key) UNIQUE (room_id, type, state_key)
); );
-- for event deletion -- for event deletion
@ -48,8 +48,8 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_current_room_s
` `
const upsertRoomStateSQL = "" + const upsertRoomStateSQL = "" +
"INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, event_json, membership, added_at)" + "INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, event_json, membership, added_at, room_version)" +
" VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)" + " VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)" +
" ON CONFLICT (event_id, room_id, type, sender, contains_url)" + " ON CONFLICT (event_id, room_id, type, sender, contains_url)" +
" DO UPDATE SET event_id = $2, sender=$4, contains_url=$5, event_json = $7, membership = $8, added_at = $9" " DO UPDATE SET event_id = $2, sender=$4, contains_url=$5, event_json = $7, membership = $8, added_at = $9"
@ -208,12 +208,20 @@ func (s *currentRoomStateStatements) upsertRoomState(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
event gomatrixserverlib.Event, membership *string, addedAt types.StreamPosition, event gomatrixserverlib.Event, membership *string, addedAt types.StreamPosition,
) error { ) error {
roomVersionString := ""
// Parse content as JSON and search for an "url" key // Parse content as JSON and search for an "url" key
containsURL := false containsURL := false
var content map[string]interface{} var content map[string]interface{}
if json.Unmarshal(event.Content(), &content) != nil { if json.Unmarshal(event.Content(), &content) != nil {
// Set containsURL to true if url is present // Set containsURL to true if url is present
_, containsURL = content["url"] _, containsURL = content["url"]
if event.Type() == gomatrixserverlib.MRoomCreate {
if rv, hasRv := content["room_version"]; hasRv {
roomVersionString = rv.(string)
}
}
} }
// upsert state event // upsert state event
@ -229,6 +237,7 @@ func (s *currentRoomStateStatements) upsertRoomState(
event.JSON(), event.JSON(),
membership, membership,
addedAt, addedAt,
roomVersionString,
) )
return err return err
} }

View file

@ -196,15 +196,18 @@ func (d *SyncServerDatasource) WriteEvent(
ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync,
) )
if err != nil { if err != nil {
fmt.Println("d.events.insertEvent:", err)
return err return err
} }
pduPosition = pos pduPosition = pos
if err = d.topology.insertEventInTopology(ctx, txn, ev); err != nil { if err = d.topology.insertEventInTopology(ctx, txn, ev); err != nil {
fmt.Println("d.topology.insertEventInTopology:", err)
return err return err
} }
if err = d.handleBackwardExtremities(ctx, txn, ev); err != nil { if err = d.handleBackwardExtremities(ctx, txn, ev); err != nil {
fmt.Println("d.handleBackwardExtremities:", err)
return err return err
} }
@ -213,6 +216,7 @@ func (d *SyncServerDatasource) WriteEvent(
return nil return nil
} }
fmt.Println("d.updateRoomState...")
return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition) return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition)
}) })