mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-17 11:53:09 -06:00
Remove remaining code
This commit is contained in:
parent
f5909f6f96
commit
a1056c156d
|
|
@ -711,18 +711,20 @@ func (d *Database) AddInvitesToResponse(
|
||||||
// Retrieve the backward topology position, i.e. the position of the
|
// Retrieve the backward topology position, i.e. the position of the
|
||||||
// oldest event in the room's topology.
|
// oldest event in the room's topology.
|
||||||
func (d *Database) getBackwardTopologyPos(
|
func (d *Database) getBackwardTopologyPos(
|
||||||
ctx context.Context,
|
ctx context.Context, txn *sql.Tx,
|
||||||
events []types.StreamEvent,
|
events []types.StreamEvent,
|
||||||
) (pos, spos types.StreamPosition) {
|
) (types.TopologyToken, error) {
|
||||||
if len(events) > 0 {
|
zeroToken := types.NewTopologyToken(0, 0)
|
||||||
pos, spos, _ = d.Topology.SelectPositionInTopology(ctx, nil, events[0].EventID())
|
if len(events) == 0 {
|
||||||
|
return zeroToken, nil
|
||||||
}
|
}
|
||||||
if pos-1 <= 0 {
|
pos, spos, err := d.Topology.SelectPositionInTopology(ctx, txn, events[0].EventID())
|
||||||
pos = types.StreamPosition(1)
|
if err != nil {
|
||||||
} else {
|
return zeroToken, err
|
||||||
pos = pos - 1
|
|
||||||
}
|
}
|
||||||
return
|
tok := types.NewTopologyToken(pos, spos)
|
||||||
|
tok.Decrement()
|
||||||
|
return tok, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// addRoomDeltaToResponse adds a room state delta to a sync response
|
// addRoomDeltaToResponse adds a room state delta to a sync response
|
||||||
|
|
@ -754,10 +756,10 @@ func (d *Database) addRoomDeltaToResponse(
|
||||||
}
|
}
|
||||||
recentEvents := d.StreamEventsToEvents(device, recentStreamEvents)
|
recentEvents := d.StreamEventsToEvents(device, recentStreamEvents)
|
||||||
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
|
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
|
||||||
backwardTopologyPos, backwardStreamPos := d.getBackwardTopologyPos(ctx, recentStreamEvents)
|
prevBatch, err := d.getBackwardTopologyPos(ctx, txn, recentStreamEvents)
|
||||||
prevBatch := types.NewTopologyToken(
|
if err != nil {
|
||||||
backwardTopologyPos, backwardStreamPos,
|
return err
|
||||||
)
|
}
|
||||||
|
|
||||||
switch delta.membership {
|
switch delta.membership {
|
||||||
case gomatrixserverlib.Join:
|
case gomatrixserverlib.Join:
|
||||||
|
|
|
||||||
|
|
@ -16,13 +16,10 @@
|
||||||
package sqlite3
|
package sqlite3
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"net/url"
|
"net/url"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
|
||||||
// Import the sqlite3 package
|
// Import the sqlite3 package
|
||||||
|
|
@ -31,19 +28,8 @@ import (
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage/shared"
|
"github.com/matrix-org/dendrite/syncapi/storage/shared"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type stateDelta struct {
|
|
||||||
roomID string
|
|
||||||
stateEvents []gomatrixserverlib.HeaderedEvent
|
|
||||||
membership string
|
|
||||||
// The PDU stream position of the latest membership event for this user, if applicable.
|
|
||||||
// Can be 0 if there is no membership event in this delta.
|
|
||||||
membershipPos types.StreamPosition
|
|
||||||
}
|
|
||||||
|
|
||||||
// SyncServerDatasource represents a sync server datasource which manages
|
// SyncServerDatasource represents a sync server datasource which manages
|
||||||
// both the database for PDUs and caches for EDUs.
|
// both the database for PDUs and caches for EDUs.
|
||||||
type SyncServerDatasource struct {
|
type SyncServerDatasource struct {
|
||||||
|
|
@ -53,9 +39,9 @@ type SyncServerDatasource struct {
|
||||||
streamID streamIDStatements
|
streamID streamIDStatements
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewSyncServerDatasource creates a new sync server database
|
// NewDatabase creates a new sync server database
|
||||||
// nolint: gocyclo
|
// nolint: gocyclo
|
||||||
func NewSyncServerDatasource(dataSourceName string) (*SyncServerDatasource, error) {
|
func NewDatabase(dataSourceName string) (*SyncServerDatasource, error) {
|
||||||
var d SyncServerDatasource
|
var d SyncServerDatasource
|
||||||
uri, err := url.Parse(dataSourceName)
|
uri, err := url.Parse(dataSourceName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -121,464 +107,3 @@ func (d *SyncServerDatasource) prepare() (err error) {
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// addPDUDeltaToResponse adds all PDU deltas to a sync response.
|
|
||||||
// IDs of all rooms the user joined are returned so EDU deltas can be added for them.
|
|
||||||
func (d *SyncServerDatasource) addPDUDeltaToResponse(
|
|
||||||
ctx context.Context,
|
|
||||||
device authtypes.Device,
|
|
||||||
fromPos, toPos types.StreamPosition,
|
|
||||||
numRecentEventsPerRoom int,
|
|
||||||
wantFullState bool,
|
|
||||||
res *types.Response,
|
|
||||||
) (joinedRoomIDs []string, err error) {
|
|
||||||
txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
var succeeded bool
|
|
||||||
defer func() {
|
|
||||||
txerr := common.EndTransaction(txn, &succeeded)
|
|
||||||
if err == nil && txerr != nil {
|
|
||||||
err = txerr
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
stateFilterPart := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request
|
|
||||||
|
|
||||||
// Work out which rooms to return in the response. This is done by getting not only the currently
|
|
||||||
// joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions.
|
|
||||||
// This works out what the 'state' key should be for each room as well as which membership block
|
|
||||||
// to put the room into.
|
|
||||||
var deltas []stateDelta
|
|
||||||
if !wantFullState {
|
|
||||||
deltas, joinedRoomIDs, err = d.getStateDeltas(
|
|
||||||
ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilterPart,
|
|
||||||
)
|
|
||||||
} else {
|
|
||||||
deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync(
|
|
||||||
ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilterPart,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, delta := range deltas {
|
|
||||||
err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: This should be done in getStateDeltas
|
|
||||||
if err = d.Database.AddInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
succeeded = true
|
|
||||||
return joinedRoomIDs, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// IncrementalSync returns all the data needed in order to create an incremental
|
|
||||||
// sync response for the given user. Events returned will include any client
|
|
||||||
// transaction IDs associated with the given device. These transaction IDs come
|
|
||||||
// from when the device sent the event via an API that included a transaction
|
|
||||||
// ID.
|
|
||||||
func (d *SyncServerDatasource) IncrementalSync(
|
|
||||||
ctx context.Context,
|
|
||||||
device authtypes.Device,
|
|
||||||
fromPos, toPos types.StreamingToken,
|
|
||||||
numRecentEventsPerRoom int,
|
|
||||||
wantFullState bool,
|
|
||||||
) (*types.Response, error) {
|
|
||||||
nextBatchPos := fromPos.WithUpdates(toPos)
|
|
||||||
res := types.NewResponse(nextBatchPos)
|
|
||||||
|
|
||||||
var joinedRoomIDs []string
|
|
||||||
var err error
|
|
||||||
fmt.Println("from", fromPos.PDUPosition(), "to", toPos.PDUPosition())
|
|
||||||
if fromPos.PDUPosition() != toPos.PDUPosition() || wantFullState {
|
|
||||||
joinedRoomIDs, err = d.addPDUDeltaToResponse(
|
|
||||||
ctx, device, fromPos.PDUPosition(), toPos.PDUPosition(), numRecentEventsPerRoom, wantFullState, res,
|
|
||||||
)
|
|
||||||
} else {
|
|
||||||
joinedRoomIDs, err = d.Database.CurrentRoomState.SelectRoomIDsWithMembership(
|
|
||||||
ctx, nil, device.UserID, gomatrixserverlib.Join,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = d.Database.AddEDUDeltaToResponse(
|
|
||||||
fromPos, toPos, joinedRoomIDs, res,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return res, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var txReadOnlySnapshot = sql.TxOptions{
|
|
||||||
// Set the isolation level so that we see a snapshot of the database.
|
|
||||||
// In PostgreSQL repeatable read transactions will see a snapshot taken
|
|
||||||
// at the first query, and since the transaction is read-only it can't
|
|
||||||
// run into any serialisation errors.
|
|
||||||
// https://www.postgresql.org/docs/9.5/static/transaction-iso.html#XACT-REPEATABLE-READ
|
|
||||||
Isolation: sql.LevelRepeatableRead,
|
|
||||||
ReadOnly: true,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Retrieve the backward topology position, i.e. the position of the
|
|
||||||
// oldest event in the room's topology.
|
|
||||||
func (d *SyncServerDatasource) getBackwardTopologyPos(
|
|
||||||
ctx context.Context, txn *sql.Tx,
|
|
||||||
events []types.StreamEvent,
|
|
||||||
) (pos, spos types.StreamPosition) {
|
|
||||||
if len(events) > 0 {
|
|
||||||
pos, spos, _ = d.Database.Topology.SelectPositionInTopology(ctx, txn, events[0].EventID())
|
|
||||||
}
|
|
||||||
// go to the previous position so we don't pull out the same event twice
|
|
||||||
// FIXME: This could be done more nicely by being explicit with inclusive/exclusive rules
|
|
||||||
if pos-1 <= 0 {
|
|
||||||
pos = types.StreamPosition(1)
|
|
||||||
} else {
|
|
||||||
pos = pos - 1
|
|
||||||
spos += 1000 // this has to be bigger than the number of events we backfill per request
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// addRoomDeltaToResponse adds a room state delta to a sync response
|
|
||||||
func (d *SyncServerDatasource) addRoomDeltaToResponse(
|
|
||||||
ctx context.Context,
|
|
||||||
device *authtypes.Device,
|
|
||||||
txn *sql.Tx,
|
|
||||||
fromPos, toPos types.StreamPosition,
|
|
||||||
delta stateDelta,
|
|
||||||
numRecentEventsPerRoom int,
|
|
||||||
res *types.Response,
|
|
||||||
) error {
|
|
||||||
endPos := toPos
|
|
||||||
if delta.membershipPos > 0 && delta.membership == gomatrixserverlib.Leave {
|
|
||||||
// make sure we don't leak recent events after the leave event.
|
|
||||||
// TODO: History visibility makes this somewhat complex to handle correctly. For example:
|
|
||||||
// TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
|
|
||||||
// TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
|
|
||||||
// in a single /sync request
|
|
||||||
// This is all "okay" assuming history_visibility == "shared" which it is by default.
|
|
||||||
endPos = delta.membershipPos
|
|
||||||
}
|
|
||||||
recentStreamEvents, err := d.Database.OutputEvents.SelectRecentEvents(
|
|
||||||
ctx, txn, delta.roomID, types.StreamPosition(fromPos), types.StreamPosition(endPos),
|
|
||||||
numRecentEventsPerRoom, true, true,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
recentEvents := d.StreamEventsToEvents(device, recentStreamEvents)
|
|
||||||
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents)
|
|
||||||
backwardTopologyPos, backwardStreamPos := d.getBackwardTopologyPos(ctx, txn, recentStreamEvents)
|
|
||||||
prevBatch := types.NewTopologyToken(
|
|
||||||
backwardTopologyPos, backwardStreamPos,
|
|
||||||
)
|
|
||||||
|
|
||||||
switch delta.membership {
|
|
||||||
case gomatrixserverlib.Join:
|
|
||||||
jr := types.NewJoinResponse()
|
|
||||||
jr.Timeline.PrevBatch = prevBatch.String()
|
|
||||||
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
|
||||||
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
|
||||||
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
|
||||||
res.Rooms.Join[delta.roomID] = *jr
|
|
||||||
case gomatrixserverlib.Leave:
|
|
||||||
fallthrough // transitions to leave are the same as ban
|
|
||||||
case gomatrixserverlib.Ban:
|
|
||||||
// TODO: recentEvents may contain events that this user is not allowed to see because they are
|
|
||||||
// no longer in the room.
|
|
||||||
lr := types.NewLeaveResponse()
|
|
||||||
lr.Timeline.PrevBatch = prevBatch.String()
|
|
||||||
lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
|
||||||
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
|
||||||
lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
|
||||||
res.Rooms.Leave[delta.roomID] = *lr
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database.
|
|
||||||
// Returns a map of room ID to list of events.
|
|
||||||
func (d *SyncServerDatasource) fetchStateEvents(
|
|
||||||
ctx context.Context, txn *sql.Tx,
|
|
||||||
roomIDToEventIDSet map[string]map[string]bool,
|
|
||||||
eventIDToEvent map[string]types.StreamEvent,
|
|
||||||
) (map[string][]types.StreamEvent, error) {
|
|
||||||
stateBetween := make(map[string][]types.StreamEvent)
|
|
||||||
missingEvents := make(map[string][]string)
|
|
||||||
for roomID, ids := range roomIDToEventIDSet {
|
|
||||||
events := stateBetween[roomID]
|
|
||||||
for id, need := range ids {
|
|
||||||
if !need {
|
|
||||||
continue // deleted state
|
|
||||||
}
|
|
||||||
e, ok := eventIDToEvent[id]
|
|
||||||
if ok {
|
|
||||||
events = append(events, e)
|
|
||||||
} else {
|
|
||||||
m := missingEvents[roomID]
|
|
||||||
m = append(m, id)
|
|
||||||
missingEvents[roomID] = m
|
|
||||||
}
|
|
||||||
}
|
|
||||||
stateBetween[roomID] = events
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(missingEvents) > 0 {
|
|
||||||
// This happens when add_state_ids has an event ID which is not in the provided range.
|
|
||||||
// We need to explicitly fetch them.
|
|
||||||
allMissingEventIDs := []string{}
|
|
||||||
for _, missingEvIDs := range missingEvents {
|
|
||||||
allMissingEventIDs = append(allMissingEventIDs, missingEvIDs...)
|
|
||||||
}
|
|
||||||
evs, err := d.fetchMissingStateEvents(ctx, txn, allMissingEventIDs)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
// we know we got them all otherwise an error would've been returned, so just loop the events
|
|
||||||
for _, ev := range evs {
|
|
||||||
roomID := ev.RoomID()
|
|
||||||
stateBetween[roomID] = append(stateBetween[roomID], ev)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return stateBetween, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *SyncServerDatasource) fetchMissingStateEvents(
|
|
||||||
ctx context.Context, txn *sql.Tx, eventIDs []string,
|
|
||||||
) ([]types.StreamEvent, error) {
|
|
||||||
// Fetch from the events table first so we pick up the stream ID for the
|
|
||||||
// event.
|
|
||||||
events, err := d.Database.OutputEvents.SelectEvents(ctx, txn, eventIDs)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
have := map[string]bool{}
|
|
||||||
for _, event := range events {
|
|
||||||
have[event.EventID()] = true
|
|
||||||
}
|
|
||||||
var missing []string
|
|
||||||
for _, eventID := range eventIDs {
|
|
||||||
if !have[eventID] {
|
|
||||||
missing = append(missing, eventID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(missing) == 0 {
|
|
||||||
return events, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// If they are missing from the events table then they should be state
|
|
||||||
// events that we received from outside the main event stream.
|
|
||||||
// These should be in the room state table.
|
|
||||||
stateEvents, err := d.Database.CurrentRoomState.SelectEventsWithEventIDs(ctx, txn, missing)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if len(stateEvents) != len(missing) {
|
|
||||||
return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(stateEvents), len(missing))
|
|
||||||
}
|
|
||||||
events = append(events, stateEvents...)
|
|
||||||
return events, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// getStateDeltas returns the state deltas between fromPos and toPos,
|
|
||||||
// exclusive of oldPos, inclusive of newPos, for the rooms in which
|
|
||||||
// the user has new membership events.
|
|
||||||
// A list of joined room IDs is also returned in case the caller needs it.
|
|
||||||
func (d *SyncServerDatasource) getStateDeltas(
|
|
||||||
ctx context.Context, device *authtypes.Device, txn *sql.Tx,
|
|
||||||
fromPos, toPos types.StreamPosition, userID string,
|
|
||||||
stateFilterPart *gomatrixserverlib.StateFilter,
|
|
||||||
) ([]stateDelta, []string, error) {
|
|
||||||
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
|
|
||||||
// - Get membership list changes for this user in this sync response
|
|
||||||
// - For each room which has membership list changes:
|
|
||||||
// * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO).
|
|
||||||
// If it is, then we need to send the full room state down (and 'limited' is always true).
|
|
||||||
// * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
|
|
||||||
// * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block.
|
|
||||||
// - Get all CURRENTLY joined rooms, and add them to 'joined' block.
|
|
||||||
var deltas []stateDelta
|
|
||||||
|
|
||||||
// get all the state events ever between these two positions
|
|
||||||
stateNeeded, eventMap, err := d.Database.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
for roomID, stateStreamEvents := range state {
|
|
||||||
for _, ev := range stateStreamEvents {
|
|
||||||
// TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event.
|
|
||||||
// We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this,
|
|
||||||
// dupe join events will result in the entire room state coming down to the client again. This is added in
|
|
||||||
// the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to
|
|
||||||
// the timeline.
|
|
||||||
if membership := getMembershipFromEvent(&ev.HeaderedEvent, userID); membership != "" {
|
|
||||||
if membership == gomatrixserverlib.Join {
|
|
||||||
// send full room state down instead of a delta
|
|
||||||
var s []types.StreamEvent
|
|
||||||
s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID, stateFilterPart)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
state[roomID] = s
|
|
||||||
continue // we'll add this room in when we do joined rooms
|
|
||||||
}
|
|
||||||
|
|
||||||
deltas = append(deltas, stateDelta{
|
|
||||||
membership: membership,
|
|
||||||
membershipPos: ev.StreamPosition,
|
|
||||||
stateEvents: d.StreamEventsToEvents(device, stateStreamEvents),
|
|
||||||
roomID: roomID,
|
|
||||||
})
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add in currently joined rooms
|
|
||||||
joinedRoomIDs, err := d.Database.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
for _, joinedRoomID := range joinedRoomIDs {
|
|
||||||
deltas = append(deltas, stateDelta{
|
|
||||||
membership: gomatrixserverlib.Join,
|
|
||||||
stateEvents: d.StreamEventsToEvents(device, state[joinedRoomID]),
|
|
||||||
roomID: joinedRoomID,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
return deltas, joinedRoomIDs, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// getStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync
|
|
||||||
// requests with full_state=true.
|
|
||||||
// Fetches full state for all joined rooms and uses selectStateInRange to get
|
|
||||||
// updates for other rooms.
|
|
||||||
func (d *SyncServerDatasource) getStateDeltasForFullStateSync(
|
|
||||||
ctx context.Context, device *authtypes.Device, txn *sql.Tx,
|
|
||||||
fromPos, toPos types.StreamPosition, userID string,
|
|
||||||
stateFilterPart *gomatrixserverlib.StateFilter,
|
|
||||||
) ([]stateDelta, []string, error) {
|
|
||||||
joinedRoomIDs, err := d.Database.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Use a reasonable initial capacity
|
|
||||||
deltas := make([]stateDelta, 0, len(joinedRoomIDs))
|
|
||||||
|
|
||||||
// Add full states for all joined rooms
|
|
||||||
for _, joinedRoomID := range joinedRoomIDs {
|
|
||||||
s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID, stateFilterPart)
|
|
||||||
if stateErr != nil {
|
|
||||||
return nil, nil, stateErr
|
|
||||||
}
|
|
||||||
deltas = append(deltas, stateDelta{
|
|
||||||
membership: gomatrixserverlib.Join,
|
|
||||||
stateEvents: d.StreamEventsToEvents(device, s),
|
|
||||||
roomID: joinedRoomID,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get all the state events ever between these two positions
|
|
||||||
stateNeeded, eventMap, err := d.Database.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
for roomID, stateStreamEvents := range state {
|
|
||||||
for _, ev := range stateStreamEvents {
|
|
||||||
if membership := getMembershipFromEvent(&ev.HeaderedEvent, userID); membership != "" {
|
|
||||||
if membership != gomatrixserverlib.Join { // We've already added full state for all joined rooms above.
|
|
||||||
deltas = append(deltas, stateDelta{
|
|
||||||
membership: membership,
|
|
||||||
membershipPos: ev.StreamPosition,
|
|
||||||
stateEvents: d.StreamEventsToEvents(device, stateStreamEvents),
|
|
||||||
roomID: roomID,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return deltas, joinedRoomIDs, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *SyncServerDatasource) currentStateStreamEventsForRoom(
|
|
||||||
ctx context.Context, txn *sql.Tx, roomID string,
|
|
||||||
stateFilterPart *gomatrixserverlib.StateFilter,
|
|
||||||
) ([]types.StreamEvent, error) {
|
|
||||||
allState, err := d.Database.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilterPart)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
s := make([]types.StreamEvent, len(allState))
|
|
||||||
for i := 0; i < len(s); i++ {
|
|
||||||
s[i] = types.StreamEvent{HeaderedEvent: allState[i], StreamPosition: 0}
|
|
||||||
}
|
|
||||||
return s, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// There may be some overlap where events in stateEvents are already in recentEvents, so filter
|
|
||||||
// them out so we don't include them twice in the /sync response. They should be in recentEvents
|
|
||||||
// only, so clients get to the correct state once they have rolled forward.
|
|
||||||
func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.HeaderedEvent) []gomatrixserverlib.HeaderedEvent {
|
|
||||||
for _, recentEv := range recentEvents {
|
|
||||||
if recentEv.StateKey() == nil {
|
|
||||||
continue // not a state event
|
|
||||||
}
|
|
||||||
// TODO: This is a linear scan over all the current state events in this room. This will
|
|
||||||
// be slow for big rooms. We should instead sort the state events by event ID (ORDER BY)
|
|
||||||
// then do a binary search to find matching events, similar to what roomserver does.
|
|
||||||
for j := 0; j < len(stateEvents); j++ {
|
|
||||||
if stateEvents[j].EventID() == recentEv.EventID() {
|
|
||||||
// overwrite the element to remove with the last element then pop the last element.
|
|
||||||
// This is orders of magnitude faster than re-slicing, but doesn't preserve ordering
|
|
||||||
// (we don't care about the order of stateEvents)
|
|
||||||
stateEvents[j] = stateEvents[len(stateEvents)-1]
|
|
||||||
stateEvents = stateEvents[:len(stateEvents)-1]
|
|
||||||
break // there shouldn't be multiple events with the same event ID
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return stateEvents
|
|
||||||
}
|
|
||||||
|
|
||||||
// getMembershipFromEvent returns the value of content.membership iff the event is a state event
|
|
||||||
// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned.
|
|
||||||
func getMembershipFromEvent(ev *gomatrixserverlib.HeaderedEvent, userID string) string {
|
|
||||||
if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) {
|
|
||||||
membership, err := ev.Membership()
|
|
||||||
if err != nil {
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
return membership
|
|
||||||
}
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -34,7 +34,7 @@ func NewSyncServerDatasource(dataSourceName string, dbProperties common.DbProper
|
||||||
case "postgres":
|
case "postgres":
|
||||||
return postgres.NewDatabase(dataSourceName, dbProperties)
|
return postgres.NewDatabase(dataSourceName, dbProperties)
|
||||||
case "file":
|
case "file":
|
||||||
return sqlite3.NewSyncServerDatasource(dataSourceName)
|
return sqlite3.NewDatabase(dataSourceName)
|
||||||
default:
|
default:
|
||||||
return postgres.NewDatabase(dataSourceName, dbProperties)
|
return postgres.NewDatabase(dataSourceName, dbProperties)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -51,7 +51,7 @@ func MustCreateEvent(t *testing.T, roomID string, prevs []gomatrixserverlib.Head
|
||||||
}
|
}
|
||||||
|
|
||||||
func MustCreateDatabase(t *testing.T) storage.Database {
|
func MustCreateDatabase(t *testing.T) storage.Database {
|
||||||
db, err := sqlite3.NewSyncServerDatasource("file::memory:")
|
db, err := sqlite3.NewDatabase("file::memory:")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("NewSyncServerDatasource returned %s", err)
|
t.Fatalf("NewSyncServerDatasource returned %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,7 @@ func NewSyncServerDatasource(
|
||||||
case "postgres":
|
case "postgres":
|
||||||
return nil, fmt.Errorf("Cannot use postgres implementation")
|
return nil, fmt.Errorf("Cannot use postgres implementation")
|
||||||
case "file":
|
case "file":
|
||||||
return sqlite3.NewSyncServerDatasource(dataSourceName)
|
return sqlite3.NewDatabase(dataSourceName)
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("Cannot use postgres implementation")
|
return nil, fmt.Errorf("Cannot use postgres implementation")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue