mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-21 04:53:14 -06:00
Make SQLite work again
This commit is contained in:
parent
823540e861
commit
28690b72f9
|
|
@ -21,6 +21,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
@ -226,6 +227,7 @@ func (r *Queryer) QueryMembershipAtEvent(
|
||||||
return fmt.Errorf("no roomInfo found")
|
return fmt.Errorf("no roomInfo found")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// get the users stateKeyNID
|
||||||
stateKeyNIDs, err := r.DB.EventStateKeyNIDs(ctx, []string{request.UserID})
|
stateKeyNIDs, err := r.DB.EventStateKeyNIDs(ctx, []string{request.UserID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to get stateKeyNIDs for %s: %w", request.UserID, err)
|
return fmt.Errorf("unable to get stateKeyNIDs for %s: %w", request.UserID, err)
|
||||||
|
|
@ -238,74 +240,62 @@ func (r *Queryer) QueryMembershipAtEvent(
|
||||||
switch err {
|
switch err {
|
||||||
case nil:
|
case nil:
|
||||||
return nil
|
return nil
|
||||||
//case tables.OptimisationNotSupportedError: // fallthrough
|
case tables.OptimisationNotSupportedError: // fallthrough
|
||||||
default:
|
default:
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
response.Memberships = make(map[string]*gomatrixserverlib.HeaderedEvent)
|
||||||
// get the users stateKeyNID
|
stateEntries, err := helpers.MembershipAtEvent(ctx, r.DB, nil, request.EventIDs, stateKeyNIDs[request.UserID])
|
||||||
stateKeyNIDs, err := r.DB.EventStateKeyNIDs(ctx, []string{request.UserID})
|
if err != nil {
|
||||||
if err != nil {
|
return fmt.Errorf("unable to get state before event: %w", err)
|
||||||
return fmt.Errorf("unable to get stateKeyNIDs for %s: %w", request.UserID, err)
|
}
|
||||||
|
|
||||||
|
// If we only have one or less state entries, we can short circuit the below
|
||||||
|
// loop and avoid hitting the database
|
||||||
|
allStateEventNIDs := make(map[types.EventNID]types.StateEntry)
|
||||||
|
for _, eventID := range request.EventIDs {
|
||||||
|
stateEntry := stateEntries[eventID]
|
||||||
|
for _, s := range stateEntry {
|
||||||
|
allStateEventNIDs[s.EventNID] = s
|
||||||
}
|
}
|
||||||
if _, ok := stateKeyNIDs[request.UserID]; !ok {
|
}
|
||||||
return fmt.Errorf("requested stateKeyNID for %s was not found", request.UserID)
|
|
||||||
|
var canShortCircuit bool
|
||||||
|
if len(allStateEventNIDs) <= 1 {
|
||||||
|
canShortCircuit = true
|
||||||
|
}
|
||||||
|
|
||||||
|
var memberships []types.Event
|
||||||
|
for _, eventID := range request.EventIDs {
|
||||||
|
stateEntry, ok := stateEntries[eventID]
|
||||||
|
if !ok || len(stateEntry) == 0 {
|
||||||
|
response.Memberships[eventID] = nil
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
stateEntries, err := helpers.MembershipAtEvent(ctx, r.DB, nil, request.EventIDs, stateKeyNIDs[request.UserID])
|
// If we can short circuit, e.g. we only have 0 or 1 membership events, we only get the memberships
|
||||||
if err != nil {
|
// once. If we have more than one membership event, we need to get the state for each state entry.
|
||||||
return fmt.Errorf("unable to get state before event: %w", err)
|
if canShortCircuit {
|
||||||
}
|
if len(memberships) == 0 {
|
||||||
|
|
||||||
// If we only have one or less state entries, we can short circuit the below
|
|
||||||
// loop and avoid hitting the database
|
|
||||||
allStateEventNIDs := make(map[types.EventNID]types.StateEntry)
|
|
||||||
for _, eventID := range request.EventIDs {
|
|
||||||
stateEntry := stateEntries[eventID]
|
|
||||||
for _, s := range stateEntry {
|
|
||||||
allStateEventNIDs[s.EventNID] = s
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var canShortCircuit bool
|
|
||||||
if len(allStateEventNIDs) <= 1 {
|
|
||||||
canShortCircuit = true
|
|
||||||
}
|
|
||||||
|
|
||||||
var memberships []types.Event
|
|
||||||
for _, eventID := range request.EventIDs {
|
|
||||||
stateEntry, ok := stateEntries[eventID]
|
|
||||||
if !ok || len(stateEntry) == 0 {
|
|
||||||
response.Memberships[eventID] = nil
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we can short circuit, e.g. we only have 0 or 1 membership events, we only get the memberships
|
|
||||||
// once. If we have more than one membership event, we need to get the state for each state entry.
|
|
||||||
if canShortCircuit {
|
|
||||||
if len(memberships) == 0 {
|
|
||||||
memberships, err = helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
memberships, err = helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false)
|
memberships, err = helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false)
|
||||||
}
|
}
|
||||||
if err != nil {
|
} else {
|
||||||
return fmt.Errorf("unable to get memberships at state: %w", err)
|
memberships, err = helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false)
|
||||||
}
|
}
|
||||||
|
if err != nil {
|
||||||
res := make([]*gomatrixserverlib.HeaderedEvent, 0, len(memberships))
|
return fmt.Errorf("unable to get memberships at state: %w", err)
|
||||||
|
|
||||||
for i := range memberships {
|
|
||||||
ev := memberships[i]
|
|
||||||
if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(request.UserID) {
|
|
||||||
res = append(res, ev.Headered(roomVersion))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
response.Memberships[eventID] = res
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil*/
|
for i := range memberships {
|
||||||
|
ev := memberships[i]
|
||||||
|
if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(request.UserID) {
|
||||||
|
response.Memberships[eventID] = ev.Event.Headered(info.RoomVersion)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryMembershipsForRoom implements api.RoomserverInternalAPI
|
// QueryMembershipsForRoom implements api.RoomserverInternalAPI
|
||||||
|
|
|
||||||
|
|
@ -200,7 +200,7 @@ func visibilityForEvents(
|
||||||
visibility: event.Visibility,
|
visibility: event.Visibility,
|
||||||
}
|
}
|
||||||
ev, ok := membershipResp.Memberships[eventID]
|
ev, ok := membershipResp.Memberships[eventID]
|
||||||
if !ok {
|
if !ok || ev == nil {
|
||||||
result[eventID] = vis
|
result[eventID] = vis
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -407,7 +407,11 @@ func (s *outputRoomEventsStatements) InsertEvent(
|
||||||
// selectRecentEvents returns the most recent events in the given room, up to a maximum of 'limit'.
|
// selectRecentEvents returns the most recent events in the given room, up to a maximum of 'limit'.
|
||||||
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude
|
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude
|
||||||
// from sync.
|
// from sync.
|
||||||
func (s *outputRoomEventsStatements) SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomIDs []string, ra types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error) {
|
func (s *outputRoomEventsStatements) SelectRecentEvents(
|
||||||
|
ctx context.Context, txn *sql.Tx,
|
||||||
|
roomIDs []string, ra types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
|
||||||
|
chronologicalOrder bool, onlySyncEvents bool,
|
||||||
|
) (map[string]types.RecentEvents, error) {
|
||||||
var stmt *sql.Stmt
|
var stmt *sql.Stmt
|
||||||
if onlySyncEvents {
|
if onlySyncEvents {
|
||||||
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt)
|
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt)
|
||||||
|
|
|
||||||
|
|
@ -366,7 +366,11 @@ func (s *outputRoomEventsStatements) InsertEvent(
|
||||||
return streamPos, err
|
return streamPos, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *outputRoomEventsStatements) SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomIDs []string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error) {
|
func (s *outputRoomEventsStatements) SelectRecentEvents(
|
||||||
|
ctx context.Context, txn *sql.Tx,
|
||||||
|
roomIDs []string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
|
||||||
|
chronologicalOrder bool, onlySyncEvents bool,
|
||||||
|
) (map[string]types.RecentEvents, error) {
|
||||||
var query string
|
var query string
|
||||||
if onlySyncEvents {
|
if onlySyncEvents {
|
||||||
query = selectRecentEventsForSyncSQL
|
query = selectRecentEventsForSyncSQL
|
||||||
|
|
@ -374,47 +378,55 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(ctx context.Context, txn
|
||||||
query = selectRecentEventsSQL
|
query = selectRecentEventsSQL
|
||||||
}
|
}
|
||||||
|
|
||||||
stmt, params, err := prepareWithFilters(
|
result := make(map[string]types.RecentEvents, len(roomIDs))
|
||||||
s.db, txn, query,
|
for _, roomID := range roomIDs {
|
||||||
[]interface{}{
|
stmt, params, err := prepareWithFilters(
|
||||||
roomIDs, r.Low(), r.High(),
|
s.db, txn, query,
|
||||||
},
|
[]interface{}{
|
||||||
eventFilter.Senders, eventFilter.NotSenders,
|
roomID, r.Low(), r.High(),
|
||||||
eventFilter.Types, eventFilter.NotTypes,
|
},
|
||||||
nil, eventFilter.ContainsURL, eventFilter.Limit+1, FilterOrderDesc,
|
eventFilter.Senders, eventFilter.NotSenders,
|
||||||
)
|
eventFilter.Types, eventFilter.NotTypes,
|
||||||
if err != nil {
|
nil, eventFilter.ContainsURL, eventFilter.Limit+1, FilterOrderDesc,
|
||||||
return nil, fmt.Errorf("s.prepareWithFilters: %w", err)
|
)
|
||||||
}
|
if err != nil {
|
||||||
defer internal.CloseAndLogIfError(ctx, stmt, "selectRecentEvents: stmt.close() failed")
|
return nil, fmt.Errorf("s.prepareWithFilters: %w", err)
|
||||||
|
|
||||||
rows, err := stmt.QueryContext(ctx, params...)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed")
|
|
||||||
events, err := rowsToStreamEvents(rows)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if chronologicalOrder {
|
|
||||||
// The events need to be returned from oldest to latest, which isn't
|
|
||||||
// necessary the way the SQL query returns them, so a sort is necessary to
|
|
||||||
// ensure the events are in the right order in the slice.
|
|
||||||
sort.SliceStable(events, func(i int, j int) bool {
|
|
||||||
return events[i].StreamPosition < events[j].StreamPosition
|
|
||||||
})
|
|
||||||
}
|
|
||||||
// we queried for 1 more than the limit, so if we returned one more mark limited=true
|
|
||||||
if len(events) > eventFilter.Limit {
|
|
||||||
// re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last.
|
|
||||||
if chronologicalOrder {
|
|
||||||
events = events[1:]
|
|
||||||
} else {
|
|
||||||
events = events[:len(events)-1]
|
|
||||||
}
|
}
|
||||||
|
defer internal.CloseAndLogIfError(ctx, stmt, "selectRecentEvents: stmt.close() failed")
|
||||||
|
|
||||||
|
rows, err := stmt.QueryContext(ctx, params...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed")
|
||||||
|
events, err := rowsToStreamEvents(rows)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if chronologicalOrder {
|
||||||
|
// The events need to be returned from oldest to latest, which isn't
|
||||||
|
// necessary the way the SQL query returns them, so a sort is necessary to
|
||||||
|
// ensure the events are in the right order in the slice.
|
||||||
|
sort.SliceStable(events, func(i int, j int) bool {
|
||||||
|
return events[i].StreamPosition < events[j].StreamPosition
|
||||||
|
})
|
||||||
|
}
|
||||||
|
res := types.RecentEvents{}
|
||||||
|
// we queried for 1 more than the limit, so if we returned one more mark limited=true
|
||||||
|
if len(events) > eventFilter.Limit {
|
||||||
|
res.Limited = true
|
||||||
|
// re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last.
|
||||||
|
if chronologicalOrder {
|
||||||
|
events = events[1:]
|
||||||
|
} else {
|
||||||
|
events = events[:len(events)-1]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
res.Events = events
|
||||||
|
result[roomID] = res
|
||||||
}
|
}
|
||||||
return map[string]types.RecentEvents{}, nil
|
|
||||||
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *outputRoomEventsStatements) SelectEarlyEvents(
|
func (s *outputRoomEventsStatements) SelectEarlyEvents(
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue