Merge branch 'master' into matthew/peeking-over-fed

This commit is contained in:
Kegan Dougal 2021-01-22 13:59:41 +00:00
commit 552f583938
10 changed files with 335 additions and 113 deletions

View file

@ -239,33 +239,23 @@ func (w *walker) walk() *gomatrixserverlib.MSC2946SpacesResponse {
// Mark this room as processed. // Mark this room as processed.
processed[roomID] = true processed[roomID] = true
// Is the caller currently joined to the room or is the room `world_readable` // Collect rooms/events to send back (either locally or fetched via federation)
// If no, skip this room. If yes, continue. var discoveredRooms []gomatrixserverlib.MSC2946Room
if !w.roomExists(roomID) || !w.authorised(roomID) { var discoveredEvents []gomatrixserverlib.MSC2946StrippedEvent
// attempt to query this room over federation, as either we've never heard of it before
// or we've left it and hence are not authorised (but info may be exposed regardless) // If we know about this room and the caller is authorised (joined/world_readable) then pull
fedRes, err := w.federatedRoomInfo(roomID) // events locally
if w.roomExists(roomID) && w.authorised(roomID) {
// Get all `m.space.child` and `m.space.parent` state events for the room. *In addition*, get
// all `m.space.child` and `m.space.parent` state events which *point to* (via `state_key` or `content.room_id`)
// this room. This requires servers to store reverse lookups.
events, err := w.references(roomID)
if err != nil { if err != nil {
util.GetLogger(w.ctx).WithError(err).WithField("room_id", roomID).Errorf("failed to query federated spaces") util.GetLogger(w.ctx).WithError(err).WithField("room_id", roomID).Error("failed to extract references for room")
continue continue
} }
if fedRes != nil { discoveredEvents = events
res = combineResponses(res, *fedRes)
}
continue
}
// Get all `m.space.child` and `m.space.parent` state events for the room. *In addition*, get
// all `m.space.child` and `m.space.parent` state events which *point to* (via `state_key` or `content.room_id`)
// this room. This requires servers to store reverse lookups.
refs, err := w.references(roomID)
if err != nil {
util.GetLogger(w.ctx).WithError(err).WithField("room_id", roomID).Error("failed to extract references for room")
continue
}
// If this room has not ever been in `rooms` (across multiple requests), extract the
// `PublicRoomsChunk` for this room.
if !w.alreadySent(roomID) && !w.roomIsExcluded(roomID) {
pubRoom := w.publicRoomsChunk(roomID) pubRoom := w.publicRoomsChunk(roomID)
roomType := "" roomType := ""
create := w.stateEvent(roomID, gomatrixserverlib.MRoomCreate, "") create := w.stateEvent(roomID, gomatrixserverlib.MRoomCreate, "")
@ -275,12 +265,31 @@ func (w *walker) walk() *gomatrixserverlib.MSC2946SpacesResponse {
} }
// Add the total number of events to `PublicRoomsChunk` under `num_refs`. Add `PublicRoomsChunk` to `rooms`. // Add the total number of events to `PublicRoomsChunk` under `num_refs`. Add `PublicRoomsChunk` to `rooms`.
res.Rooms = append(res.Rooms, gomatrixserverlib.MSC2946Room{ discoveredRooms = append(discoveredRooms, gomatrixserverlib.MSC2946Room{
PublicRoom: *pubRoom, PublicRoom: *pubRoom,
NumRefs: refs.len(), NumRefs: len(discoveredEvents),
RoomType: roomType, RoomType: roomType,
}) })
w.markSent(roomID) } else {
// attempt to query this room over federation, as either we've never heard of it before
// or we've left it and hence are not authorised (but info may be exposed regardless)
fedRes, err := w.federatedRoomInfo(roomID)
if err != nil {
util.GetLogger(w.ctx).WithError(err).WithField("room_id", roomID).Errorf("failed to query federated spaces")
continue
}
if fedRes != nil {
discoveredRooms = fedRes.Rooms
discoveredEvents = fedRes.Events
}
}
// If this room has not ever been in `rooms` (across multiple requests), send it now
for _, room := range discoveredRooms {
if !w.alreadySent(room.RoomID) && !w.roomIsExcluded(room.RoomID) {
res.Rooms = append(res.Rooms, room)
w.markSent(room.RoomID)
}
} }
uniqueRooms := make(set) uniqueRooms := make(set)
@ -288,45 +297,37 @@ func (w *walker) walk() *gomatrixserverlib.MSC2946SpacesResponse {
// If this is the root room from the original request, insert all these events into `events` if // If this is the root room from the original request, insert all these events into `events` if
// they haven't been added before (across multiple requests). // they haven't been added before (across multiple requests).
if w.rootRoomID == roomID { if w.rootRoomID == roomID {
for _, ev := range refs.events() { for _, ev := range discoveredEvents {
if !w.alreadySent(ev.EventID()) { if !w.alreadySent(eventKey(&ev)) {
strip := stripped(ev.Event) res.Events = append(res.Events, ev)
if strip == nil { uniqueRooms[ev.RoomID] = true
continue uniqueRooms[spaceTargetStripped(&ev)] = true
} w.markSent(eventKey(&ev))
res.Events = append(res.Events, *strip)
uniqueRooms[ev.RoomID()] = true
uniqueRooms[SpaceTarget(ev)] = true
w.markSent(ev.EventID())
} }
} }
} else { } else {
// Else add them to `events` honouring the `limit` and `max_rooms_per_space` values. If either // Else add them to `events` honouring the `limit` and `max_rooms_per_space` values. If either
// are exceeded, stop adding events. If the event has already been added, do not add it again. // are exceeded, stop adding events. If the event has already been added, do not add it again.
numAdded := 0 numAdded := 0
for _, ev := range refs.events() { for _, ev := range discoveredEvents {
if w.req.Limit > 0 && len(res.Events) >= w.req.Limit { if w.req.Limit > 0 && len(res.Events) >= w.req.Limit {
break break
} }
if w.req.MaxRoomsPerSpace > 0 && numAdded >= w.req.MaxRoomsPerSpace { if w.req.MaxRoomsPerSpace > 0 && numAdded >= w.req.MaxRoomsPerSpace {
break break
} }
if w.alreadySent(ev.EventID()) { if w.alreadySent(eventKey(&ev)) {
continue continue
} }
// Skip the room if it's part of exclude_rooms but ONLY IF the source matches, as we still // Skip the room if it's part of exclude_rooms but ONLY IF the source matches, as we still
// want to catch arrows which point to excluded rooms. // want to catch arrows which point to excluded rooms.
if w.roomIsExcluded(ev.RoomID()) { if w.roomIsExcluded(ev.RoomID) {
continue continue
} }
strip := stripped(ev.Event) res.Events = append(res.Events, ev)
if strip == nil { uniqueRooms[ev.RoomID] = true
continue uniqueRooms[spaceTargetStripped(&ev)] = true
} w.markSent(eventKey(&ev))
res.Events = append(res.Events, *strip)
uniqueRooms[ev.RoomID()] = true
uniqueRooms[SpaceTarget(ev)] = true
w.markSent(ev.EventID())
// we don't distinguish between child state events and parent state events for the purposes of // we don't distinguish between child state events and parent state events for the purposes of
// max_rooms_per_space, maybe we should? // max_rooms_per_space, maybe we should?
numAdded++ numAdded++
@ -521,51 +522,27 @@ func (w *walker) authorisedUser(roomID string) bool {
} }
// references returns all references pointing to or from this room. // references returns all references pointing to or from this room.
func (w *walker) references(roomID string) (eventLookup, error) { func (w *walker) references(roomID string) ([]gomatrixserverlib.MSC2946StrippedEvent, error) {
events, err := w.db.References(w.ctx, roomID) events, err := w.db.References(w.ctx, roomID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
el := make(eventLookup) el := make([]gomatrixserverlib.MSC2946StrippedEvent, 0, len(events))
for _, ev := range events { for _, ev := range events {
// only return events that have a `via` key as per MSC1772 // only return events that have a `via` key as per MSC1772
// else we'll incorrectly walk redacted events (as the link // else we'll incorrectly walk redacted events (as the link
// is in the state_key) // is in the state_key)
if gjson.GetBytes(ev.Content(), "via").Exists() { if gjson.GetBytes(ev.Content(), "via").Exists() {
el.set(ev) strip := stripped(ev.Event)
if strip == nil {
continue
}
el = append(el, *strip)
} }
} }
return el, nil return el, nil
} }
// state event lookup across multiple rooms keyed on event type
// NOT THREAD SAFE
type eventLookup map[string][]*gomatrixserverlib.HeaderedEvent
func (el eventLookup) set(ev *gomatrixserverlib.HeaderedEvent) {
evs := el[ev.Type()]
if evs == nil {
evs = make([]*gomatrixserverlib.HeaderedEvent, 0)
}
evs = append(evs, ev)
el[ev.Type()] = evs
}
func (el eventLookup) len() int {
sum := 0
for _, evs := range el {
sum += len(evs)
}
return sum
}
func (el eventLookup) events() (events []*gomatrixserverlib.HeaderedEvent) {
for _, evs := range el {
events = append(events, evs...)
}
return
}
type set map[string]bool type set map[string]bool
func stripped(ev *gomatrixserverlib.Event) *gomatrixserverlib.MSC2946StrippedEvent { func stripped(ev *gomatrixserverlib.Event) *gomatrixserverlib.MSC2946StrippedEvent {
@ -581,27 +558,19 @@ func stripped(ev *gomatrixserverlib.Event) *gomatrixserverlib.MSC2946StrippedEve
} }
} }
func combineResponses(local, remote gomatrixserverlib.MSC2946SpacesResponse) gomatrixserverlib.MSC2946SpacesResponse { func eventKey(event *gomatrixserverlib.MSC2946StrippedEvent) string {
knownRooms := make(set) return event.RoomID + "|" + event.Type + "|" + event.StateKey
for _, room := range local.Rooms { }
knownRooms[room.RoomID] = true
} func spaceTargetStripped(event *gomatrixserverlib.MSC2946StrippedEvent) string {
knownEvents := make(set) if event.StateKey == "" {
for _, event := range local.Events { return "" // no-op
knownEvents[event.RoomID+event.Type+event.StateKey] = true }
} switch event.Type {
// mux in remote entries if and only if they aren't present already case ConstSpaceParentEventType:
for _, room := range remote.Rooms { return event.StateKey
if knownRooms[room.RoomID] { case ConstSpaceChildEventType:
continue return event.StateKey
} }
local.Rooms = append(local.Rooms, room) return ""
}
for _, event := range remote.Events {
if knownEvents[event.RoomID+event.Type+event.StateKey] {
continue
}
local.Events = append(local.Events, event)
}
return local
} }

View file

@ -0,0 +1,111 @@
// Copyright 2021 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
// The memberships table is designed to track the last time that
// the user was a given state. This allows us to find out the
// most recent time that a user was invited to, joined or left
// a room, either by choice or otherwise. This is important for
// building history visibility.
const membershipsSchema = `
CREATE TABLE IF NOT EXISTS syncapi_memberships (
-- The 'room_id' key for the state event.
room_id TEXT NOT NULL,
-- The state event ID
user_id TEXT NOT NULL,
-- The status of the membership
membership TEXT NOT NULL,
-- The event ID that last changed the membership
event_id TEXT NOT NULL,
-- The stream position of the change
stream_pos BIGINT NOT NULL,
-- The topological position of the change in the room
topological_pos BIGINT NOT NULL,
-- Unique index
CONSTRAINT syncapi_memberships_unique UNIQUE (room_id, user_id, membership)
);
`
const upsertMembershipSQL = "" +
"INSERT INTO syncapi_memberships (room_id, user_id, membership, event_id, stream_pos, topological_pos)" +
" VALUES ($1, $2, $3, $4, $5, $6)" +
" ON CONFLICT ON CONSTRAINT syncapi_memberships_unique" +
" DO UPDATE SET event_id = $4, stream_pos = $5, topological_pos = $6"
const selectMembershipSQL = "" +
"SELECT event_id, stream_pos, topological_pos FROM syncapi_memberships" +
" WHERE room_id = $1 AND user_id = $2 AND membership = ANY($3)" +
" ORDER BY stream_pos DESC" +
" LIMIT 1"
type membershipsStatements struct {
upsertMembershipStmt *sql.Stmt
selectMembershipStmt *sql.Stmt
}
func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
s := &membershipsStatements{}
_, err := db.Exec(membershipsSchema)
if err != nil {
return nil, err
}
if s.upsertMembershipStmt, err = db.Prepare(upsertMembershipSQL); err != nil {
return nil, err
}
if s.selectMembershipStmt, err = db.Prepare(selectMembershipSQL); err != nil {
return nil, err
}
return s, nil
}
func (s *membershipsStatements) UpsertMembership(
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent,
streamPos, topologicalPos types.StreamPosition,
) error {
membership, err := event.Membership()
if err != nil {
return fmt.Errorf("event.Membership: %w", err)
}
_, err = sqlutil.TxStmt(txn, s.upsertMembershipStmt).ExecContext(
ctx,
event.RoomID(),
*event.StateKey(),
membership,
event.EventID(),
streamPos,
topologicalPos,
)
return err
}
func (s *membershipsStatements) SelectMembership(
ctx context.Context, txn *sql.Tx, roomID, userID, memberships []string,
) (eventID string, streamPos, topologyPos types.StreamPosition, err error) {
stmt := sqlutil.TxStmt(txn, s.selectMembershipStmt)
err = stmt.QueryRowContext(ctx, roomID, userID, memberships).Scan(&eventID, &streamPos, &topologyPos)
return
}

View file

@ -44,7 +44,8 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_topological_position_idx ON sync
const insertEventInTopologySQL = "" + const insertEventInTopologySQL = "" +
"INSERT INTO syncapi_output_room_events_topology (event_id, topological_position, room_id, stream_position)" + "INSERT INTO syncapi_output_room_events_topology (event_id, topological_position, room_id, stream_position)" +
" VALUES ($1, $2, $3, $4)" + " VALUES ($1, $2, $3, $4)" +
" ON CONFLICT (topological_position, stream_position, room_id) DO UPDATE SET event_id = $1" " ON CONFLICT (topological_position, stream_position, room_id) DO UPDATE SET event_id = $1" +
" RETURNING topological_position"
const selectEventIDsInRangeASCSQL = "" + const selectEventIDsInRangeASCSQL = "" +
"SELECT event_id FROM syncapi_output_room_events_topology" + "SELECT event_id FROM syncapi_output_room_events_topology" +
@ -115,10 +116,10 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
// on the event's depth. // on the event's depth.
func (s *outputRoomEventsTopologyStatements) InsertEventInTopology( func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition, ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
) (err error) { ) (topoPos types.StreamPosition, err error) {
_, err = s.insertEventInTopologyStmt.ExecContext( err = sqlutil.TxStmt(txn, s.insertEventInTopologyStmt).QueryRowContext(
ctx, event.EventID(), event.Depth(), event.RoomID(), pos, ctx, event.EventID(), event.Depth(), event.RoomID(), pos,
) ).Scan(&topoPos)
return return
} }

View file

@ -87,6 +87,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
if err != nil { if err != nil {
return nil, err return nil, err
} }
memberships, err := NewPostgresMembershipsTable(d.db)
if err != nil {
return nil, err
}
m := sqlutil.NewMigrations() m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m) deltas.LoadFixSequences(m)
deltas.LoadRemoveSendToDeviceSentColumn(m) deltas.LoadRemoveSendToDeviceSentColumn(m)
@ -106,6 +110,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
Filter: filter, Filter: filter,
SendToDevice: sendToDevice, SendToDevice: sendToDevice,
Receipts: receipts, Receipts: receipts,
Memberships: memberships,
} }
return &d, nil return &d, nil
} }

View file

@ -48,6 +48,7 @@ type Database struct {
SendToDevice tables.SendToDevice SendToDevice tables.SendToDevice
Filter tables.Filter Filter tables.Filter
Receipts tables.Receipts Receipts tables.Receipts
Memberships tables.Memberships
} }
func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) { func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) {
@ -383,8 +384,8 @@ func (d *Database) WriteEvent(
return fmt.Errorf("d.OutputEvents.InsertEvent: %w", err) return fmt.Errorf("d.OutputEvents.InsertEvent: %w", err)
} }
pduPosition = pos pduPosition = pos
var topoPosition types.StreamPosition
if err = d.Topology.InsertEventInTopology(ctx, txn, ev, pos); err != nil { if topoPosition, err = d.Topology.InsertEventInTopology(ctx, txn, ev, pos); err != nil {
return fmt.Errorf("d.Topology.InsertEventInTopology: %w", err) return fmt.Errorf("d.Topology.InsertEventInTopology: %w", err)
} }
@ -397,7 +398,7 @@ func (d *Database) WriteEvent(
return nil return nil
} }
return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition) return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition, topoPosition)
}) })
return pduPosition, returnErr return pduPosition, returnErr
@ -409,6 +410,7 @@ func (d *Database) updateRoomState(
removedEventIDs []string, removedEventIDs []string,
addedEvents []*gomatrixserverlib.HeaderedEvent, addedEvents []*gomatrixserverlib.HeaderedEvent,
pduPosition types.StreamPosition, pduPosition types.StreamPosition,
topoPosition types.StreamPosition,
) error { ) error {
// remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add. // remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
for _, eventID := range removedEventIDs { for _, eventID := range removedEventIDs {
@ -429,6 +431,9 @@ func (d *Database) updateRoomState(
return fmt.Errorf("event.Membership: %w", err) return fmt.Errorf("event.Membership: %w", err)
} }
membership = &value membership = &value
if err = d.Memberships.UpsertMembership(ctx, txn, event, pduPosition, topoPosition); err != nil {
return fmt.Errorf("d.Memberships.UpsertMembership: %w", err)
}
} }
if err := d.CurrentRoomState.UpsertRoomState(ctx, txn, event, membership, pduPosition); err != nil { if err := d.CurrentRoomState.UpsertRoomState(ctx, txn, event, membership, pduPosition); err != nil {

View file

@ -0,0 +1,119 @@
// Copyright 2021 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
"fmt"
"strings"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
// The memberships table is designed to track the last time that
// the user was a given state. This allows us to find out the
// most recent time that a user was invited to, joined or left
// a room, either by choice or otherwise. This is important for
// building history visibility.
const membershipsSchema = `
CREATE TABLE IF NOT EXISTS syncapi_memberships (
-- The 'room_id' key for the state event.
room_id TEXT NOT NULL,
-- The state event ID
user_id TEXT NOT NULL,
-- The status of the membership
membership TEXT NOT NULL,
-- The event ID that last changed the membership
event_id TEXT NOT NULL,
-- The stream position of the change
stream_pos BIGINT NOT NULL,
-- The topological position of the change in the room
topological_pos BIGINT NOT NULL,
-- Unique index
UNIQUE (room_id, user_id, membership)
);
`
const upsertMembershipSQL = "" +
"INSERT INTO syncapi_memberships (room_id, user_id, membership, event_id, stream_pos, topological_pos)" +
" VALUES ($1, $2, $3, $4, $5, $6)" +
" ON CONFLICT (room_id, user_id, membership)" +
" DO UPDATE SET event_id = $4, stream_pos = $5, topological_pos = $6"
const selectMembershipSQL = "" +
"SELECT event_id, stream_pos, topological_pos FROM syncapi_memberships" +
" WHERE room_id = $1 AND user_id = $2 AND membership IN ($3)" +
" ORDER BY stream_pos DESC" +
" LIMIT 1"
type membershipsStatements struct {
db *sql.DB
upsertMembershipStmt *sql.Stmt
}
func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) {
s := &membershipsStatements{
db: db,
}
_, err := db.Exec(membershipsSchema)
if err != nil {
return nil, err
}
if s.upsertMembershipStmt, err = db.Prepare(upsertMembershipSQL); err != nil {
return nil, err
}
return s, nil
}
func (s *membershipsStatements) UpsertMembership(
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent,
streamPos, topologicalPos types.StreamPosition,
) error {
membership, err := event.Membership()
if err != nil {
return fmt.Errorf("event.Membership: %w", err)
}
_, err = sqlutil.TxStmt(txn, s.upsertMembershipStmt).ExecContext(
ctx,
event.RoomID(),
*event.StateKey(),
membership,
event.EventID(),
streamPos,
topologicalPos,
)
return err
}
func (s *membershipsStatements) SelectMembership(
ctx context.Context, txn *sql.Tx, roomID, userID, memberships []string,
) (eventID string, streamPos, topologyPos types.StreamPosition, err error) {
params := []interface{}{roomID, userID}
for _, membership := range memberships {
params = append(params, membership)
}
orig := strings.Replace(selectMembershipSQL, "($3)", sqlutil.QueryVariadicOffset(len(memberships), 2), 1)
stmt, err := s.db.Prepare(orig)
if err != nil {
return "", 0, 0, err
}
err = sqlutil.TxStmt(txn, stmt).QueryRowContext(ctx, params...).Scan(&eventID, &streamPos, &topologyPos)
return
}

View file

@ -111,12 +111,11 @@ func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
// on the event's depth. // on the event's depth.
func (s *outputRoomEventsTopologyStatements) InsertEventInTopology( func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition, ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
) (err error) { ) (types.StreamPosition, error) {
stmt := sqlutil.TxStmt(txn, s.insertEventInTopologyStmt) _, err := sqlutil.TxStmt(txn, s.insertEventInTopologyStmt).ExecContext(
_, err = stmt.ExecContext(
ctx, event.EventID(), event.Depth(), event.RoomID(), pos, ctx, event.EventID(), event.Depth(), event.RoomID(), pos,
) )
return return types.StreamPosition(event.Depth()), err
} }
func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange( func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(

View file

@ -100,6 +100,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
if err != nil { if err != nil {
return err return err
} }
memberships, err := NewSqliteMembershipsTable(d.db)
if err != nil {
return err
}
m := sqlutil.NewMigrations() m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m) deltas.LoadFixSequences(m)
deltas.LoadRemoveSendToDeviceSentColumn(m) deltas.LoadRemoveSendToDeviceSentColumn(m)
@ -119,6 +123,7 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
Filter: filter, Filter: filter,
SendToDevice: sendToDevice, SendToDevice: sendToDevice,
Receipts: receipts, Receipts: receipts,
Memberships: memberships,
} }
return nil return nil
} }

View file

@ -70,7 +70,7 @@ type Events interface {
type Topology interface { type Topology interface {
// InsertEventInTopology inserts the given event in the room's topology, based on the event's depth. // InsertEventInTopology inserts the given event in the room's topology, based on the event's depth.
// `pos` is the stream position of this event in the events table, and is used to order events which have the same depth. // `pos` is the stream position of this event in the events table, and is used to order events which have the same depth.
InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) (err error) InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) (topoPos types.StreamPosition, err error)
// SelectEventIDsInRange selects the IDs of events whose depths are within a given range in a given room's topological order. // SelectEventIDsInRange selects the IDs of events whose depths are within a given range in a given room's topological order.
// Events with `minDepth` are *exclusive*, as is the event which has exactly `minDepth`,`maxStreamPos`. // Events with `minDepth` are *exclusive*, as is the event which has exactly `minDepth`,`maxStreamPos`.
// `maxStreamPos` is only used when events have the same depth as `maxDepth`, which results in events less than `maxStreamPos` being returned. // `maxStreamPos` is only used when events have the same depth as `maxDepth`, which results in events less than `maxStreamPos` being returned.
@ -162,3 +162,8 @@ type Receipts interface {
SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error)
SelectMaxReceiptID(ctx context.Context, txn *sql.Tx) (id int64, err error) SelectMaxReceiptID(ctx context.Context, txn *sql.Tx) (id int64, err error)
} }
type Memberships interface {
UpsertMembership(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error
SelectMembership(ctx context.Context, txn *sql.Tx, roomID, userID, memberships []string) (eventID string, streamPos, topologyPos types.StreamPosition, err error)
}

View file

@ -508,3 +508,6 @@ State from remote users is included in the state in the initial sync
Changes to state are included in an gapped incremental sync Changes to state are included in an gapped incremental sync
A full_state incremental update returns all state A full_state incremental update returns all state
Can pass a JSON filter as a query parameter Can pass a JSON filter as a query parameter
Local room members can get room messages
Remote room members can get room messages
Guest users can send messages to guest_access rooms if joined