diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 5b3a2c027..cfbb05327 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -155,6 +155,7 @@ func (s *OutputRoomEventConsumer) onRedactEvent( "event_id": msg.RedactedBecause.EventID(), "redacted_event_id": msg.RedactedEventID, }).WithError(err).Warn("Failed to redact relations") + return err } // fake a room event so we notify clients about the redaction, as if it were @@ -285,6 +286,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( "event_id": ev.EventID(), "type": ev.Type(), }).WithError(err).Warn("Failed to update relations") + return err } s.pduStream.Advance(pduPos) @@ -337,6 +339,7 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent( "event_id": ev.EventID(), "type": ev.Type(), }).WithError(err).Warn("Failed to update relations") + return err } if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil { diff --git a/syncapi/storage/shared/storage_consumer.go b/syncapi/storage/shared/storage_consumer.go index d97ae667b..8292e81f7 100644 --- a/syncapi/storage/shared/storage_consumer.go +++ b/syncapi/storage/shared/storage_consumer.go @@ -604,14 +604,10 @@ func (d *Database) UpdateRelations(ctx context.Context, event *gomatrixserverlib return nil default: return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - var err error - _, err = d.Relations.InsertRelation( + _, err := d.Relations.InsertRelation( ctx, txn, event.RoomID(), content.Relations.EventID, event.EventID(), content.Relations.RelationType, ) - if err != nil { - logrus.WithError(err).Errorf("Failed to update relations for room %s when processing event %s", event.RoomID(), event.EventID()) - } return err }) } diff --git a/syncapi/storage/sqlite3/relations_table.go b/syncapi/storage/sqlite3/relations_table.go index 991f7e471..2aac4d0c7 100644 --- a/syncapi/storage/sqlite3/relations_table.go +++ b/syncapi/storage/sqlite3/relations_table.go @@ -97,14 +97,13 @@ func NewSqliteRelationsTable(db *sql.DB, streamID *StreamIDStatements) (tables.R func (s *relationsStatements) InsertRelation( ctx context.Context, txn *sql.Tx, roomID, eventID, childEventID, relType string, ) (streamPos types.StreamPosition, err error) { - pos, err := s.streamIDStatements.nextRelationID(ctx, txn) - if err != nil { + if streamPos, err = s.streamIDStatements.nextRelationID(ctx, txn); err != nil { return } - _, err = sqlutil.TxStmt(txn, s.insertRelationStmt).ExecContext( - ctx, pos, roomID, eventID, childEventID, relType, - ) - return pos, err + err = sqlutil.TxStmt(txn, s.insertRelationStmt).QueryRowContext( + ctx, streamPos, roomID, eventID, childEventID, relType, + ).Scan(&streamPos) + return } func (s *relationsStatements) DeleteRelation(