Try to wire in relation updates

This commit is contained in:
Neil Alexander 2022-10-11 14:11:29 +01:00
parent a5f6da72f3
commit 303155beba
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
5 changed files with 36 additions and 3 deletions

View file

@ -271,6 +271,13 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
return err return err
} }
if err = s.db.UpdateRelations(ctx, ev); err != nil {
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"type": ev.Type(),
}).WithError(err).Warn("Failed to update relations")
}
s.pduStream.Advance(pduPos) s.pduStream.Advance(pduPos)
s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos}) s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos})

View file

@ -174,6 +174,7 @@ type Database interface {
StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error)
UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error
ReIndex(ctx context.Context, limit, afterID int64) (map[int64]gomatrixserverlib.HeaderedEvent, error) ReIndex(ctx context.Context, limit, afterID int64) (map[int64]gomatrixserverlib.HeaderedEvent, error)
UpdateRelations(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error
} }
type Presence interface { type Presence interface {

View file

@ -39,7 +39,9 @@ CREATE TABLE IF NOT EXISTS syncapi_relations (
const insertRelationSQL = "" + const insertRelationSQL = "" +
"INSERT INTO syncapi_relations (" + "INSERT INTO syncapi_relations (" +
" room_id, event_id, child_event_id, rel_type" + " room_id, event_id, child_event_id, rel_type" +
") VALUES ($1, $2, $3, $4) RETURNING id" ") VALUES ($1, $2, $3, $4) " +
" ON CONFLICT syncapi_relations_unique DO UPDATE SET event_id=EXCLUDED.event_id" +
" RETURNING id"
const deleteRelationSQL = "" + const deleteRelationSQL = "" +
"DELETE FROM syncapi_relations WHERE event_id = $1" "DELETE FROM syncapi_relations WHERE event_id = $1"

View file

@ -98,6 +98,10 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
if err != nil { if err != nil {
return nil, err return nil, err
} }
relations, err := NewPostgresRelationsTable(d.db)
if err != nil {
return nil, err
}
// apply migrations which need multiple tables // apply migrations which need multiple tables
m := sqlutil.NewMigrator(d.db) m := sqlutil.NewMigrator(d.db)
@ -129,6 +133,7 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
NotificationData: notificationData, NotificationData: notificationData,
Ignores: ignores, Ignores: ignores,
Presence: presence, Presence: presence,
Relations: relations,
} }
return &d, nil return &d, nil
} }

View file

@ -53,6 +53,7 @@ type Database struct {
NotificationData tables.NotificationData NotificationData tables.NotificationData
Ignores tables.Ignores Ignores tables.Ignores
Presence tables.Presence Presence tables.Presence
Relations tables.Relations
} }
func (d *Database) NewDatabaseSnapshot(ctx context.Context) (*DatabaseTransaction, error) { func (d *Database) NewDatabaseSnapshot(ctx context.Context) (*DatabaseTransaction, error) {
@ -579,10 +580,27 @@ func (d *Database) SelectMembershipForUser(ctx context.Context, roomID, userID s
return d.Memberships.SelectMembershipForUser(ctx, nil, roomID, userID, pos) return d.Memberships.SelectMembershipForUser(ctx, nil, roomID, userID, pos)
} }
func (s *Database) ReIndex(ctx context.Context, limit, afterID int64) (map[int64]gomatrixserverlib.HeaderedEvent, error) { func (d *Database) ReIndex(ctx context.Context, limit, afterID int64) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
return s.OutputEvents.ReIndex(ctx, nil, limit, afterID, []string{ return d.OutputEvents.ReIndex(ctx, nil, limit, afterID, []string{
gomatrixserverlib.MRoomName, gomatrixserverlib.MRoomName,
gomatrixserverlib.MRoomTopic, gomatrixserverlib.MRoomTopic,
"m.room.message", "m.room.message",
}) })
} }
func (d *Database) UpdateRelations(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error {
var content gomatrixserverlib.RelationContent
if err := json.Unmarshal(event.Content(), &content); err != nil {
return fmt.Errorf("json.Unmarshal: %w", err)
}
if content.Relations == nil {
return nil
}
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
_, err := d.Relations.InsertRelation(
ctx, txn, event.RoomID(), event.EventID(),
content.Relations.EventID, content.Relations.RelationType,
)
return err
})
}