Implementation for sqlite3

This commit is contained in:
Prateek Sachan 2020-03-17 21:13:59 +05:30
parent 4065c700be
commit 2de1e2a05d
2 changed files with 20 additions and 16 deletions

View file

@ -19,10 +19,10 @@ import (
"context" "context"
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"sort"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"sort"
"strings"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -58,12 +58,12 @@ const selectEventsSQL = "" +
const selectRecentEventsSQL = "" + const selectRecentEventsSQL = "" +
"SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + "SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" + " WHERE room_id = $1 AND id > $2 AND id <= $3 AND sender NOT IN $5" +
" ORDER BY id DESC LIMIT $4" " ORDER BY id DESC LIMIT $4"
const selectRecentEventsForSyncSQL = "" + const selectRecentEventsForSyncSQL = "" +
"SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + "SELECT id, event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" + " WHERE room_id = $1 AND id > $2 AND id <= $3 AND sender NOT IN $5 AND exclude_from_sync = FALSE" +
" ORDER BY id DESC LIMIT $4" " ORDER BY id DESC LIMIT $4"
const selectEarlyEventsSQL = "" + const selectEarlyEventsSQL = "" +
@ -302,16 +302,19 @@ func (s *outputRoomEventsStatements) insertEvent(
func (s *outputRoomEventsStatements) selectRecentEvents( func (s *outputRoomEventsStatements) selectRecentEvents(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
roomID string, fromPos, toPos types.StreamPosition, limit int, roomID string, fromPos, toPos types.StreamPosition, limit int,
chronologicalOrder bool, onlySyncEvents bool, chronologicalOrder bool, onlySyncEvents bool, ignoredUsers []string,
) ([]types.StreamEvent, error) { ) ([]types.StreamEvent, error) {
var stmt *sql.Stmt iIgnoredUsers := make([]interface{}, len(ignoredUsers))
if onlySyncEvents { for k, v := range ignoredUsers {
stmt = common.TxStmt(txn, s.selectRecentEventsForSyncStmt) iIgnoredUsers[k] = v
} else {
stmt = common.TxStmt(txn, s.selectRecentEventsStmt)
} }
var query string
rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit) if onlySyncEvents {
query = strings.Replace(selectRecentEventsForSyncSQL, "($5)", common.QueryVariadic(len(iIgnoredUsers)), 1)
} else {
query = strings.Replace(selectRecentEventsSQL, "($5)", common.QueryVariadic(len(iIgnoredUsers)), 1)
}
rows, err := txn.QueryContext(ctx, query, roomID, fromPos, toPos, limit, iIgnoredUsers)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -318,7 +318,7 @@ func (d *SyncServerDatasource) GetEventsInRange(
if backwardOrdering { if backwardOrdering {
// When using backward ordering, we want the most recent events first. // When using backward ordering, we want the most recent events first.
if events, err = d.events.selectRecentEvents( if events, err = d.events.selectRecentEvents(
ctx, nil, roomID, to.PDUPosition, from.PDUPosition, limit, false, false, ctx, nil, roomID, to.PDUPosition, from.PDUPosition, limit, false, false, nil,
); err != nil { ); err != nil {
return return
} }
@ -595,6 +595,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
ctx context.Context, ctx context.Context,
userID string, userID string,
numRecentEventsPerRoom int, numRecentEventsPerRoom int,
ignoredUsers []string,
) ( ) (
res *types.Response, res *types.Response,
toPos types.PaginationToken, toPos types.PaginationToken,
@ -647,7 +648,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
var recentStreamEvents []types.StreamEvent var recentStreamEvents []types.StreamEvent
recentStreamEvents, err = d.events.selectRecentEvents( recentStreamEvents, err = d.events.selectRecentEvents(
ctx, txn, roomID, types.StreamPosition(0), toPos.PDUPosition, ctx, txn, roomID, types.StreamPosition(0), toPos.PDUPosition,
numRecentEventsPerRoom, true, true, numRecentEventsPerRoom, true, true, ignoredUsers,
) )
if err != nil { if err != nil {
return return
@ -691,7 +692,7 @@ func (d *SyncServerDatasource) CompleteSync(
ctx context.Context, userID string, numRecentEventsPerRoom int, ignoredUsers []string, ctx context.Context, userID string, numRecentEventsPerRoom int, ignoredUsers []string,
) (*types.Response, error) { ) (*types.Response, error) {
res, toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync( res, toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync(
ctx, userID, numRecentEventsPerRoom, ctx, userID, numRecentEventsPerRoom, ignoredUsers,
) )
if err != nil { if err != nil {
return nil, err return nil, err
@ -856,7 +857,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
} }
recentStreamEvents, err := d.events.selectRecentEvents( recentStreamEvents, err := d.events.selectRecentEvents(
ctx, txn, delta.roomID, types.StreamPosition(fromPos), types.StreamPosition(endPos), ctx, txn, delta.roomID, types.StreamPosition(fromPos), types.StreamPosition(endPos),
numRecentEventsPerRoom, true, true, numRecentEventsPerRoom, true, true, ignoredUsers,
) )
if err != nil { if err != nil {
return err return err