diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index c7a11dbb4..37cf09af1 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -271,6 +271,13 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( return err } + if err = s.db.UpdateRelations(ctx, ev); err != nil { + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "type": ev.Type(), + }).WithError(err).Warn("Failed to update relations") + } + s.pduStream.Advance(pduPos) s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos}) diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 4a03aca74..9a9bf16ad 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -174,6 +174,7 @@ type Database interface { StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error ReIndex(ctx context.Context, limit, afterID int64) (map[int64]gomatrixserverlib.HeaderedEvent, error) + UpdateRelations(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error } type Presence interface { diff --git a/syncapi/storage/postgres/relations_table.go b/syncapi/storage/postgres/relations_table.go index acc156e91..bf125b342 100644 --- a/syncapi/storage/postgres/relations_table.go +++ b/syncapi/storage/postgres/relations_table.go @@ -39,7 +39,9 @@ CREATE TABLE IF NOT EXISTS syncapi_relations ( const insertRelationSQL = "" + "INSERT INTO syncapi_relations (" + " room_id, event_id, child_event_id, rel_type" + - ") VALUES ($1, $2, $3, $4) RETURNING id" + ") VALUES ($1, $2, $3, $4) " + + " ON CONFLICT syncapi_relations_unique DO UPDATE SET event_id=EXCLUDED.event_id" + + " RETURNING id" const deleteRelationSQL = "" + "DELETE FROM syncapi_relations WHERE event_id = $1" diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 979ff6647..850d24a07 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -98,6 +98,10 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) if err != nil { return nil, err } + relations, err := NewPostgresRelationsTable(d.db) + if err != nil { + return nil, err + } // apply migrations which need multiple tables m := sqlutil.NewMigrator(d.db) @@ -129,6 +133,7 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) NotificationData: notificationData, Ignores: ignores, Presence: presence, + Relations: relations, } return &d, nil } diff --git a/syncapi/storage/shared/storage_consumer.go b/syncapi/storage/shared/storage_consumer.go index 937ced3a2..9b886c876 100644 --- a/syncapi/storage/shared/storage_consumer.go +++ b/syncapi/storage/shared/storage_consumer.go @@ -53,6 +53,7 @@ type Database struct { NotificationData tables.NotificationData Ignores tables.Ignores Presence tables.Presence + Relations tables.Relations } func (d *Database) NewDatabaseSnapshot(ctx context.Context) (*DatabaseTransaction, error) { @@ -579,10 +580,27 @@ func (d *Database) SelectMembershipForUser(ctx context.Context, roomID, userID s return d.Memberships.SelectMembershipForUser(ctx, nil, roomID, userID, pos) } -func (s *Database) ReIndex(ctx context.Context, limit, afterID int64) (map[int64]gomatrixserverlib.HeaderedEvent, error) { - return s.OutputEvents.ReIndex(ctx, nil, limit, afterID, []string{ +func (d *Database) ReIndex(ctx context.Context, limit, afterID int64) (map[int64]gomatrixserverlib.HeaderedEvent, error) { + return d.OutputEvents.ReIndex(ctx, nil, limit, afterID, []string{ gomatrixserverlib.MRoomName, gomatrixserverlib.MRoomTopic, "m.room.message", }) } + +func (d *Database) UpdateRelations(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error { + var content gomatrixserverlib.RelationContent + if err := json.Unmarshal(event.Content(), &content); err != nil { + return fmt.Errorf("json.Unmarshal: %w", err) + } + if content.Relations == nil { + return nil + } + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + _, err := d.Relations.InsertRelation( + ctx, txn, event.RoomID(), event.EventID(), + content.Relations.EventID, content.Relations.RelationType, + ) + return err + }) +}