Track most recent membership events for each membership state

This commit is contained in:
Neil Alexander 2021-01-20 17:39:15 +00:00
parent b70238f2d5
commit a65a5ca0fd
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
8 changed files with 220 additions and 12 deletions

View file

@ -0,0 +1,95 @@
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
// The memberships table is designed to track the last time that
// the user was a given state. This allows us to find out the
// most recent time that a user was invited to, joined or left
// a room, either by choice or otherwise. This is important for
// building history visibility.
const membershipsSchema = `
CREATE TABLE IF NOT EXISTS syncapi_memberships (
-- The 'room_id' key for the state event.
room_id TEXT NOT NULL,
-- The state event ID
user_id TEXT NOT NULL,
-- The status of the membership
membership TEXT NOT NULL,
-- The event ID that last changed the membership
event_id TEXT NOT NULL,
-- The stream position of the change
stream_pos BIGINT NOT NULL,
-- The topological position of the change in the room
topological_pos BIGINT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_memberships_unique
ON syncapi_memberships(room_id, user_id, membership);
`
const upsertMembershipSQL = "" +
"INSERT INTO syncapi_memberships (room_id, user_id, membership, event_id, stream_pos, topological_pos)" +
" VALUES ($1, $2, $3, $4, $5, $6)" +
" ON CONFLICT ON CONSTRAINT syncapi_memberships_unique" +
" DO UPDATE SET event_id = $4, stream_pos = $5, topological_pos = $6"
type membershipsStatements struct {
upsertMembershipStmt *sql.Stmt
}
func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
s := &membershipsStatements{}
_, err := db.Exec(membershipsSchema)
if err != nil {
return nil, err
}
if s.upsertMembershipStmt, err = db.Prepare(upsertMembershipSQL); err != nil {
return nil, err
}
return s, nil
}
func (s *membershipsStatements) UpsertMembership(
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent,
streamPos, topologicalPos types.StreamPosition,
) error {
membership, err := event.Membership()
if err != nil {
return fmt.Errorf("event.Membership: %w", err)
}
_, err = sqlutil.TxStmt(txn, s.upsertMembershipStmt).ExecContext(
ctx,
event.RoomID(),
event.EventID(),
membership,
event.EventID(),
streamPos,
topologicalPos,
)
return err
}

View file

@ -44,7 +44,8 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_topological_position_idx ON sync
const insertEventInTopologySQL = "" + const insertEventInTopologySQL = "" +
"INSERT INTO syncapi_output_room_events_topology (event_id, topological_position, room_id, stream_position)" + "INSERT INTO syncapi_output_room_events_topology (event_id, topological_position, room_id, stream_position)" +
" VALUES ($1, $2, $3, $4)" + " VALUES ($1, $2, $3, $4)" +
" ON CONFLICT (topological_position, stream_position, room_id) DO UPDATE SET event_id = $1" " ON CONFLICT (topological_position, stream_position, room_id) DO UPDATE SET event_id = $1" +
" RETURNING topological_position"
const selectEventIDsInRangeASCSQL = "" + const selectEventIDsInRangeASCSQL = "" +
"SELECT event_id FROM syncapi_output_room_events_topology" + "SELECT event_id FROM syncapi_output_room_events_topology" +
@ -115,10 +116,10 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
// on the event's depth. // on the event's depth.
func (s *outputRoomEventsTopologyStatements) InsertEventInTopology( func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition, ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
) (err error) { ) (topoPos types.StreamPosition, err error) {
_, err = s.insertEventInTopologyStmt.ExecContext( err = sqlutil.TxStmt(txn, s.insertEventInTopologyStmt).QueryRowContext(
ctx, event.EventID(), event.Depth(), event.RoomID(), pos, ctx, event.EventID(), event.Depth(), event.RoomID(), pos,
) ).Scan(&topoPos)
return return
} }

View file

@ -87,6 +87,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
if err != nil { if err != nil {
return nil, err return nil, err
} }
memberships, err := NewPostgresMembershipsTable(d.db)
if err != nil {
return nil, err
}
m := sqlutil.NewMigrations() m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m) deltas.LoadFixSequences(m)
deltas.LoadRemoveSendToDeviceSentColumn(m) deltas.LoadRemoveSendToDeviceSentColumn(m)
@ -106,6 +110,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
Filter: filter, Filter: filter,
SendToDevice: sendToDevice, SendToDevice: sendToDevice,
Receipts: receipts, Receipts: receipts,
Memberships: memberships,
} }
return &d, nil return &d, nil
} }

View file

@ -48,6 +48,7 @@ type Database struct {
SendToDevice tables.SendToDevice SendToDevice tables.SendToDevice
Filter tables.Filter Filter tables.Filter
Receipts tables.Receipts Receipts tables.Receipts
Memberships tables.Memberships
} }
func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) { func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) {
@ -383,8 +384,8 @@ func (d *Database) WriteEvent(
return fmt.Errorf("d.OutputEvents.InsertEvent: %w", err) return fmt.Errorf("d.OutputEvents.InsertEvent: %w", err)
} }
pduPosition = pos pduPosition = pos
var topoPosition types.StreamPosition
if err = d.Topology.InsertEventInTopology(ctx, txn, ev, pos); err != nil { if topoPosition, err = d.Topology.InsertEventInTopology(ctx, txn, ev, pos); err != nil {
return fmt.Errorf("d.Topology.InsertEventInTopology: %w", err) return fmt.Errorf("d.Topology.InsertEventInTopology: %w", err)
} }
@ -397,7 +398,7 @@ func (d *Database) WriteEvent(
return nil return nil
} }
return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition) return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition, topoPosition)
}) })
return pduPosition, returnErr return pduPosition, returnErr
@ -409,6 +410,7 @@ func (d *Database) updateRoomState(
removedEventIDs []string, removedEventIDs []string,
addedEvents []*gomatrixserverlib.HeaderedEvent, addedEvents []*gomatrixserverlib.HeaderedEvent,
pduPosition types.StreamPosition, pduPosition types.StreamPosition,
topoPosition types.StreamPosition,
) error { ) error {
// remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add. // remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
for _, eventID := range removedEventIDs { for _, eventID := range removedEventIDs {
@ -429,6 +431,9 @@ func (d *Database) updateRoomState(
return fmt.Errorf("event.Membership: %w", err) return fmt.Errorf("event.Membership: %w", err)
} }
membership = &value membership = &value
if err = d.Memberships.UpsertMembership(ctx, txn, event, pduPosition, topoPosition); err != nil {
return fmt.Errorf("d.Memberships.UpsertMembership: %w", err)
}
} }
if err := d.CurrentRoomState.UpsertRoomState(ctx, txn, event, membership, pduPosition); err != nil { if err := d.CurrentRoomState.UpsertRoomState(ctx, txn, event, membership, pduPosition); err != nil {

View file

@ -0,0 +1,94 @@
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
// The memberships table is designed to track the last time that
// the user was a given state. This allows us to find out the
// most recent time that a user was invited to, joined or left
// a room, either by choice or otherwise. This is important for
// building history visibility.
const membershipsSchema = `
CREATE TABLE IF NOT EXISTS syncapi_memberships (
-- The 'room_id' key for the state event.
room_id TEXT NOT NULL,
-- The state event ID
user_id TEXT NOT NULL,
-- The status of the membership
membership TEXT NOT NULL,
-- The event ID that last changed the membership
event_id TEXT NOT NULL,
-- The stream position of the change
stream_pos BIGINT NOT NULL,
-- The topological position of the change in the room
topological_pos BIGINT NOT NULL,
-- Unique index
UNIQUE (room_id, user_id, membership)
);
`
const upsertMembershipSQL = "" +
"INSERT INTO syncapi_memberships (room_id, user_id, membership, event_id, stream_pos, topological_pos)" +
" VALUES ($1, $2, $3, $4, $5, $6)" +
" ON CONFLICT (room_id, user_id, membership)" +
" DO UPDATE SET event_id = $4, stream_pos = $5, topological_pos = $6"
type membershipsStatements struct {
upsertMembershipStmt *sql.Stmt
}
func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) {
s := &membershipsStatements{}
_, err := db.Exec(membershipsSchema)
if err != nil {
return nil, err
}
if s.upsertMembershipStmt, err = db.Prepare(upsertMembershipSQL); err != nil {
return nil, err
}
return s, nil
}
func (s *membershipsStatements) UpsertMembership(
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent,
streamPos, topologicalPos types.StreamPosition,
) error {
membership, err := event.Membership()
if err != nil {
return fmt.Errorf("event.Membership: %w", err)
}
_, err = sqlutil.TxStmt(txn, s.upsertMembershipStmt).ExecContext(
ctx,
event.RoomID(),
event.EventID(),
membership,
event.EventID(),
streamPos,
topologicalPos,
)
return err
}

View file

@ -111,12 +111,11 @@ func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
// on the event's depth. // on the event's depth.
func (s *outputRoomEventsTopologyStatements) InsertEventInTopology( func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition, ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
) (err error) { ) (types.StreamPosition, error) {
stmt := sqlutil.TxStmt(txn, s.insertEventInTopologyStmt) _, err := sqlutil.TxStmt(txn, s.insertEventInTopologyStmt).ExecContext(
_, err = stmt.ExecContext(
ctx, event.EventID(), event.Depth(), event.RoomID(), pos, ctx, event.EventID(), event.Depth(), event.RoomID(), pos,
) )
return return types.StreamPosition(event.Depth()), err
} }
func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange( func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(

View file

@ -100,6 +100,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
if err != nil { if err != nil {
return err return err
} }
memberships, err := NewSqliteMembershipsTable(d.db)
if err != nil {
return err
}
m := sqlutil.NewMigrations() m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m) deltas.LoadFixSequences(m)
deltas.LoadRemoveSendToDeviceSentColumn(m) deltas.LoadRemoveSendToDeviceSentColumn(m)
@ -119,6 +123,7 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
Filter: filter, Filter: filter,
SendToDevice: sendToDevice, SendToDevice: sendToDevice,
Receipts: receipts, Receipts: receipts,
Memberships: memberships,
} }
return nil return nil
} }

View file

@ -70,7 +70,7 @@ type Events interface {
type Topology interface { type Topology interface {
// InsertEventInTopology inserts the given event in the room's topology, based on the event's depth. // InsertEventInTopology inserts the given event in the room's topology, based on the event's depth.
// `pos` is the stream position of this event in the events table, and is used to order events which have the same depth. // `pos` is the stream position of this event in the events table, and is used to order events which have the same depth.
InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) (err error) InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) (topoPos types.StreamPosition, err error)
// SelectEventIDsInRange selects the IDs of events whose depths are within a given range in a given room's topological order. // SelectEventIDsInRange selects the IDs of events whose depths are within a given range in a given room's topological order.
// Events with `minDepth` are *exclusive*, as is the event which has exactly `minDepth`,`maxStreamPos`. // Events with `minDepth` are *exclusive*, as is the event which has exactly `minDepth`,`maxStreamPos`.
// `maxStreamPos` is only used when events have the same depth as `maxDepth`, which results in events less than `maxStreamPos` being returned. // `maxStreamPos` is only used when events have the same depth as `maxDepth`, which results in events less than `maxStreamPos` being returned.
@ -162,3 +162,7 @@ type Receipts interface {
SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error)
SelectMaxReceiptID(ctx context.Context, txn *sql.Tx) (id int64, err error) SelectMaxReceiptID(ctx context.Context, txn *sql.Tx) (id int64, err error)
} }
type Memberships interface {
UpsertMembership(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error
}