From e0703522932ab4bf41b23d5229c7a4540b59d830 Mon Sep 17 00:00:00 2001 From: Till <2353100+S7evinK@users.noreply.github.com> Date: Wed, 5 Oct 2022 11:14:33 +0200 Subject: [PATCH 1/5] Side effect import bleve analyzer languages (#2763) ... to actually allow different languages. Fixes #2761 Binary size increases by ~1MB. --- internal/fulltext/bleve.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/internal/fulltext/bleve.go b/internal/fulltext/bleve.go index da8932f5c..7187861dd 100644 --- a/internal/fulltext/bleve.go +++ b/internal/fulltext/bleve.go @@ -21,6 +21,29 @@ import ( "strings" "github.com/blevesearch/bleve/v2" + + // side effect imports to allow all possible languages + _ "github.com/blevesearch/bleve/v2/analysis/lang/ar" + _ "github.com/blevesearch/bleve/v2/analysis/lang/cjk" + _ "github.com/blevesearch/bleve/v2/analysis/lang/ckb" + _ "github.com/blevesearch/bleve/v2/analysis/lang/da" + _ "github.com/blevesearch/bleve/v2/analysis/lang/de" + _ "github.com/blevesearch/bleve/v2/analysis/lang/en" + _ "github.com/blevesearch/bleve/v2/analysis/lang/es" + _ "github.com/blevesearch/bleve/v2/analysis/lang/fa" + _ "github.com/blevesearch/bleve/v2/analysis/lang/fi" + _ "github.com/blevesearch/bleve/v2/analysis/lang/fr" + _ "github.com/blevesearch/bleve/v2/analysis/lang/hi" + _ "github.com/blevesearch/bleve/v2/analysis/lang/hr" + _ "github.com/blevesearch/bleve/v2/analysis/lang/hu" + _ "github.com/blevesearch/bleve/v2/analysis/lang/it" + _ "github.com/blevesearch/bleve/v2/analysis/lang/nl" + _ "github.com/blevesearch/bleve/v2/analysis/lang/no" + _ "github.com/blevesearch/bleve/v2/analysis/lang/pt" + _ "github.com/blevesearch/bleve/v2/analysis/lang/ro" + _ "github.com/blevesearch/bleve/v2/analysis/lang/ru" + _ "github.com/blevesearch/bleve/v2/analysis/lang/sv" + _ "github.com/blevesearch/bleve/v2/analysis/lang/tr" "github.com/blevesearch/bleve/v2/mapping" "github.com/matrix-org/gomatrixserverlib" From ebd137cf6b2fbd767625dc5289b0bef6d1e51971 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 5 Oct 2022 11:07:17 +0100 Subject: [PATCH 2/5] Check PostgreSQL connection count (#2760) This PR queries PostgreSQL for the `max_connections` and `superuser_reserved_connections` settings and then ensures that Dendrite's `max_open_conns` doesn't exceed the allowed value. This is a really common source of configuration problems and can either result in blocking queries or deadlocks, so it seems reasonable that we complain as loudly as possible when it happens. --- internal/sqlutil/sqlutil.go | 39 +++++++++++++++++++++++++++++++------ 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/internal/sqlutil/sqlutil.go b/internal/sqlutil/sqlutil.go index 0cdae6d30..789bceeac 100644 --- a/internal/sqlutil/sqlutil.go +++ b/internal/sqlutil/sqlutil.go @@ -2,6 +2,7 @@ package sqlutil import ( "database/sql" + "flag" "fmt" "regexp" @@ -9,6 +10,8 @@ import ( "github.com/sirupsen/logrus" ) +var skipSanityChecks = flag.Bool("skip-db-sanity", false, "Ignore sanity checks on the database connections (NOT RECOMMENDED!)") + // Open opens a database specified by its database driver name and a driver-specific data source name, // usually consisting of at least a database name and connection information. Includes tracing driver // if DENDRITE_TRACE_SQL=1 @@ -37,15 +40,39 @@ func Open(dbProperties *config.DatabaseOptions, writer Writer) (*sql.DB, error) return nil, err } if driverName != "sqlite3" { - logrus.WithFields(logrus.Fields{ - "MaxOpenConns": dbProperties.MaxOpenConns(), - "MaxIdleConns": dbProperties.MaxIdleConns(), - "ConnMaxLifetime": dbProperties.ConnMaxLifetime(), - "dataSourceName": regexp.MustCompile(`://[^@]*@`).ReplaceAllLiteralString(dsn, "://"), - }).Debug("Setting DB connection limits") + logger := logrus.WithFields(logrus.Fields{ + "max_open_conns": dbProperties.MaxOpenConns(), + "max_idle_conns": dbProperties.MaxIdleConns(), + "conn_max_lifetime": dbProperties.ConnMaxLifetime(), + "data_source_name": regexp.MustCompile(`://[^@]*@`).ReplaceAllLiteralString(dsn, "://"), + }) + logger.Debug("Setting DB connection limits") db.SetMaxOpenConns(dbProperties.MaxOpenConns()) db.SetMaxIdleConns(dbProperties.MaxIdleConns()) db.SetConnMaxLifetime(dbProperties.ConnMaxLifetime()) + + if !*skipSanityChecks { + if dbProperties.MaxOpenConns() == 0 { + logrus.Warnf("WARNING: Configuring 'max_open_conns' to be unlimited is not recommended. This can result in bad performance or deadlocks.") + } + + switch driverName { + case "postgres": + // Perform a quick sanity check if possible that we aren't trying to use more database + // connections than PostgreSQL is willing to give us. + var max, reserved int + if err := db.QueryRow("SELECT setting::integer FROM pg_settings WHERE name='max_connections';").Scan(&max); err != nil { + return nil, fmt.Errorf("failed to find maximum connections: %w", err) + } + if err := db.QueryRow("SELECT setting::integer FROM pg_settings WHERE name='superuser_reserved_connections';").Scan(&reserved); err != nil { + return nil, fmt.Errorf("failed to find reserved connections: %w", err) + } + if configured, allowed := dbProperties.MaxOpenConns(), max-reserved; configured > allowed { + logrus.Errorf("ERROR: The configured 'max_open_conns' is greater than the %d non-superuser connections that PostgreSQL is configured to allow. This can result in bad performance or deadlocks. Please pay close attention to your configured database connection counts. If you REALLY know what you are doing and want to override this error, pass the --skip-db-sanity option to Dendrite.", allowed) + return nil, fmt.Errorf("database sanity checks failed") + } + } + } } return db, nil } From 8c0c3441d88a612ca8e9ba4f83c6ff29ca73f5d0 Mon Sep 17 00:00:00 2001 From: Till <2353100+S7evinK@users.noreply.github.com> Date: Wed, 5 Oct 2022 12:12:42 +0200 Subject: [PATCH 3/5] Add `RoomEventType` nats.Header to avoid unneeded unmarshalling (#2765) --- appservice/consumers/roomserver.go | 5 +++++ federationapi/consumers/roomserver.go | 7 +++++++ roomserver/producers/roomevent.go | 13 ++++++------- setup/jetstream/streams.go | 7 ++++--- test/testrig/jetstream.go | 9 ++++----- userapi/consumers/roomserver.go | 7 ++++--- 6 files changed, 30 insertions(+), 18 deletions(-) diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index d44f32b38..ac68f4bd4 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -101,6 +101,11 @@ func (s *OutputRoomEventConsumer) onMessage( log.WithField("appservice", state.ID).Tracef("Appservice worker received %d message(s) from roomserver", len(msgs)) events := make([]*gomatrixserverlib.HeaderedEvent, 0, len(msgs)) for _, msg := range msgs { + // Only handle events we care about + receivedType := api.OutputType(msg.Header.Get(jetstream.RoomEventType)) + if receivedType != api.OutputTypeNewRoomEvent && receivedType != api.OutputTypeNewInviteEvent { + continue + } // Parse out the event JSON var output api.OutputEvent if err := json.Unmarshal(msg.Data, &output); err != nil { diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go index 349b50b05..a42733628 100644 --- a/federationapi/consumers/roomserver.go +++ b/federationapi/consumers/roomserver.go @@ -79,6 +79,13 @@ func (s *OutputRoomEventConsumer) Start() error { // realises that it cannot update the room state using the deltas. func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool { msg := msgs[0] // Guaranteed to exist if onMessage is called + receivedType := api.OutputType(msg.Header.Get(jetstream.RoomEventType)) + + // Only handle events we care about + if receivedType != api.OutputTypeNewRoomEvent && receivedType != api.OutputTypeNewInboundPeek { + return true + } + // Parse out the event JSON var output api.OutputEvent if err := json.Unmarshal(msg.Data, &output); err != nil { diff --git a/roomserver/producers/roomevent.go b/roomserver/producers/roomevent.go index 987e6c942..9c4521986 100644 --- a/roomserver/producers/roomevent.go +++ b/roomserver/producers/roomevent.go @@ -17,12 +17,13 @@ package producers import ( "encoding/json" - "github.com/matrix-org/dendrite/roomserver/acls" - "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/setup/jetstream" "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" "github.com/tidwall/gjson" + + "github.com/matrix-org/dendrite/roomserver/acls" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/jetstream" ) var keyContentFields = map[string]string{ @@ -40,10 +41,8 @@ type RoomEventProducer struct { func (r *RoomEventProducer) ProduceRoomEvents(roomID string, updates []api.OutputEvent) error { var err error for _, update := range updates { - msg := &nats.Msg{ - Subject: r.Topic, - Header: nats.Header{}, - } + msg := nats.NewMsg(r.Topic) + msg.Header.Set(jetstream.RoomEventType, string(update.Type)) msg.Header.Set(jetstream.RoomID, roomID) msg.Data, err = json.Marshal(update) if err != nil { diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go index ee9810dae..590f0cbd9 100644 --- a/setup/jetstream/streams.go +++ b/setup/jetstream/streams.go @@ -9,9 +9,10 @@ import ( ) const ( - UserID = "user_id" - RoomID = "room_id" - EventID = "event_id" + UserID = "user_id" + RoomID = "room_id" + EventID = "event_id" + RoomEventType = "output_room_event_type" ) var ( diff --git a/test/testrig/jetstream.go b/test/testrig/jetstream.go index 74cf95062..b880eea43 100644 --- a/test/testrig/jetstream.go +++ b/test/testrig/jetstream.go @@ -4,10 +4,11 @@ import ( "encoding/json" "testing" + "github.com/nats-io/nats.go" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/jetstream" - "github.com/nats-io/nats.go" ) func MustPublishMsgs(t *testing.T, jsctx nats.JetStreamContext, msgs ...*nats.Msg) { @@ -21,10 +22,8 @@ func MustPublishMsgs(t *testing.T, jsctx nats.JetStreamContext, msgs ...*nats.Ms func NewOutputEventMsg(t *testing.T, base *base.BaseDendrite, roomID string, update api.OutputEvent) *nats.Msg { t.Helper() - msg := &nats.Msg{ - Subject: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent), - Header: nats.Header{}, - } + msg := nats.NewMsg(base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent)) + msg.Header.Set(jetstream.RoomEventType, string(update.Type)) msg.Header.Set(jetstream.RoomID, roomID) var err error msg.Data, err = json.Marshal(update) diff --git a/userapi/consumers/roomserver.go b/userapi/consumers/roomserver.go index 952de98f7..a12876946 100644 --- a/userapi/consumers/roomserver.go +++ b/userapi/consumers/roomserver.go @@ -72,15 +72,16 @@ func (s *OutputRoomEventConsumer) Start() error { func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool { msg := msgs[0] // Guaranteed to exist if onMessage is called + // Only handle events we care about + if rsapi.OutputType(msg.Header.Get(jetstream.RoomEventType)) != rsapi.OutputTypeNewRoomEvent { + return true + } var output rsapi.OutputEvent if err := json.Unmarshal(msg.Data, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("roomserver output log: message parse failure") return true } - if output.Type != rsapi.OutputTypeNewRoomEvent { - return true - } event := output.NewRoomEvent.Event if event == nil { log.Errorf("userapi consumer: expected event") From 6f602bb0969fac6d455de1088bf3980f77a4017f Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 5 Oct 2022 11:16:05 +0100 Subject: [PATCH 4/5] Demote `Failed to query device keys for some users` warning to `level=debug` Many of these warnings are due to dead servers and are quite annoying when they fill up the logs. --- keyserver/internal/device_list_update.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go index fcfcd092d..8b02f3d6c 100644 --- a/keyserver/internal/device_list_update.go +++ b/keyserver/internal/device_list_update.go @@ -424,7 +424,7 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam "succeeded": successCount, "failed": len(userIDs) - successCount, "wait_time": waitTime, - }).Warn("Failed to query device keys for some users") + }).Debug("Failed to query device keys for some users") } return waitTime, !allUsersSucceeded } From c85bc3434fc930423e9e27c8bca9b31b5ad7a441 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 5 Oct 2022 12:47:53 +0100 Subject: [PATCH 5/5] Optimise `QuerySharedUsers` so that we can only work on local users (#2766) Otherwise the sync API key change consumer wastes a lot of time trying to wake up the notifiers for non-local users. --- roomserver/api/query.go | 1 + roomserver/internal/query/query.go | 2 +- roomserver/storage/interface.go | 2 +- .../storage/postgres/membership_table.go | 17 +++++++++----- roomserver/storage/shared/storage.go | 4 ++-- .../storage/sqlite3/membership_table.go | 23 +++++++++++-------- roomserver/storage/tables/interface.go | 2 +- .../storage/tables/membership_table_test.go | 2 +- syncapi/consumers/keychange.go | 6 +++-- 9 files changed, 36 insertions(+), 23 deletions(-) diff --git a/roomserver/api/query.go b/roomserver/api/query.go index aa7dc4735..d63c24785 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -278,6 +278,7 @@ type QuerySharedUsersRequest struct { OtherUserIDs []string ExcludeRoomIDs []string IncludeRoomIDs []string + LocalOnly bool } type QuerySharedUsersResponse struct { diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go index b41a92e94..ee8e1cfe7 100644 --- a/roomserver/internal/query/query.go +++ b/roomserver/internal/query/query.go @@ -799,7 +799,7 @@ func (r *Queryer) QuerySharedUsers(ctx context.Context, req *api.QuerySharedUser } roomIDs = roomIDs[:j] - users, err := r.DB.JoinedUsersSetInRooms(ctx, roomIDs, req.OtherUserIDs) + users, err := r.DB.JoinedUsersSetInRooms(ctx, roomIDs, req.OtherUserIDs, req.LocalOnly) if err != nil { return err } diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index 43e8da7bb..11e175f55 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -157,7 +157,7 @@ type Database interface { // If a tuple has the StateKey of '*' and allowWildcards=true then all state events with the EventType should be returned. GetBulkStateContent(ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool) ([]tables.StrippedEvent, error) // JoinedUsersSetInRooms returns how many times each of the given users appears across the given rooms. - JoinedUsersSetInRooms(ctx context.Context, roomIDs, userIDs []string) (map[string]int, error) + JoinedUsersSetInRooms(ctx context.Context, roomIDs, userIDs []string, localOnly bool) (map[string]int, error) // GetLocalServerInRoom returns true if we think we're in a given room or false otherwise. GetLocalServerInRoom(ctx context.Context, roomNID types.RoomNID) (bool, error) // GetServerInRoom returns true if we think a server is in a given room or false otherwise. diff --git a/roomserver/storage/postgres/membership_table.go b/roomserver/storage/postgres/membership_table.go index bd3fd5592..0150534e1 100644 --- a/roomserver/storage/postgres/membership_table.go +++ b/roomserver/storage/postgres/membership_table.go @@ -68,14 +68,18 @@ CREATE TABLE IF NOT EXISTS roomserver_membership ( var selectJoinedUsersSetForRoomsAndUserSQL = "" + "SELECT target_nid, COUNT(room_nid) FROM roomserver_membership" + - " WHERE room_nid = ANY($1) AND target_nid = ANY($2) AND" + - " membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " and forgotten = false" + + " WHERE (target_local OR $1 = false)" + + " AND room_nid = ANY($2) AND target_nid = ANY($3)" + + " AND membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + + " AND forgotten = false" + " GROUP BY target_nid" var selectJoinedUsersSetForRoomsSQL = "" + "SELECT target_nid, COUNT(room_nid) FROM roomserver_membership" + - " WHERE room_nid = ANY($1) AND" + - " membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " and forgotten = false" + + " WHERE (target_local OR $1 = false) " + + " AND room_nid = ANY($2)" + + " AND membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + + " AND forgotten = false" + " GROUP BY target_nid" // Insert a row in to membership table so that it can be locked by the @@ -334,6 +338,7 @@ func (s *membershipStatements) SelectJoinedUsersSetForRooms( ctx context.Context, txn *sql.Tx, roomNIDs []types.RoomNID, userNIDs []types.EventStateKeyNID, + localOnly bool, ) (map[types.EventStateKeyNID]int, error) { var ( rows *sql.Rows @@ -342,9 +347,9 @@ func (s *membershipStatements) SelectJoinedUsersSetForRooms( stmt := sqlutil.TxStmt(txn, s.selectJoinedUsersSetForRoomsStmt) if len(userNIDs) > 0 { stmt = sqlutil.TxStmt(txn, s.selectJoinedUsersSetForRoomsAndUserStmt) - rows, err = stmt.QueryContext(ctx, pq.Array(roomNIDs), pq.Array(userNIDs)) + rows, err = stmt.QueryContext(ctx, localOnly, pq.Array(roomNIDs), pq.Array(userNIDs)) } else { - rows, err = stmt.QueryContext(ctx, pq.Array(roomNIDs)) + rows, err = stmt.QueryContext(ctx, localOnly, pq.Array(roomNIDs)) } if err != nil { diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index 593abbea1..d83a1ff74 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -1280,7 +1280,7 @@ func (d *Database) GetBulkStateContent(ctx context.Context, roomIDs []string, tu } // JoinedUsersSetInRooms returns a map of how many times the given users appear in the specified rooms. -func (d *Database) JoinedUsersSetInRooms(ctx context.Context, roomIDs, userIDs []string) (map[string]int, error) { +func (d *Database) JoinedUsersSetInRooms(ctx context.Context, roomIDs, userIDs []string, localOnly bool) (map[string]int, error) { roomNIDs, err := d.RoomsTable.BulkSelectRoomNIDs(ctx, nil, roomIDs) if err != nil { return nil, err @@ -1295,7 +1295,7 @@ func (d *Database) JoinedUsersSetInRooms(ctx context.Context, roomIDs, userIDs [ userNIDs = append(userNIDs, nid) nidToUserID[nid] = id } - userNIDToCount, err := d.MembershipTable.SelectJoinedUsersSetForRooms(ctx, nil, roomNIDs, userNIDs) + userNIDToCount, err := d.MembershipTable.SelectJoinedUsersSetForRooms(ctx, nil, roomNIDs, userNIDs, localOnly) if err != nil { return nil, err } diff --git a/roomserver/storage/sqlite3/membership_table.go b/roomserver/storage/sqlite3/membership_table.go index f3303eb0e..cd149f0ed 100644 --- a/roomserver/storage/sqlite3/membership_table.go +++ b/roomserver/storage/sqlite3/membership_table.go @@ -44,14 +44,18 @@ const membershipSchema = ` var selectJoinedUsersSetForRoomsAndUserSQL = "" + "SELECT target_nid, COUNT(room_nid) FROM roomserver_membership" + - " WHERE room_nid IN ($1) AND target_nid IN ($2) AND" + - " membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " and forgotten = false" + + " WHERE (target_local OR $1 = false)" + + " AND room_nid IN ($2) AND target_nid IN ($3)" + + " AND membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + + " AND forgotten = false" + " GROUP BY target_nid" var selectJoinedUsersSetForRoomsSQL = "" + "SELECT target_nid, COUNT(room_nid) FROM roomserver_membership" + - " WHERE room_nid IN ($1) AND " + - " membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " and forgotten = false" + + " WHERE (target_local OR $1 = false)" + + " AND room_nid IN ($2)" + + " AND membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + + " AND forgotten = false" + " GROUP BY target_nid" // Insert a row in to membership table so that it can be locked by the @@ -305,8 +309,9 @@ func (s *membershipStatements) SelectRoomsWithMembership( return roomNIDs, nil } -func (s *membershipStatements) SelectJoinedUsersSetForRooms(ctx context.Context, txn *sql.Tx, roomNIDs []types.RoomNID, userNIDs []types.EventStateKeyNID) (map[types.EventStateKeyNID]int, error) { - params := make([]interface{}, 0, len(roomNIDs)+len(userNIDs)) +func (s *membershipStatements) SelectJoinedUsersSetForRooms(ctx context.Context, txn *sql.Tx, roomNIDs []types.RoomNID, userNIDs []types.EventStateKeyNID, localOnly bool) (map[types.EventStateKeyNID]int, error) { + params := make([]interface{}, 0, 1+len(roomNIDs)+len(userNIDs)) + params = append(params, localOnly) for _, v := range roomNIDs { params = append(params, v) } @@ -314,10 +319,10 @@ func (s *membershipStatements) SelectJoinedUsersSetForRooms(ctx context.Context, params = append(params, v) } - query := strings.Replace(selectJoinedUsersSetForRoomsSQL, "($1)", sqlutil.QueryVariadic(len(roomNIDs)), 1) + query := strings.Replace(selectJoinedUsersSetForRoomsSQL, "($2)", sqlutil.QueryVariadicOffset(len(roomNIDs), 1), 1) if len(userNIDs) > 0 { - query = strings.Replace(selectJoinedUsersSetForRoomsAndUserSQL, "($1)", sqlutil.QueryVariadic(len(roomNIDs)), 1) - query = strings.Replace(query, "($2)", sqlutil.QueryVariadicOffset(len(userNIDs), len(roomNIDs)), 1) + query = strings.Replace(selectJoinedUsersSetForRoomsAndUserSQL, "($2)", sqlutil.QueryVariadicOffset(len(roomNIDs), 1), 1) + query = strings.Replace(query, "($3)", sqlutil.QueryVariadicOffset(len(userNIDs), len(roomNIDs)+1), 1) } var rows *sql.Rows var err error diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go index 68d30f994..d7bcc95ab 100644 --- a/roomserver/storage/tables/interface.go +++ b/roomserver/storage/tables/interface.go @@ -137,7 +137,7 @@ type Membership interface { UpdateMembership(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, senderUserNID types.EventStateKeyNID, membership MembershipState, eventNID types.EventNID, forgotten bool) (bool, error) SelectRoomsWithMembership(ctx context.Context, txn *sql.Tx, userID types.EventStateKeyNID, membershipState MembershipState) ([]types.RoomNID, error) // SelectJoinedUsersSetForRooms returns how many times each of the given users appears across the given rooms. - SelectJoinedUsersSetForRooms(ctx context.Context, txn *sql.Tx, roomNIDs []types.RoomNID, userNIDs []types.EventStateKeyNID) (map[types.EventStateKeyNID]int, error) + SelectJoinedUsersSetForRooms(ctx context.Context, txn *sql.Tx, roomNIDs []types.RoomNID, userNIDs []types.EventStateKeyNID, localOnly bool) (map[types.EventStateKeyNID]int, error) SelectKnownUsers(ctx context.Context, txn *sql.Tx, userID types.EventStateKeyNID, searchString string, limit int) ([]string, error) UpdateForgetMembership(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, forget bool) error SelectLocalServerInRoom(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) (bool, error) diff --git a/roomserver/storage/tables/membership_table_test.go b/roomserver/storage/tables/membership_table_test.go index f789ef4ac..c9541d9d2 100644 --- a/roomserver/storage/tables/membership_table_test.go +++ b/roomserver/storage/tables/membership_table_test.go @@ -79,7 +79,7 @@ func TestMembershipTable(t *testing.T) { assert.NoError(t, err) assert.True(t, inRoom) - userJoinedToRooms, err := tab.SelectJoinedUsersSetForRooms(ctx, nil, []types.RoomNID{1}, userNIDs) + userJoinedToRooms, err := tab.SelectJoinedUsersSetForRooms(ctx, nil, []types.RoomNID{1}, userNIDs, false) assert.NoError(t, err) assert.Equal(t, 1, len(userJoinedToRooms)) diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index dc7d9e207..96ebba7ef 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -111,7 +111,8 @@ func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, d // work out who we need to notify about the new key var queryRes roomserverAPI.QuerySharedUsersResponse err := s.rsAPI.QuerySharedUsers(s.ctx, &roomserverAPI.QuerySharedUsersRequest{ - UserID: output.UserID, + UserID: output.UserID, + LocalOnly: true, }, &queryRes) if err != nil { logrus.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server") @@ -135,7 +136,8 @@ func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage // work out who we need to notify about the new key var queryRes roomserverAPI.QuerySharedUsersResponse err := s.rsAPI.QuerySharedUsers(s.ctx, &roomserverAPI.QuerySharedUsersRequest{ - UserID: output.UserID, + UserID: output.UserID, + LocalOnly: true, }, &queryRes) if err != nil { logrus.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server")