mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-21 13:53:09 -06:00
WIP syncapi work
This commit is contained in:
parent
d03f6ac725
commit
b1c4426a12
|
|
@ -23,7 +23,6 @@ import (
|
||||||
"github.com/matrix-org/dendrite/internal/eventutil"
|
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Database struct {
|
type Database struct {
|
||||||
|
|
@ -45,10 +44,7 @@ func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, reda
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if len(events) != 1 {
|
if len(events) != 1 {
|
||||||
// this should never happen but is non-fatal
|
// this will happen for all non-state events
|
||||||
util.GetLogger(ctx).WithField("redacted_event_id", redactedEventID).WithField("redaction_event_id", redactedBecause.EventID()).Warnf(
|
|
||||||
"RedactEvent: missing redacted event",
|
|
||||||
)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
redactionEvent := redactedBecause.Unwrap()
|
redactionEvent := redactedBecause.Unwrap()
|
||||||
|
|
|
||||||
|
|
@ -86,6 +86,8 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent)
|
return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent)
|
||||||
case api.OutputTypeRetireInviteEvent:
|
case api.OutputTypeRetireInviteEvent:
|
||||||
return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent)
|
return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent)
|
||||||
|
case api.OutputTypeRedactedEvent:
|
||||||
|
return s.onRedactEvent(context.TODO(), *output.RedactedEvent)
|
||||||
default:
|
default:
|
||||||
log.WithField("type", output.Type).Debug(
|
log.WithField("type", output.Type).Debug(
|
||||||
"roomserver output log: ignoring unknown output type",
|
"roomserver output log: ignoring unknown output type",
|
||||||
|
|
@ -94,6 +96,12 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *OutputRoomEventConsumer) onRedactEvent(
|
||||||
|
ctx context.Context, msg api.OutputRedactedEvent,
|
||||||
|
) error {
|
||||||
|
return c.db.RedactEvent(ctx, msg.RedactedEventID, &msg.RedactedBecause)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *OutputRoomEventConsumer) onNewRoomEvent(
|
func (s *OutputRoomEventConsumer) onNewRoomEvent(
|
||||||
ctx context.Context, msg api.OutputNewRoomEvent,
|
ctx context.Context, msg api.OutputNewRoomEvent,
|
||||||
) error {
|
) error {
|
||||||
|
|
@ -173,12 +181,10 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.HeaderedEvent) (gomatrixserverlib.HeaderedEvent, error) {
|
func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.HeaderedEvent) (gomatrixserverlib.HeaderedEvent, error) {
|
||||||
var stateKey string
|
|
||||||
if event.StateKey() == nil {
|
if event.StateKey() == nil {
|
||||||
stateKey = ""
|
return event, nil
|
||||||
} else {
|
|
||||||
stateKey = *event.StateKey()
|
|
||||||
}
|
}
|
||||||
|
stateKey := *event.StateKey()
|
||||||
|
|
||||||
prevEvent, err := s.db.GetStateEvent(
|
prevEvent, err := s.db.GetStateEvent(
|
||||||
context.TODO(), event.RoomID(), event.Type(), stateKey,
|
context.TODO(), event.RoomID(), event.Type(), stateKey,
|
||||||
|
|
|
||||||
|
|
@ -136,4 +136,6 @@ type Database interface {
|
||||||
// Returns the filterID as a string. Otherwise returns an error if something
|
// Returns the filterID as a string. Otherwise returns an error if something
|
||||||
// goes wrong.
|
// goes wrong.
|
||||||
PutFilter(ctx context.Context, localpart string, filter *gomatrixserverlib.Filter) (string, error)
|
PutFilter(ctx context.Context, localpart string, filter *gomatrixserverlib.Filter) (string, error)
|
||||||
|
// RedactEvent wipes an event in the database and sets the unsigned.redacted_because key to the redaction event
|
||||||
|
RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -99,6 +99,9 @@ const selectEarlyEventsSQL = "" +
|
||||||
const selectMaxEventIDSQL = "" +
|
const selectMaxEventIDSQL = "" +
|
||||||
"SELECT MAX(id) FROM syncapi_output_room_events"
|
"SELECT MAX(id) FROM syncapi_output_room_events"
|
||||||
|
|
||||||
|
const updateEventJSONSQL = "" +
|
||||||
|
"UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2"
|
||||||
|
|
||||||
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
|
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
|
||||||
const selectStateInRangeSQL = "" +
|
const selectStateInRangeSQL = "" +
|
||||||
"SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
|
"SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
|
||||||
|
|
@ -120,6 +123,7 @@ type outputRoomEventsStatements struct {
|
||||||
selectRecentEventsForSyncStmt *sql.Stmt
|
selectRecentEventsForSyncStmt *sql.Stmt
|
||||||
selectEarlyEventsStmt *sql.Stmt
|
selectEarlyEventsStmt *sql.Stmt
|
||||||
selectStateInRangeStmt *sql.Stmt
|
selectStateInRangeStmt *sql.Stmt
|
||||||
|
updateEventJSONStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
|
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
|
||||||
|
|
@ -149,9 +153,21 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
|
||||||
if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
|
if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if s.updateEventJSONStmt, err = db.Prepare(updateEventJSONSQL); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error {
|
||||||
|
headeredJSON, err := json.Marshal(event)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = s.updateEventJSONStmt.ExecContext(ctx, event.EventID(), headeredJSON)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos.
|
// selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos.
|
||||||
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
|
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
|
||||||
// two positions, only the most recent state is returned.
|
// two positions, only the most recent state is returned.
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,7 @@ import (
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||||
|
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||||
|
|
@ -597,6 +598,26 @@ func (d *Database) IncrementalSync(
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error {
|
||||||
|
redactedEvents, err := d.Events(ctx, []string{redactedEventID})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(redactedEvents) == 0 {
|
||||||
|
logrus.WithField("event_id", redactedEventID).WithField("redaction_event", redactedBecause.EventID()).Warnf("missing redacted event for redaction")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
eventToRedact := redactedEvents[0].Unwrap()
|
||||||
|
redactionEvent := redactedBecause.Unwrap()
|
||||||
|
ev, err := eventutil.RedactEvent(&redactionEvent, &eventToRedact)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
newEvent := ev.Headered(redactedBecause.RoomVersion)
|
||||||
|
return d.OutputEvents.UpdateEventJSON(ctx, &newEvent)
|
||||||
|
}
|
||||||
|
|
||||||
// getResponseWithPDUsForCompleteSync creates a response and adds all PDUs needed
|
// getResponseWithPDUsForCompleteSync creates a response and adds all PDUs needed
|
||||||
// to it. It returns toPos and joinedRoomIDs for use of adding EDUs.
|
// to it. It returns toPos and joinedRoomIDs for use of adding EDUs.
|
||||||
// nolint:nakedret
|
// nolint:nakedret
|
||||||
|
|
@ -619,6 +640,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
|
||||||
}
|
}
|
||||||
var succeeded bool
|
var succeeded bool
|
||||||
defer func() {
|
defer func() {
|
||||||
|
logrus.Infof("getResponseWithPDUsForCompleteSync: limit:%d rooms:%v", numRecentEventsPerRoom, joinedRoomIDs)
|
||||||
txerr := sqlutil.EndTransaction(txn, &succeeded)
|
txerr := sqlutil.EndTransaction(txn, &succeeded)
|
||||||
if err == nil && txerr != nil {
|
if err == nil && txerr != nil {
|
||||||
err = txerr
|
err = txerr
|
||||||
|
|
@ -790,6 +812,7 @@ func (d *Database) addRoomDeltaToResponse(
|
||||||
// This is all "okay" assuming history_visibility == "shared" which it is by default.
|
// This is all "okay" assuming history_visibility == "shared" which it is by default.
|
||||||
r.To = delta.membershipPos
|
r.To = delta.membershipPos
|
||||||
}
|
}
|
||||||
|
logrus.Infof("addRoomDeltaToResponse range:%+v roomID:%s limit:%d", r, delta.roomID, numRecentEventsPerRoom)
|
||||||
recentStreamEvents, limited, err := d.OutputEvents.SelectRecentEvents(
|
recentStreamEvents, limited, err := d.OutputEvents.SelectRecentEvents(
|
||||||
ctx, txn, delta.roomID, r,
|
ctx, txn, delta.roomID, r,
|
||||||
numRecentEventsPerRoom, true, true,
|
numRecentEventsPerRoom, true, true,
|
||||||
|
|
@ -797,6 +820,7 @@ func (d *Database) addRoomDeltaToResponse(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
logrus.Infof("addRoomDeltaToResponse produced %d events, limited:%v", len(recentStreamEvents), limited)
|
||||||
recentEvents := d.StreamEventsToEvents(device, recentStreamEvents)
|
recentEvents := d.StreamEventsToEvents(device, recentStreamEvents)
|
||||||
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
|
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
|
||||||
prevBatch, err := d.getBackwardTopologyPos(ctx, txn, recentStreamEvents)
|
prevBatch, err := d.getBackwardTopologyPos(ctx, txn, recentStreamEvents)
|
||||||
|
|
|
||||||
|
|
@ -76,6 +76,9 @@ const selectEarlyEventsSQL = "" +
|
||||||
const selectMaxEventIDSQL = "" +
|
const selectMaxEventIDSQL = "" +
|
||||||
"SELECT MAX(id) FROM syncapi_output_room_events"
|
"SELECT MAX(id) FROM syncapi_output_room_events"
|
||||||
|
|
||||||
|
const updateEventJSONSQL = "" +
|
||||||
|
"UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2"
|
||||||
|
|
||||||
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
|
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
|
||||||
/*
|
/*
|
||||||
$1 = oldPos,
|
$1 = oldPos,
|
||||||
|
|
@ -109,6 +112,7 @@ type outputRoomEventsStatements struct {
|
||||||
selectRecentEventsForSyncStmt *sql.Stmt
|
selectRecentEventsForSyncStmt *sql.Stmt
|
||||||
selectEarlyEventsStmt *sql.Stmt
|
selectEarlyEventsStmt *sql.Stmt
|
||||||
selectStateInRangeStmt *sql.Stmt
|
selectStateInRangeStmt *sql.Stmt
|
||||||
|
updateEventJSONStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Events, error) {
|
func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Events, error) {
|
||||||
|
|
@ -140,9 +144,21 @@ func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Even
|
||||||
if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
|
if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if s.updateEventJSONStmt, err = db.Prepare(updateEventJSONSQL); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error {
|
||||||
|
headeredJSON, err := json.Marshal(event)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = s.updateEventJSONStmt.ExecContext(ctx, event.EventID(), headeredJSON)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos.
|
// selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos.
|
||||||
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
|
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
|
||||||
// two positions, only the most recent state is returned.
|
// two positions, only the most recent state is returned.
|
||||||
|
|
|
||||||
|
|
@ -49,6 +49,7 @@ type Events interface {
|
||||||
// SelectEarlyEvents returns the earliest events in the given room.
|
// SelectEarlyEvents returns the earliest events in the given room.
|
||||||
SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int) ([]types.StreamEvent, error)
|
SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int) ([]types.StreamEvent, error)
|
||||||
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error)
|
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error)
|
||||||
|
UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Topology keeps track of the depths and stream positions for all events.
|
// Topology keeps track of the depths and stream positions for all events.
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue