Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/consent-tracking

This commit is contained in:
Till Faelligen 2022-02-22 07:48:36 +01:00
commit 2042303c6c
15 changed files with 552 additions and 63 deletions

View file

@ -29,6 +29,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/internal/input" "github.com/matrix-org/dendrite/roomserver/internal/input"
"github.com/matrix-org/dendrite/roomserver/internal/query" "github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -367,7 +368,15 @@ func buildEvent(
StateToFetch: eventsNeeded.Tuples(), StateToFetch: eventsNeeded.Tuples(),
}, &queryRes) }, &queryRes)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("QueryLatestEventsAndState: %w", err) switch err.(type) {
case types.MissingStateError:
// We know something about the room but the state seems to be
// insufficient to actually build a new event, so in effect we
// had might as well treat the room as if it doesn't exist.
return nil, nil, eventutil.ErrRoomNoExists
default:
return nil, nil, fmt.Errorf("QueryLatestEventsAndState: %w", err)
}
} }
ev, err := eventutil.BuildEvent(ctx, builder, cfg, time.Now(), &eventsNeeded, &queryRes) ev, err := eventutil.BuildEvent(ctx, builder, cfg, time.Now(), &eventsNeeded, &queryRes)

View file

@ -134,7 +134,7 @@ func (s *stateSnapshotStatements) BulkSelectStateBlockNIDs(
return nil, err return nil, err
} }
if i != len(stateNIDs) { if i != len(stateNIDs) {
return nil, fmt.Errorf("storage: state NIDs missing from the database (%d != %d)", i, len(stateNIDs)) return nil, types.MissingStateError(fmt.Sprintf("storage: state NIDs missing from the database (%d != %d)", i, len(stateNIDs)))
} }
return results, nil return results, nil
} }

View file

@ -137,7 +137,7 @@ func (s *stateSnapshotStatements) BulkSelectStateBlockNIDs(
} }
} }
if i != len(stateNIDs) { if i != len(stateNIDs) {
return nil, fmt.Errorf("storage: state NIDs missing from the database (%d != %d)", i, len(stateNIDs)) return nil, types.MissingStateError(fmt.Sprintf("storage: state NIDs missing from the database (%d != %d)", i, len(stateNIDs)))
} }
return results, nil return results, nil
} }

View file

@ -213,6 +213,12 @@ type MissingEventError string
func (e MissingEventError) Error() string { return string(e) } func (e MissingEventError) Error() string { return string(e) }
// A MissingStateError is an error that happened because the roomserver was
// missing requested state snapshots from its databases.
type MissingStateError string
func (e MissingStateError) Error() string { return string(e) }
// A RejectedError is returned when an event is stored as rejected. The error // A RejectedError is returned when an event is stored as rejected. The error
// contains the reason why. // contains the reason why.
type RejectedError string type RejectedError string

191
syncapi/routing/context.go Normal file
View file

@ -0,0 +1,191 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package routing
import (
"database/sql"
"encoding/json"
"net/http"
"strconv"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
roomserver "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)
type ContextRespsonse struct {
End string `json:"end"`
Event gomatrixserverlib.ClientEvent `json:"event"`
EventsAfter []gomatrixserverlib.ClientEvent `json:"events_after,omitempty"`
EventsBefore []gomatrixserverlib.ClientEvent `json:"events_before,omitempty"`
Start string `json:"start"`
State []gomatrixserverlib.ClientEvent `json:"state"`
}
func Context(
req *http.Request, device *userapi.Device,
rsAPI roomserver.RoomserverInternalAPI,
syncDB storage.Database,
roomID, eventID string,
) util.JSONResponse {
filter, err := parseContextParams(req)
if err != nil {
errMsg := ""
switch err.(type) {
case *json.InvalidUnmarshalError:
errMsg = "unable to parse filter"
case *strconv.NumError:
errMsg = "unable to parse limit"
}
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.InvalidParam(errMsg),
Headers: nil,
}
}
filter.Rooms = append(filter.Rooms, roomID)
ctx := req.Context()
membershipRes := roomserver.QueryMembershipForUserResponse{}
membershipReq := roomserver.QueryMembershipForUserRequest{UserID: device.UserID, RoomID: roomID}
if err = rsAPI.QueryMembershipForUser(ctx, &membershipReq, &membershipRes); err != nil {
logrus.WithError(err).Error("unable to fo membership")
return jsonerror.InternalServerError()
}
stateFilter := gomatrixserverlib.StateFilter{
Limit: 100,
NotSenders: filter.NotSenders,
NotTypes: filter.NotTypes,
Senders: filter.Senders,
Types: filter.Types,
LazyLoadMembers: filter.LazyLoadMembers,
IncludeRedundantMembers: filter.IncludeRedundantMembers,
NotRooms: filter.NotRooms,
Rooms: filter.Rooms,
ContainsURL: filter.ContainsURL,
}
// TODO: Get the actual state at the last event returned by SelectContextAfterEvent
state, _ := syncDB.CurrentState(ctx, roomID, &stateFilter, nil)
// verify the user is allowed to see the context for this room/event
for _, x := range state {
var hisVis string
hisVis, err = x.HistoryVisibility()
if err != nil {
continue
}
allowed := hisVis == gomatrixserverlib.WorldReadable || membershipRes.Membership == gomatrixserverlib.Join
if !allowed {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("User is not allowed to query context"),
}
}
}
id, requestedEvent, err := syncDB.SelectContextEvent(ctx, roomID, eventID)
if err != nil {
logrus.WithError(err).WithField("eventID", eventID).Error("unable to find requested event")
return jsonerror.InternalServerError()
}
eventsBefore, err := syncDB.SelectContextBeforeEvent(ctx, id, roomID, filter)
if err != nil && err != sql.ErrNoRows {
logrus.WithError(err).Error("unable to fetch before events")
return jsonerror.InternalServerError()
}
_, eventsAfter, err := syncDB.SelectContextAfterEvent(ctx, id, roomID, filter)
if err != nil && err != sql.ErrNoRows {
logrus.WithError(err).Error("unable to fetch after events")
return jsonerror.InternalServerError()
}
eventsBeforeClient := gomatrixserverlib.HeaderedToClientEvents(eventsBefore, gomatrixserverlib.FormatAll)
eventsAfterClient := gomatrixserverlib.HeaderedToClientEvents(eventsAfter, gomatrixserverlib.FormatAll)
newState := applyLazyLoadMembers(filter, eventsAfterClient, eventsBeforeClient, state)
response := ContextRespsonse{
Event: gomatrixserverlib.HeaderedToClientEvent(&requestedEvent, gomatrixserverlib.FormatAll),
EventsAfter: eventsAfterClient,
EventsBefore: eventsBeforeClient,
State: gomatrixserverlib.HeaderedToClientEvents(newState, gomatrixserverlib.FormatAll),
}
if len(response.State) > filter.Limit {
response.State = response.State[len(response.State)-filter.Limit:]
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: response,
}
}
func applyLazyLoadMembers(filter *gomatrixserverlib.RoomEventFilter, eventsAfter, eventsBefore []gomatrixserverlib.ClientEvent, state []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
if filter == nil || !filter.LazyLoadMembers {
return state
}
allEvents := append(eventsBefore, eventsAfter...)
x := make(map[string]bool)
// get members who actually send an event
for _, e := range allEvents {
x[e.Sender] = true
}
newState := []*gomatrixserverlib.HeaderedEvent{}
for _, event := range state {
if event.Type() != gomatrixserverlib.MRoomMember {
newState = append(newState, event)
} else {
// did the user send an event?
if x[event.Sender()] {
newState = append(newState, event)
}
}
}
return newState
}
func parseContextParams(req *http.Request) (*gomatrixserverlib.RoomEventFilter, error) {
// Default room filter
filter := &gomatrixserverlib.RoomEventFilter{Limit: 10}
l := req.URL.Query().Get("limit")
f := req.URL.Query().Get("filter")
if l != "" {
limit, err := strconv.Atoi(l)
if err != nil {
return nil, err
}
// NOTSPEC: feels like a good idea to have an upper bound limit
if limit > 100 {
limit = 100
}
filter.Limit = limit
}
if f != "" {
if err := json.Unmarshal([]byte(f), &filter); err != nil {
return nil, err
}
}
return filter, nil
}

View file

@ -0,0 +1,68 @@
package routing
import (
"net/http"
"reflect"
"testing"
"github.com/matrix-org/gomatrixserverlib"
)
func Test_parseContextParams(t *testing.T) {
noParamsReq, _ := http.NewRequest("GET", "https://localhost:8800/_matrix/client/r0/rooms/!hyi4UaxS9mUXpSG9:localhost:8800/context/%24um_T82QqAXN8PayGiBW7j9WExpqTIQ7-JRq-Q6xpIf8?access_token=5dMB0z4tiulyBvCaIKgyjuWG71ybDiYIwNJVJ2UmxRI", nil)
limit2Req, _ := http.NewRequest("GET", "https://localhost:8800/_matrix/client/r0/rooms/!hyi4UaxS9mUXpSG9:localhost:8800/context/%24um_T82QqAXN8PayGiBW7j9WExpqTIQ7-JRq-Q6xpIf8?access_token=5dMB0z4tiulyBvCaIKgyjuWG71ybDiYIwNJVJ2UmxRI&limit=2", nil)
limit10000Req, _ := http.NewRequest("GET", "https://localhost:8800/_matrix/client/r0/rooms/!hyi4UaxS9mUXpSG9:localhost:8800/context/%24um_T82QqAXN8PayGiBW7j9WExpqTIQ7-JRq-Q6xpIf8?access_token=5dMB0z4tiulyBvCaIKgyjuWG71ybDiYIwNJVJ2UmxRI&limit=10000", nil)
invalidLimitReq, _ := http.NewRequest("GET", "https://localhost:8800/_matrix/client/r0/rooms/!hyi4UaxS9mUXpSG9:localhost:8800/context/%24um_T82QqAXN8PayGiBW7j9WExpqTIQ7-JRq-Q6xpIf8?access_token=5dMB0z4tiulyBvCaIKgyjuWG71ybDiYIwNJVJ2UmxRI&limit=100as", nil)
lazyLoadReq, _ := http.NewRequest("GET", "https://localhost:8800//_matrix/client/r0/rooms/!kvEtX3rFamfwKHO3:localhost:8800/context/%24GjmkRbajRHy8_cxcSbUU4qF_njV8yHeLphI2azTrPaI?limit=2&filter=%7B+%22lazy_load_members%22+%3A+true+%7D&access_token=t1Njzm74w3G40CJ5xrlf1V2haXom0z0Iq1qyyVWhbVo", nil)
invalidFilterReq, _ := http.NewRequest("GET", "https://localhost:8800//_matrix/client/r0/rooms/!kvEtX3rFamfwKHO3:localhost:8800/context/%24GjmkRbajRHy8_cxcSbUU4qF_njV8yHeLphI2azTrPaI?limit=2&filter=%7B+%22lazy_load_members%22+%3A+true&access_token=t1Njzm74w3G40CJ5xrlf1V2haXom0z0Iq1qyyVWhbVo", nil)
tests := []struct {
name string
req *http.Request
wantFilter *gomatrixserverlib.RoomEventFilter
wantErr bool
}{
{
name: "no params set",
req: noParamsReq,
wantFilter: &gomatrixserverlib.RoomEventFilter{Limit: 10},
},
{
name: "limit 2 param set",
req: limit2Req,
wantFilter: &gomatrixserverlib.RoomEventFilter{Limit: 2},
},
{
name: "limit 10000 param set",
req: limit10000Req,
wantFilter: &gomatrixserverlib.RoomEventFilter{Limit: 100},
},
{
name: "filter lazy_load_members param set",
req: lazyLoadReq,
wantFilter: &gomatrixserverlib.RoomEventFilter{Limit: 2, LazyLoadMembers: true},
},
{
name: "invalid limit req",
req: invalidLimitReq,
wantErr: true,
},
{
name: "invalid filter req",
req: invalidFilterReq,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotFilter, err := parseContextParams(tt.req)
if (err != nil) != tt.wantErr {
t.Errorf("parseContextParams() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(gotFilter, tt.wantFilter) {
t.Errorf("parseContextParams() gotFilter = %v, want %v", gotFilter, tt.wantFilter)
}
})
}
}

View file

@ -77,4 +77,19 @@ func Setup(
v3mux.Handle("/keys/changes", httputil.MakeAuthAPI("keys_changes", userAPI, cfg.Matrix.UserConsentOptions, httputil.ConsentNotRequired, func(req *http.Request, device *userapi.Device) util.JSONResponse { v3mux.Handle("/keys/changes", httputil.MakeAuthAPI("keys_changes", userAPI, cfg.Matrix.UserConsentOptions, httputil.ConsentNotRequired, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return srp.OnIncomingKeyChangeRequest(req, device) return srp.OnIncomingKeyChangeRequest(req, device)
})).Methods(http.MethodGet, http.MethodOptions) })).Methods(http.MethodGet, http.MethodOptions)
v3mux.Handle("/rooms/{roomId}/context/{eventId}",
httputil.MakeAuthAPI(gomatrixserverlib.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
}
return Context(
req, device,
rsAPI, syncDB,
vars["roomId"], vars["eventId"],
)
}),
).Methods(http.MethodGet, http.MethodOptions)
} }

View file

@ -137,4 +137,8 @@ type Database interface {
StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error)
// GetRoomReceipts gets all receipts for a given roomID // GetRoomReceipts gets all receipts for a given roomID
GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error) GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error)
SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
} }

View file

@ -130,6 +130,25 @@ const selectStateInRangeSQL = "" +
const deleteEventsForRoomSQL = "" + const deleteEventsForRoomSQL = "" +
"DELETE FROM syncapi_output_room_events WHERE room_id = $1" "DELETE FROM syncapi_output_room_events WHERE room_id = $1"
const selectContextEventSQL = "" +
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2"
const selectContextBeforeEventSQL = "" +
"SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" +
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
" ORDER BY id DESC LIMIT $3"
const selectContextAfterEventSQL = "" +
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" +
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
" ORDER BY id ASC LIMIT $3"
type outputRoomEventsStatements struct { type outputRoomEventsStatements struct {
insertEventStmt *sql.Stmt insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt selectEventsStmt *sql.Stmt
@ -140,6 +159,9 @@ type outputRoomEventsStatements struct {
selectStateInRangeStmt *sql.Stmt selectStateInRangeStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt updateEventJSONStmt *sql.Stmt
deleteEventsForRoomStmt *sql.Stmt deleteEventsForRoomStmt *sql.Stmt
selectContextEventStmt *sql.Stmt
selectContextBeforeEventStmt *sql.Stmt
selectContextAfterEventStmt *sql.Stmt
} }
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
@ -148,34 +170,20 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { return s, sqlutil.StatementList{
return nil, err {&s.insertEventStmt, insertEventSQL},
} {&s.selectEventsStmt, selectEventsSQL},
if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil { {&s.selectMaxEventIDStmt, selectMaxEventIDSQL},
return nil, err {&s.selectRecentEventsStmt, selectRecentEventsSQL},
} {&s.selectRecentEventsForSyncStmt, selectRecentEventsForSyncSQL},
if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil { {&s.selectEarlyEventsStmt, selectEarlyEventsSQL},
return nil, err {&s.selectStateInRangeStmt, selectStateInRangeSQL},
} {&s.updateEventJSONStmt, updateEventJSONSQL},
if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil { {&s.deleteEventsForRoomStmt, deleteEventsForRoomSQL},
return nil, err {&s.selectContextEventStmt, selectContextEventSQL},
} {&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL},
if s.selectRecentEventsForSyncStmt, err = db.Prepare(selectRecentEventsForSyncSQL); err != nil { {&s.selectContextAfterEventStmt, selectContextAfterEventSQL},
return nil, err }.Prepare(db)
}
if s.selectEarlyEventsStmt, err = db.Prepare(selectEarlyEventsSQL); err != nil {
return nil, err
}
if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
return nil, err
}
if s.updateEventJSONStmt, err = db.Prepare(updateEventJSONSQL); err != nil {
return nil, err
}
if s.deleteEventsForRoomStmt, err = db.Prepare(deleteEventsForRoomSQL); err != nil {
return nil, err
}
return s, nil
} }
func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error { func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error {
@ -436,6 +444,84 @@ func (s *outputRoomEventsStatements) DeleteEventsForRoom(
return err return err
} }
func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (id int, evt gomatrixserverlib.HeaderedEvent, err error) {
row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID)
var eventAsString string
if err = row.Scan(&id, &eventAsString); err != nil {
return 0, evt, err
}
if err = json.Unmarshal([]byte(eventAsString), &evt); err != nil {
return 0, evt, err
}
return id, evt, nil
}
func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter,
) (evts []*gomatrixserverlib.HeaderedEvent, err error) {
rows, err := sqlutil.TxStmt(txn, s.selectContextBeforeEventStmt).QueryContext(
ctx, roomID, id, filter.Limit,
pq.StringArray(filter.Senders),
pq.StringArray(filter.NotSenders),
pq.StringArray(filterConvertTypeWildcardToSQL(filter.Types)),
pq.StringArray(filterConvertTypeWildcardToSQL(filter.NotTypes)),
)
if err != nil {
return
}
defer rows.Close()
for rows.Next() {
var (
eventBytes []byte
evt *gomatrixserverlib.HeaderedEvent
)
if err = rows.Scan(&eventBytes); err != nil {
return evts, err
}
if err = json.Unmarshal(eventBytes, &evt); err != nil {
return evts, err
}
evts = append(evts, evt)
}
return evts, rows.Err()
}
func (s *outputRoomEventsStatements) SelectContextAfterEvent(
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter,
) (lastID int, evts []*gomatrixserverlib.HeaderedEvent, err error) {
rows, err := sqlutil.TxStmt(txn, s.selectContextAfterEventStmt).QueryContext(
ctx, roomID, id, filter.Limit,
pq.StringArray(filter.Senders),
pq.StringArray(filter.NotSenders),
pq.StringArray(filterConvertTypeWildcardToSQL(filter.Types)),
pq.StringArray(filterConvertTypeWildcardToSQL(filter.NotTypes)),
)
if err != nil {
return
}
defer rows.Close()
for rows.Next() {
var (
eventBytes []byte
evt *gomatrixserverlib.HeaderedEvent
)
if err = rows.Scan(&lastID, &eventBytes); err != nil {
return 0, evts, err
}
if err = json.Unmarshal(eventBytes, &evt); err != nil {
return 0, evts, err
}
evts = append(evts, evt)
}
return lastID, evts, rows.Err()
}
func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
var result []types.StreamEvent var result []types.StreamEvent
for rows.Next() { for rows.Next() {

View file

@ -955,3 +955,14 @@ func (d *Database) GetRoomReceipts(ctx context.Context, roomIDs []string, stream
_, receipts, err := d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos) _, receipts, err := d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos)
return receipts, err return receipts, err
} }
func (s *Database) SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) {
return s.OutputEvents.SelectContextEvent(ctx, nil, roomID, eventID)
}
func (s *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) {
return s.OutputEvents.SelectContextBeforeEvent(ctx, nil, id, roomID, filter)
}
func (s *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) {
return s.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter)
}

View file

@ -68,7 +68,8 @@ const selectRoomIDsWithMembershipSQL = "" +
const selectCurrentStateSQL = "" + const selectCurrentStateSQL = "" +
"SELECT event_id, headered_event_json FROM syncapi_current_room_state WHERE room_id = $1" "SELECT event_id, headered_event_json FROM syncapi_current_room_state WHERE room_id = $1"
// WHEN, ORDER BY and LIMIT will be added by prepareWithFilter
// WHEN, ORDER BY and LIMIT will be added by prepareWithFilter
const selectJoinedUsersSQL = "" + const selectJoinedUsersSQL = "" +
"SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'" "SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'"

View file

@ -62,17 +62,17 @@ const selectEventsSQL = "" +
const selectRecentEventsSQL = "" + const selectRecentEventsSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" " WHERE room_id = $1 AND id > $2 AND id <= $3"
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectRecentEventsForSyncSQL = "" + const selectRecentEventsForSyncSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" " WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE"
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectEarlyEventsSQL = "" + const selectEarlyEventsSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" " WHERE room_id = $1 AND id > $2 AND id <= $3"
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectMaxEventIDSQL = "" + const selectMaxEventIDSQL = "" +
"SELECT MAX(id) FROM syncapi_output_room_events" "SELECT MAX(id) FROM syncapi_output_room_events"
@ -85,19 +85,33 @@ const selectStateInRangeSQL = "" +
" FROM syncapi_output_room_events" + " FROM syncapi_output_room_events" +
" WHERE (id > $1 AND id <= $2)" + " WHERE (id > $1 AND id <= $2)" +
" AND ((add_state_ids IS NOT NULL AND add_state_ids != '') OR (remove_state_ids IS NOT NULL AND remove_state_ids != ''))" " AND ((add_state_ids IS NOT NULL AND add_state_ids != '') OR (remove_state_ids IS NOT NULL AND remove_state_ids != ''))"
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const deleteEventsForRoomSQL = "" + const deleteEventsForRoomSQL = "" +
"DELETE FROM syncapi_output_room_events WHERE room_id = $1" "DELETE FROM syncapi_output_room_events WHERE room_id = $1"
const selectContextEventSQL = "" +
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2"
const selectContextBeforeEventSQL = "" +
"SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2"
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectContextAfterEventSQL = "" +
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2"
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
type outputRoomEventsStatements struct { type outputRoomEventsStatements struct {
db *sql.DB db *sql.DB
streamIDStatements *streamIDStatements streamIDStatements *streamIDStatements
insertEventStmt *sql.Stmt insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt selectEventsStmt *sql.Stmt
selectMaxEventIDStmt *sql.Stmt selectMaxEventIDStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt updateEventJSONStmt *sql.Stmt
deleteEventsForRoomStmt *sql.Stmt deleteEventsForRoomStmt *sql.Stmt
selectContextEventStmt *sql.Stmt
selectContextBeforeEventStmt *sql.Stmt
selectContextAfterEventStmt *sql.Stmt
} }
func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Events, error) { func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Events, error) {
@ -109,22 +123,16 @@ func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Even
if err != nil { if err != nil {
return nil, err return nil, err
} }
if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { return s, sqlutil.StatementList{
return nil, err {&s.insertEventStmt, insertEventSQL},
} {&s.selectEventsStmt, selectEventsSQL},
if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil { {&s.selectMaxEventIDStmt, selectMaxEventIDSQL},
return nil, err {&s.updateEventJSONStmt, updateEventJSONSQL},
} {&s.deleteEventsForRoomStmt, deleteEventsForRoomSQL},
if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil { {&s.selectContextEventStmt, selectContextEventSQL},
return nil, err {&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL},
} {&s.selectContextAfterEventStmt, selectContextAfterEventSQL},
if s.updateEventJSONStmt, err = db.Prepare(updateEventJSONSQL); err != nil { }.Prepare(db)
return nil, err
}
if s.deleteEventsForRoomStmt, err = db.Prepare(deleteEventsForRoomSQL); err != nil {
return nil, err
}
return s, nil
} }
func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error { func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error {
@ -462,6 +470,91 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
} }
return result, nil return result, nil
} }
func (s *outputRoomEventsStatements) SelectContextEvent(
ctx context.Context, txn *sql.Tx, roomID, eventID string,
) (id int, evt gomatrixserverlib.HeaderedEvent, err error) {
row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID)
var eventAsString string
if err = row.Scan(&id, &eventAsString); err != nil {
return 0, evt, err
}
if err = json.Unmarshal([]byte(eventAsString), &evt); err != nil {
return 0, evt, err
}
return id, evt, nil
}
func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter,
) (evts []*gomatrixserverlib.HeaderedEvent, err error) {
stmt, params, err := prepareWithFilters(
s.db, txn, selectContextBeforeEventSQL,
[]interface{}{
roomID, id,
},
filter.Senders, filter.NotSenders,
filter.Types, filter.NotTypes,
nil, filter.Limit, FilterOrderDesc,
)
rows, err := stmt.QueryContext(ctx, params...)
if err != nil {
return
}
defer rows.Close()
for rows.Next() {
var (
eventBytes []byte
evt *gomatrixserverlib.HeaderedEvent
)
if err = rows.Scan(&eventBytes); err != nil {
return evts, err
}
if err = json.Unmarshal(eventBytes, &evt); err != nil {
return evts, err
}
evts = append(evts, evt)
}
return evts, rows.Err()
}
func (s *outputRoomEventsStatements) SelectContextAfterEvent(
ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter,
) (lastID int, evts []*gomatrixserverlib.HeaderedEvent, err error) {
stmt, params, err := prepareWithFilters(
s.db, txn, selectContextAfterEventSQL,
[]interface{}{
roomID, id,
},
filter.Senders, filter.NotSenders,
filter.Types, filter.NotTypes,
nil, filter.Limit, FilterOrderAsc,
)
rows, err := stmt.QueryContext(ctx, params...)
if err != nil {
return
}
defer rows.Close()
for rows.Next() {
var (
eventBytes []byte
evt *gomatrixserverlib.HeaderedEvent
)
if err = rows.Scan(&lastID, &eventBytes); err != nil {
return 0, evts, err
}
if err = json.Unmarshal(eventBytes, &evt); err != nil {
return 0, evts, err
}
evts = append(evts, evt)
}
return lastID, evts, rows.Err()
}
func unmarshalStateIDs(addIDsJSON, delIDsJSON string) (addIDs []string, delIDs []string, err error) { func unmarshalStateIDs(addIDsJSON, delIDsJSON string) (addIDs []string, delIDs []string, err error) {
if len(addIDsJSON) > 0 { if len(addIDsJSON) > 0 {

View file

@ -63,6 +63,10 @@ type Events interface {
UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error
// DeleteEventsForRoom removes all event information for a room. This should only be done when removing the room entirely. // DeleteEventsForRoom removes all event information for a room. This should only be done when removing the room entirely.
DeleteEventsForRoom(ctx context.Context, txn *sql.Tx, roomID string) (err error) DeleteEventsForRoom(ctx context.Context, txn *sql.Tx, roomID string) (err error)
SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
} }
// Topology keeps track of the depths and stream positions for all events. // Topology keeps track of the depths and stream positions for all events.

View file

@ -24,11 +24,8 @@ Local device key changes get to remote servers with correct prev_id
# Flakey # Flakey
Local device key changes appear in /keys/changes Local device key changes appear in /keys/changes
Device list doesn't change if remote server is down
# we don't support groups # we don't support groups
Remove group category Remove group category
Remove group role Remove group role
# See https://github.com/matrix-org/sytest/pull/1142
Device list doesn't change if remote server is down

View file

@ -593,3 +593,7 @@ If a device list update goes missing, the server resyncs on the next one
uploading self-signing key notifies over federation uploading self-signing key notifies over federation
uploading signed devices gets propagated over federation uploading signed devices gets propagated over federation
Device list doesn't change if remote server is down Device list doesn't change if remote server is down
/context/ on joined room works
/context/ on non world readable room does not work
/context/ returns correct number of events
/context/ with lazy_load_members filter works