From 06d4d0b32b236088eb6209b373df214dc4d82702 Mon Sep 17 00:00:00 2001 From: Till Faelligen Date: Wed, 18 May 2022 08:12:11 +0200 Subject: [PATCH] Implement /search, partitially --- syncapi/consumers/clientapi.go | 103 +++++++++-- syncapi/routing/context.go | 7 +- syncapi/routing/routing.go | 2 +- syncapi/routing/search.go | 166 +++++++++++++++--- syncapi/storage/interface.go | 1 + .../postgres/output_room_events_table.go | 27 +++ syncapi/storage/shared/syncserver.go | 12 +- .../sqlite3/output_room_events_table.go | 27 +++ syncapi/storage/tables/interface.go | 1 + syncapi/syncapi.go | 6 +- 10 files changed, 304 insertions(+), 48 deletions(-) diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index eec369c1a..7ae9aae7a 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -19,9 +19,12 @@ import ( "database/sql" "encoding/json" "fmt" + "strings" + "time" "github.com/getsentry/sentry-go" "github.com/matrix-org/dendrite/internal/eventutil" + "github.com/matrix-org/dendrite/internal/fulltext" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" @@ -33,19 +36,23 @@ import ( "github.com/nats-io/nats.go" "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" + "github.com/tidwall/gjson" ) // OutputClientDataConsumer consumes events that originated in the client API server. type OutputClientDataConsumer struct { - ctx context.Context - jetstream nats.JetStreamContext - durable string - topic string - db storage.Database - stream types.StreamProvider - notifier *notifier.Notifier - serverName gomatrixserverlib.ServerName - producer *producers.UserAPIReadProducer + ctx context.Context + jetstream nats.JetStreamContext + nats *nats.Conn + durable string + topic string + topicReIndex string + db storage.Database + stream types.StreamProvider + notifier *notifier.Notifier + serverName gomatrixserverlib.ServerName + producer *producers.UserAPIReadProducer + fts *fulltext.Search } // NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers. @@ -53,26 +60,86 @@ func NewOutputClientDataConsumer( process *process.ProcessContext, cfg *config.SyncAPI, js nats.JetStreamContext, + nats *nats.Conn, store storage.Database, notifier *notifier.Notifier, stream types.StreamProvider, producer *producers.UserAPIReadProducer, + fts *fulltext.Search, ) *OutputClientDataConsumer { return &OutputClientDataConsumer{ - ctx: process.Context(), - jetstream: js, - topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData), - durable: cfg.Matrix.JetStream.Durable("SyncAPIAccountDataConsumer"), - db: store, - notifier: notifier, - stream: stream, - serverName: cfg.Matrix.ServerName, - producer: producer, + ctx: process.Context(), + jetstream: js, + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData), + topicReIndex: cfg.Matrix.JetStream.Prefixed(jetstream.InputFulltextReindex), + durable: cfg.Matrix.JetStream.Durable("SyncAPIAccountDataConsumer"), + nats: nats, + db: store, + notifier: notifier, + stream: stream, + serverName: cfg.Matrix.ServerName, + producer: producer, + fts: fts, } } // Start consuming from room servers func (s *OutputClientDataConsumer) Start() error { + _, err := s.nats.Subscribe(s.topicReIndex, func(msg *nats.Msg) { + if err := msg.Ack(); err != nil { + return + } + ctx := context.Background() + logrus.Debugf("Starting to index events") + var offset int + start := time.Now() + count := 0 + for { + evs, err := s.db.ReIndex(ctx, 1000, int64(offset)) + if err != nil { + logrus.WithError(err).Errorf("unable to get events to index") + return + } + if len(evs) == 0 { + break + } + logrus.Debugf("Indexing %d events", len(evs)) + elements := make([]fulltext.IndexElement, 0, len(evs)) + + for _, ev := range evs { + e := fulltext.IndexElement{ + EventID: ev.EventID(), + RoomID: ev.RoomID(), + } + + switch ev.Type() { + case "m.room.message": + e.Content = gjson.GetBytes(ev.Content(), "body").String() + case gomatrixserverlib.MRoomName: + e.Content = gjson.GetBytes(ev.Content(), "name").String() + case gomatrixserverlib.MRoomTopic: + e.Content = gjson.GetBytes(ev.Content(), "topic").String() + default: + continue + } + + if strings.TrimSpace(e.Content) == "" { + continue + } + elements = append(elements, e) + } + if err = s.fts.BatchIndex(elements); err != nil { + logrus.WithError(err).Error("unable to index events") + continue + } + offset += len(evs) + count += len(elements) + } + logrus.Debugf("Indexed %d events in %v", count, time.Since(start)) + }) + if err != nil { + return err + } return jetstream.JetStreamConsumer( s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, nats.DeliverAll(), nats.ManualAck(), diff --git a/syncapi/routing/context.go b/syncapi/routing/context.go index 96438e184..0d63ccde4 100644 --- a/syncapi/routing/context.go +++ b/syncapi/routing/context.go @@ -33,11 +33,11 @@ import ( type ContextRespsonse struct { End string `json:"end"` - Event gomatrixserverlib.ClientEvent `json:"event"` + Event *gomatrixserverlib.ClientEvent `json:"event,omitempty"` EventsAfter []gomatrixserverlib.ClientEvent `json:"events_after,omitempty"` EventsBefore []gomatrixserverlib.ClientEvent `json:"events_before,omitempty"` Start string `json:"start"` - State []gomatrixserverlib.ClientEvent `json:"state"` + State []gomatrixserverlib.ClientEvent `json:"state,omitempty"` } func Context( @@ -139,8 +139,9 @@ func Context( eventsAfterClient := gomatrixserverlib.HeaderedToClientEvents(eventsAfter, gomatrixserverlib.FormatAll) newState := applyLazyLoadMembers(device, filter, eventsAfterClient, eventsBeforeClient, state, lazyLoadCache) + ev := gomatrixserverlib.HeaderedToClientEvent(&requestedEvent, gomatrixserverlib.FormatAll) response := ContextRespsonse{ - Event: gomatrixserverlib.HeaderedToClientEvent(&requestedEvent, gomatrixserverlib.FormatAll), + Event: &ev, EventsAfter: eventsAfterClient, EventsBefore: eventsBeforeClient, State: gomatrixserverlib.HeaderedToClientEvents(newState, gomatrixserverlib.FormatAll), diff --git a/syncapi/routing/routing.go b/syncapi/routing/routing.go index 9ccfa5017..6b95b511f 100644 --- a/syncapi/routing/routing.go +++ b/syncapi/routing/routing.go @@ -100,7 +100,7 @@ func Setup( v3mux.Handle("/search", httputil.MakeAuthAPI("search", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { - return Search(req, device, fts) + return Search(req, device, syncDB, fts, req.FormValue("next_batch")) }), ).Methods(http.MethodPost, http.MethodOptions) } diff --git a/syncapi/routing/search.go b/syncapi/routing/search.go index ac8d0c46f..4cac3ee12 100644 --- a/syncapi/routing/search.go +++ b/syncapi/routing/search.go @@ -1,47 +1,164 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package routing import ( "net/http" + "strconv" + "github.com/blevesearch/bleve/v2/search" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/internal/fulltext" + "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + "github.com/sirupsen/logrus" + "github.com/tidwall/gjson" ) -func Search(req *http.Request, device *api.Device, fts *fulltext.Search) util.JSONResponse { - var searchReq SearchRequest +func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts *fulltext.Search, from string) util.JSONResponse { + var ( + searchReq SearchRequest + err error + ctx = req.Context() + ) resErr := httputil.UnmarshalJSONRequest(req, &searchReq) if resErr != nil { + logrus.Error("failed to unmarshal search request") return *resErr } - result, err := fts.Search(searchReq.SearchCategories.RoomEvents.SearchTerm) + nextBatch := 0 + if from != "" { + nextBatch, err = strconv.Atoi(from) + if err != nil { + return jsonerror.InternalServerError() + } + } + + if searchReq.SearchCategories.RoomEvents.Filter.Limit == 0 { + searchReq.SearchCategories.RoomEvents.Filter.Limit = 5 + } + + // only search rooms the user is actually joined to + joinedRooms, err := syncDB.RoomIDsWithMembership(ctx, device.UserID, "join") if err != nil { return jsonerror.InternalServerError() } + rooms := joinedRooms + if searchReq.SearchCategories.RoomEvents.Filter.Rooms != nil { + rooms = append(*searchReq.SearchCategories.RoomEvents.Filter.Rooms, joinedRooms...) + } + + logrus.Debugf("Searching FTS for rooms %v - %s", rooms, searchReq.SearchCategories.RoomEvents.SearchTerm) + + orderByTime := false + if searchReq.SearchCategories.RoomEvents.OrderBy == "recent" { + logrus.Debugf("Ordering by recently added") + orderByTime = true + } + + result, err := fts.Search( + searchReq.SearchCategories.RoomEvents.SearchTerm, + rooms, + searchReq.SearchCategories.RoomEvents.Filter.Limit, + nextBatch, + orderByTime, + ) + if err != nil { + logrus.WithError(err).Error("failed to search fulltext") + return jsonerror.InternalServerError() + } + logrus.Debugf("Search took %s", result.Took) results := []Results{} - for _, x := range result.Hits { + + wantEvents := make([]string, len(result.Hits)) + eventScore := make(map[string]*search.DocumentMatch) + for _, hit := range result.Hits { + logrus.Debugf("%+v\n", hit.Fields) + wantEvents = append(wantEvents, hit.ID) + eventScore[hit.ID] = hit + } + + roomFilter := &gomatrixserverlib.RoomEventFilter{ + Rooms: &rooms, + } + + evs, err := syncDB.Events(ctx, wantEvents) + if err != nil { + logrus.WithError(err).Error("failed to get events from database") + return jsonerror.InternalServerError() + } + + for _, event := range evs { + id, _, err := syncDB.SelectContextEvent(ctx, event.RoomID(), event.EventID()) + if err != nil { + logrus.WithError(err).Error("failed to query context event") + return jsonerror.InternalServerError() + } + roomFilter.Limit = searchReq.SearchCategories.RoomEvents.EventContext.BeforeLimit + eventsBefore, err := syncDB.SelectContextBeforeEvent(ctx, id, event.RoomID(), roomFilter) + if err != nil { + logrus.WithError(err).Error("failed to query before context event") + return jsonerror.InternalServerError() + } + roomFilter.Limit = searchReq.SearchCategories.RoomEvents.EventContext.AfterLimit + _, eventsAfter, err := syncDB.SelectContextAfterEvent(ctx, id, event.RoomID(), roomFilter) + if err != nil { + logrus.WithError(err).Error("failed to query after context event") + return jsonerror.InternalServerError() + } + results = append(results, Results{ - Rank: x.Score, + Context: ContextRespsonse{ + EventsAfter: gomatrixserverlib.HeaderedToClientEvents(eventsAfter, gomatrixserverlib.FormatSync), + EventsBefore: gomatrixserverlib.HeaderedToClientEvents(eventsBefore, gomatrixserverlib.FormatSync), + }, + Rank: eventScore[event.EventID()].Score, Result: Result{ Content: Content{ - Body: x.String(), + Body: gjson.GetBytes(event.Content(), "body").Str, + Format: gjson.GetBytes(event.Content(), "format").Str, + FormattedBody: gjson.GetBytes(event.Content(), "formatted_body").Str, + Msgtype: gjson.GetBytes(event.Content(), "msgtype").Str, }, - EventID: x.ID, + EventID: event.EventID(), + OriginServerTs: event.OriginServerTS(), + RoomID: event.RoomID(), + Sender: event.Sender(), + Type: event.Type(), + Unsigned: event.Unsigned(), }, }) } + nb := "" + if int(result.Total) > nextBatch+len(results) { + nb = strconv.Itoa(len(results) + nextBatch) + } + res := SearchResponse{ SearchCategories: SearchCategories{ RoomEvents: RoomEvents{ - Count: len(results), - Groups: Groups{}, - Results: results, + Count: int(result.Total), + Groups: Groups{}, + Results: results, + NextBatch: nb, }, }, } @@ -55,6 +172,11 @@ func Search(req *http.Request, device *api.Device, fts *fulltext.Search) util.JS type SearchRequest struct { SearchCategories struct { RoomEvents struct { + EventContext struct { + AfterLimit int `json:"after_limit,omitempty"` + BeforeLimit int `json:"before_limit,omitempty"` + IncludeProfile bool `json:"include_profile,omitempty"` + } `json:"event_context"` Filter gomatrixserverlib.StateFilter `json:"filter"` Groupings struct { GroupBy []struct { @@ -88,22 +210,22 @@ type Content struct { FormattedBody string `json:"formatted_body"` Msgtype string `json:"msgtype"` } -type Unsigned struct { - Age int `json:"age"` -} + type Result struct { - Content Content `json:"content"` - EventID string `json:"event_id"` - OriginServerTs int64 `json:"origin_server_ts"` - RoomID string `json:"room_id"` - Sender string `json:"sender"` - Type string `json:"type"` - Unsigned Unsigned `json:"unsigned"` + Content Content `json:"content"` + EventID string `json:"event_id"` + OriginServerTs gomatrixserverlib.Timestamp `json:"origin_server_ts"` + RoomID string `json:"room_id"` + Sender string `json:"sender"` + Type string `json:"type"` + Unsigned []byte `json:"unsigned,omitempty"` } type Results struct { - Rank float64 `json:"rank"` - Result Result `json:"result"` + Context ContextRespsonse `json:"context"` + Rank float64 `json:"rank"` + Result Result `json:"result"` } + type RoomEvents struct { Count int `json:"count"` Groups Groups `json:"groups"` diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 5a036d889..cd1b8ad21 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -157,6 +157,7 @@ type Database interface { IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error) UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error + ReIndex(ctx context.Context, limit, offset int64) ([]gomatrixserverlib.HeaderedEvent, error) } type Presence interface { diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index d84d0cfa2..003e39e47 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -164,6 +164,8 @@ const selectContextAfterEventSQL = "" + " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" + " ORDER BY id ASC LIMIT $3" +const selectSearchSQL = "SELECT event_id, headered_event_json FROM syncapi_output_room_events WHERE type = ANY($1) ORDER BY id ASC LIMIT $2 OFFSET $3" + type outputRoomEventsStatements struct { insertEventStmt *sql.Stmt selectEventsStmt *sql.Stmt @@ -178,6 +180,7 @@ type outputRoomEventsStatements struct { selectContextEventStmt *sql.Stmt selectContextBeforeEventStmt *sql.Stmt selectContextAfterEventStmt *sql.Stmt + selectSearchStmt *sql.Stmt } func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { @@ -200,6 +203,7 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { {&s.selectContextEventStmt, selectContextEventSQL}, {&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL}, {&s.selectContextAfterEventStmt, selectContextAfterEventSQL}, + {&s.selectSearchStmt, selectSearchSQL}, }.Prepare(db) } @@ -617,3 +621,26 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { } return result, rows.Err() } + +func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) ([]gomatrixserverlib.HeaderedEvent, error) { + rows, err := sqlutil.TxStmt(txn, s.selectSearchStmt).QueryContext(ctx, pq.StringArray(types), limit, offset) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "rows.close() failed") + + var result []gomatrixserverlib.HeaderedEvent + var eventID string + for rows.Next() { + var ev gomatrixserverlib.HeaderedEvent + var eventBytes []byte + if err = rows.Scan(&eventID, &eventBytes); err != nil { + return nil, err + } + if err = ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { + return nil, err + } + result = append(result, ev) + } + return result, rows.Err() +} diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index ab8e36f04..c627bdae2 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -19,6 +19,7 @@ import ( "database/sql" "encoding/json" "fmt" + "time" "github.com/matrix-org/dendrite/internal/fulltext" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -396,8 +397,8 @@ func (d *Database) WriteEvent( e := fulltext.IndexElement{ EventID: ev.EventID(), - Type: ev.Type(), RoomID: ev.RoomID(), + Time: time.Now(), } switch ev.Type() { @@ -413,6 +414,7 @@ func (d *Database) WriteEvent( } } if e.Content != "" { + logrus.Debugf("Indexing element: %+v", e) if err := d.FTS.Index(e); err != nil { logrus.WithError(err).Warn("failed to write to fulltext index") } @@ -1079,6 +1081,14 @@ func (s *Database) UpdateIgnoresForUser(ctx context.Context, userID string, igno return s.Ignores.UpsertIgnores(ctx, userID, ignores) } +func (s *Database) ReIndex(ctx context.Context, limit, offset int64) ([]gomatrixserverlib.HeaderedEvent, error) { + return s.OutputEvents.ReIndex(ctx, nil, limit, offset, []string{ + gomatrixserverlib.MRoomName, + gomatrixserverlib.MRoomTopic, + "m.room.message", + }) +} + func (s *Database) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) { return s.Presence.UpsertPresence(ctx, nil, userID, statusMsg, presence, lastActiveTS, fromSync) } diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index f9961a9d1..ee12719d5 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -114,6 +114,8 @@ const selectContextAfterEventSQL = "" + // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters +const selectSearchSQL = "SELECT event_id, headered_event_json FROM syncapi_output_room_events WHERE type = ANY($1) LIMIT $2 OFFSET $3 ORDER BY id ASC" + type outputRoomEventsStatements struct { db *sql.DB streamIDStatements *StreamIDStatements @@ -124,6 +126,7 @@ type outputRoomEventsStatements struct { selectContextEventStmt *sql.Stmt selectContextBeforeEventStmt *sql.Stmt selectContextAfterEventStmt *sql.Stmt + selectSearchStmt *sql.Stmt } func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Events, error) { @@ -143,6 +146,7 @@ func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Even {&s.selectContextEventStmt, selectContextEventSQL}, {&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL}, {&s.selectContextAfterEventStmt, selectContextAfterEventSQL}, + {&s.selectSearchStmt, selectSearchSQL}, }.Prepare(db) } @@ -613,3 +617,26 @@ func unmarshalStateIDs(addIDsJSON, delIDsJSON string) (addIDs []string, delIDs [ } return } + +func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) ([]gomatrixserverlib.HeaderedEvent, error) { + rows, err := sqlutil.TxStmt(txn, s.selectSearchStmt).QueryContext(ctx, types, limit, offset) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "rows.close() failed") + + var result []gomatrixserverlib.HeaderedEvent + var eventID string + for rows.Next() { + var ev gomatrixserverlib.HeaderedEvent + var eventBytes []byte + if err = rows.Scan(&eventID, &eventBytes); err != nil { + return nil, err + } + if err = ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { + return nil, err + } + result = append(result, ev) + } + return result, rows.Err() +} diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index ccdebfdbd..c4f657149 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -67,6 +67,7 @@ type Events interface { SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) + ReIndex(ctx context.Context, txn *sql.Tx, limit, offset int64, types []string) ([]gomatrixserverlib.HeaderedEvent, error) } // Topology keeps track of the depths and stream positions for all events. diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index a34450911..8587ada05 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -48,7 +48,7 @@ func AddPublicRoutes( js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) - fts, err := fulltext.New("./fts.bleve") + fts, err := fulltext.New("/work/fts.bleve") if err != nil { logrus.WithError(err).Panicf("failed to create full text") } @@ -101,8 +101,8 @@ func AddPublicRoutes( } clientConsumer := consumers.NewOutputClientDataConsumer( - base.ProcessContext, cfg, js, syncDB, notifier, streams.AccountDataStreamProvider, - userAPIReadUpdateProducer, + base.ProcessContext, cfg, js, natsClient, syncDB, notifier, streams.AccountDataStreamProvider, + userAPIReadUpdateProducer, fts, ) if err = clientConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start client data consumer")