From a1056c156d13c03472571215b89721eb9831e174 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 14 May 2020 15:49:02 +0100 Subject: [PATCH] Remove remaining code --- syncapi/storage/shared/syncserver.go | 28 +- syncapi/storage/sqlite3/syncserver.go | 479 +------------------------- syncapi/storage/storage.go | 2 +- syncapi/storage/storage_test.go | 2 +- syncapi/storage/storage_wasm.go | 2 +- 5 files changed, 20 insertions(+), 493 deletions(-) diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 8a65be970..96e9ff619 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -711,18 +711,20 @@ func (d *Database) AddInvitesToResponse( // Retrieve the backward topology position, i.e. the position of the // oldest event in the room's topology. func (d *Database) getBackwardTopologyPos( - ctx context.Context, + ctx context.Context, txn *sql.Tx, events []types.StreamEvent, -) (pos, spos types.StreamPosition) { - if len(events) > 0 { - pos, spos, _ = d.Topology.SelectPositionInTopology(ctx, nil, events[0].EventID()) +) (types.TopologyToken, error) { + zeroToken := types.NewTopologyToken(0, 0) + if len(events) == 0 { + return zeroToken, nil } - if pos-1 <= 0 { - pos = types.StreamPosition(1) - } else { - pos = pos - 1 + pos, spos, err := d.Topology.SelectPositionInTopology(ctx, txn, events[0].EventID()) + if err != nil { + return zeroToken, err } - return + tok := types.NewTopologyToken(pos, spos) + tok.Decrement() + return tok, nil } // addRoomDeltaToResponse adds a room state delta to a sync response @@ -754,10 +756,10 @@ func (d *Database) addRoomDeltaToResponse( } recentEvents := d.StreamEventsToEvents(device, recentStreamEvents) delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back - backwardTopologyPos, backwardStreamPos := d.getBackwardTopologyPos(ctx, recentStreamEvents) - prevBatch := types.NewTopologyToken( - backwardTopologyPos, backwardStreamPos, - ) + prevBatch, err := d.getBackwardTopologyPos(ctx, txn, recentStreamEvents) + if err != nil { + return err + } switch delta.membership { case gomatrixserverlib.Join: diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 29b225829..e23aeca4e 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -16,13 +16,10 @@ package sqlite3 import ( - "context" "database/sql" "errors" - "fmt" "net/url" - "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/internal/sqlutil" // Import the sqlite3 package @@ -31,19 +28,8 @@ import ( "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/syncapi/storage/shared" - "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/gomatrixserverlib" ) -type stateDelta struct { - roomID string - stateEvents []gomatrixserverlib.HeaderedEvent - membership string - // The PDU stream position of the latest membership event for this user, if applicable. - // Can be 0 if there is no membership event in this delta. - membershipPos types.StreamPosition -} - // SyncServerDatasource represents a sync server datasource which manages // both the database for PDUs and caches for EDUs. type SyncServerDatasource struct { @@ -53,9 +39,9 @@ type SyncServerDatasource struct { streamID streamIDStatements } -// NewSyncServerDatasource creates a new sync server database +// NewDatabase creates a new sync server database // nolint: gocyclo -func NewSyncServerDatasource(dataSourceName string) (*SyncServerDatasource, error) { +func NewDatabase(dataSourceName string) (*SyncServerDatasource, error) { var d SyncServerDatasource uri, err := url.Parse(dataSourceName) if err != nil { @@ -121,464 +107,3 @@ func (d *SyncServerDatasource) prepare() (err error) { } return nil } - -// addPDUDeltaToResponse adds all PDU deltas to a sync response. -// IDs of all rooms the user joined are returned so EDU deltas can be added for them. -func (d *SyncServerDatasource) addPDUDeltaToResponse( - ctx context.Context, - device authtypes.Device, - fromPos, toPos types.StreamPosition, - numRecentEventsPerRoom int, - wantFullState bool, - res *types.Response, -) (joinedRoomIDs []string, err error) { - txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot) - if err != nil { - return nil, err - } - var succeeded bool - defer func() { - txerr := common.EndTransaction(txn, &succeeded) - if err == nil && txerr != nil { - err = txerr - } - }() - - stateFilterPart := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request - - // Work out which rooms to return in the response. This is done by getting not only the currently - // joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions. - // This works out what the 'state' key should be for each room as well as which membership block - // to put the room into. - var deltas []stateDelta - if !wantFullState { - deltas, joinedRoomIDs, err = d.getStateDeltas( - ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilterPart, - ) - } else { - deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync( - ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilterPart, - ) - } - if err != nil { - return nil, err - } - - for _, delta := range deltas { - err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res) - if err != nil { - return nil, err - } - } - - // TODO: This should be done in getStateDeltas - if err = d.Database.AddInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil { - return nil, err - } - - succeeded = true - return joinedRoomIDs, nil -} - -// IncrementalSync returns all the data needed in order to create an incremental -// sync response for the given user. Events returned will include any client -// transaction IDs associated with the given device. These transaction IDs come -// from when the device sent the event via an API that included a transaction -// ID. -func (d *SyncServerDatasource) IncrementalSync( - ctx context.Context, - device authtypes.Device, - fromPos, toPos types.StreamingToken, - numRecentEventsPerRoom int, - wantFullState bool, -) (*types.Response, error) { - nextBatchPos := fromPos.WithUpdates(toPos) - res := types.NewResponse(nextBatchPos) - - var joinedRoomIDs []string - var err error - fmt.Println("from", fromPos.PDUPosition(), "to", toPos.PDUPosition()) - if fromPos.PDUPosition() != toPos.PDUPosition() || wantFullState { - joinedRoomIDs, err = d.addPDUDeltaToResponse( - ctx, device, fromPos.PDUPosition(), toPos.PDUPosition(), numRecentEventsPerRoom, wantFullState, res, - ) - } else { - joinedRoomIDs, err = d.Database.CurrentRoomState.SelectRoomIDsWithMembership( - ctx, nil, device.UserID, gomatrixserverlib.Join, - ) - } - if err != nil { - return nil, err - } - - err = d.Database.AddEDUDeltaToResponse( - fromPos, toPos, joinedRoomIDs, res, - ) - if err != nil { - return nil, err - } - - return res, nil -} - -var txReadOnlySnapshot = sql.TxOptions{ - // Set the isolation level so that we see a snapshot of the database. - // In PostgreSQL repeatable read transactions will see a snapshot taken - // at the first query, and since the transaction is read-only it can't - // run into any serialisation errors. - // https://www.postgresql.org/docs/9.5/static/transaction-iso.html#XACT-REPEATABLE-READ - Isolation: sql.LevelRepeatableRead, - ReadOnly: true, -} - -// Retrieve the backward topology position, i.e. the position of the -// oldest event in the room's topology. -func (d *SyncServerDatasource) getBackwardTopologyPos( - ctx context.Context, txn *sql.Tx, - events []types.StreamEvent, -) (pos, spos types.StreamPosition) { - if len(events) > 0 { - pos, spos, _ = d.Database.Topology.SelectPositionInTopology(ctx, txn, events[0].EventID()) - } - // go to the previous position so we don't pull out the same event twice - // FIXME: This could be done more nicely by being explicit with inclusive/exclusive rules - if pos-1 <= 0 { - pos = types.StreamPosition(1) - } else { - pos = pos - 1 - spos += 1000 // this has to be bigger than the number of events we backfill per request - } - return -} - -// addRoomDeltaToResponse adds a room state delta to a sync response -func (d *SyncServerDatasource) addRoomDeltaToResponse( - ctx context.Context, - device *authtypes.Device, - txn *sql.Tx, - fromPos, toPos types.StreamPosition, - delta stateDelta, - numRecentEventsPerRoom int, - res *types.Response, -) error { - endPos := toPos - if delta.membershipPos > 0 && delta.membership == gomatrixserverlib.Leave { - // make sure we don't leak recent events after the leave event. - // TODO: History visibility makes this somewhat complex to handle correctly. For example: - // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join). - // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave - // in a single /sync request - // This is all "okay" assuming history_visibility == "shared" which it is by default. - endPos = delta.membershipPos - } - recentStreamEvents, err := d.Database.OutputEvents.SelectRecentEvents( - ctx, txn, delta.roomID, types.StreamPosition(fromPos), types.StreamPosition(endPos), - numRecentEventsPerRoom, true, true, - ) - if err != nil { - return err - } - recentEvents := d.StreamEventsToEvents(device, recentStreamEvents) - delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) - backwardTopologyPos, backwardStreamPos := d.getBackwardTopologyPos(ctx, txn, recentStreamEvents) - prevBatch := types.NewTopologyToken( - backwardTopologyPos, backwardStreamPos, - ) - - switch delta.membership { - case gomatrixserverlib.Join: - jr := types.NewJoinResponse() - jr.Timeline.PrevBatch = prevBatch.String() - jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true - jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) - res.Rooms.Join[delta.roomID] = *jr - case gomatrixserverlib.Leave: - fallthrough // transitions to leave are the same as ban - case gomatrixserverlib.Ban: - // TODO: recentEvents may contain events that this user is not allowed to see because they are - // no longer in the room. - lr := types.NewLeaveResponse() - lr.Timeline.PrevBatch = prevBatch.String() - lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true - lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) - res.Rooms.Leave[delta.roomID] = *lr - } - - return nil -} - -// fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database. -// Returns a map of room ID to list of events. -func (d *SyncServerDatasource) fetchStateEvents( - ctx context.Context, txn *sql.Tx, - roomIDToEventIDSet map[string]map[string]bool, - eventIDToEvent map[string]types.StreamEvent, -) (map[string][]types.StreamEvent, error) { - stateBetween := make(map[string][]types.StreamEvent) - missingEvents := make(map[string][]string) - for roomID, ids := range roomIDToEventIDSet { - events := stateBetween[roomID] - for id, need := range ids { - if !need { - continue // deleted state - } - e, ok := eventIDToEvent[id] - if ok { - events = append(events, e) - } else { - m := missingEvents[roomID] - m = append(m, id) - missingEvents[roomID] = m - } - } - stateBetween[roomID] = events - } - - if len(missingEvents) > 0 { - // This happens when add_state_ids has an event ID which is not in the provided range. - // We need to explicitly fetch them. - allMissingEventIDs := []string{} - for _, missingEvIDs := range missingEvents { - allMissingEventIDs = append(allMissingEventIDs, missingEvIDs...) - } - evs, err := d.fetchMissingStateEvents(ctx, txn, allMissingEventIDs) - if err != nil { - return nil, err - } - // we know we got them all otherwise an error would've been returned, so just loop the events - for _, ev := range evs { - roomID := ev.RoomID() - stateBetween[roomID] = append(stateBetween[roomID], ev) - } - } - return stateBetween, nil -} - -func (d *SyncServerDatasource) fetchMissingStateEvents( - ctx context.Context, txn *sql.Tx, eventIDs []string, -) ([]types.StreamEvent, error) { - // Fetch from the events table first so we pick up the stream ID for the - // event. - events, err := d.Database.OutputEvents.SelectEvents(ctx, txn, eventIDs) - if err != nil { - return nil, err - } - - have := map[string]bool{} - for _, event := range events { - have[event.EventID()] = true - } - var missing []string - for _, eventID := range eventIDs { - if !have[eventID] { - missing = append(missing, eventID) - } - } - if len(missing) == 0 { - return events, nil - } - - // If they are missing from the events table then they should be state - // events that we received from outside the main event stream. - // These should be in the room state table. - stateEvents, err := d.Database.CurrentRoomState.SelectEventsWithEventIDs(ctx, txn, missing) - - if err != nil { - return nil, err - } - if len(stateEvents) != len(missing) { - return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(stateEvents), len(missing)) - } - events = append(events, stateEvents...) - return events, nil -} - -// getStateDeltas returns the state deltas between fromPos and toPos, -// exclusive of oldPos, inclusive of newPos, for the rooms in which -// the user has new membership events. -// A list of joined room IDs is also returned in case the caller needs it. -func (d *SyncServerDatasource) getStateDeltas( - ctx context.Context, device *authtypes.Device, txn *sql.Tx, - fromPos, toPos types.StreamPosition, userID string, - stateFilterPart *gomatrixserverlib.StateFilter, -) ([]stateDelta, []string, error) { - // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 - // - Get membership list changes for this user in this sync response - // - For each room which has membership list changes: - // * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO). - // If it is, then we need to send the full room state down (and 'limited' is always true). - // * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block. - // * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block. - // - Get all CURRENTLY joined rooms, and add them to 'joined' block. - var deltas []stateDelta - - // get all the state events ever between these two positions - stateNeeded, eventMap, err := d.Database.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart) - if err != nil { - return nil, nil, err - } - state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap) - if err != nil { - return nil, nil, err - } - - for roomID, stateStreamEvents := range state { - for _, ev := range stateStreamEvents { - // TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event. - // We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this, - // dupe join events will result in the entire room state coming down to the client again. This is added in - // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to - // the timeline. - if membership := getMembershipFromEvent(&ev.HeaderedEvent, userID); membership != "" { - if membership == gomatrixserverlib.Join { - // send full room state down instead of a delta - var s []types.StreamEvent - s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID, stateFilterPart) - if err != nil { - return nil, nil, err - } - state[roomID] = s - continue // we'll add this room in when we do joined rooms - } - - deltas = append(deltas, stateDelta{ - membership: membership, - membershipPos: ev.StreamPosition, - stateEvents: d.StreamEventsToEvents(device, stateStreamEvents), - roomID: roomID, - }) - break - } - } - } - - // Add in currently joined rooms - joinedRoomIDs, err := d.Database.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join) - if err != nil { - return nil, nil, err - } - for _, joinedRoomID := range joinedRoomIDs { - deltas = append(deltas, stateDelta{ - membership: gomatrixserverlib.Join, - stateEvents: d.StreamEventsToEvents(device, state[joinedRoomID]), - roomID: joinedRoomID, - }) - } - - return deltas, joinedRoomIDs, nil -} - -// getStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync -// requests with full_state=true. -// Fetches full state for all joined rooms and uses selectStateInRange to get -// updates for other rooms. -func (d *SyncServerDatasource) getStateDeltasForFullStateSync( - ctx context.Context, device *authtypes.Device, txn *sql.Tx, - fromPos, toPos types.StreamPosition, userID string, - stateFilterPart *gomatrixserverlib.StateFilter, -) ([]stateDelta, []string, error) { - joinedRoomIDs, err := d.Database.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join) - if err != nil { - return nil, nil, err - } - - // Use a reasonable initial capacity - deltas := make([]stateDelta, 0, len(joinedRoomIDs)) - - // Add full states for all joined rooms - for _, joinedRoomID := range joinedRoomIDs { - s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID, stateFilterPart) - if stateErr != nil { - return nil, nil, stateErr - } - deltas = append(deltas, stateDelta{ - membership: gomatrixserverlib.Join, - stateEvents: d.StreamEventsToEvents(device, s), - roomID: joinedRoomID, - }) - } - - // Get all the state events ever between these two positions - stateNeeded, eventMap, err := d.Database.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart) - if err != nil { - return nil, nil, err - } - state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap) - if err != nil { - return nil, nil, err - } - - for roomID, stateStreamEvents := range state { - for _, ev := range stateStreamEvents { - if membership := getMembershipFromEvent(&ev.HeaderedEvent, userID); membership != "" { - if membership != gomatrixserverlib.Join { // We've already added full state for all joined rooms above. - deltas = append(deltas, stateDelta{ - membership: membership, - membershipPos: ev.StreamPosition, - stateEvents: d.StreamEventsToEvents(device, stateStreamEvents), - roomID: roomID, - }) - } - - break - } - } - } - - return deltas, joinedRoomIDs, nil -} - -func (d *SyncServerDatasource) currentStateStreamEventsForRoom( - ctx context.Context, txn *sql.Tx, roomID string, - stateFilterPart *gomatrixserverlib.StateFilter, -) ([]types.StreamEvent, error) { - allState, err := d.Database.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilterPart) - if err != nil { - return nil, err - } - s := make([]types.StreamEvent, len(allState)) - for i := 0; i < len(s); i++ { - s[i] = types.StreamEvent{HeaderedEvent: allState[i], StreamPosition: 0} - } - return s, nil -} - -// There may be some overlap where events in stateEvents are already in recentEvents, so filter -// them out so we don't include them twice in the /sync response. They should be in recentEvents -// only, so clients get to the correct state once they have rolled forward. -func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.HeaderedEvent) []gomatrixserverlib.HeaderedEvent { - for _, recentEv := range recentEvents { - if recentEv.StateKey() == nil { - continue // not a state event - } - // TODO: This is a linear scan over all the current state events in this room. This will - // be slow for big rooms. We should instead sort the state events by event ID (ORDER BY) - // then do a binary search to find matching events, similar to what roomserver does. - for j := 0; j < len(stateEvents); j++ { - if stateEvents[j].EventID() == recentEv.EventID() { - // overwrite the element to remove with the last element then pop the last element. - // This is orders of magnitude faster than re-slicing, but doesn't preserve ordering - // (we don't care about the order of stateEvents) - stateEvents[j] = stateEvents[len(stateEvents)-1] - stateEvents = stateEvents[:len(stateEvents)-1] - break // there shouldn't be multiple events with the same event ID - } - } - } - return stateEvents -} - -// getMembershipFromEvent returns the value of content.membership iff the event is a state event -// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned. -func getMembershipFromEvent(ev *gomatrixserverlib.HeaderedEvent, userID string) string { - if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) { - membership, err := ev.Membership() - if err != nil { - return "" - } - return membership - } - return "" -} diff --git a/syncapi/storage/storage.go b/syncapi/storage/storage.go index 12ab8a63b..76a4b7a4e 100644 --- a/syncapi/storage/storage.go +++ b/syncapi/storage/storage.go @@ -34,7 +34,7 @@ func NewSyncServerDatasource(dataSourceName string, dbProperties common.DbProper case "postgres": return postgres.NewDatabase(dataSourceName, dbProperties) case "file": - return sqlite3.NewSyncServerDatasource(dataSourceName) + return sqlite3.NewDatabase(dataSourceName) default: return postgres.NewDatabase(dataSourceName, dbProperties) } diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index f7fa1a870..bffcfd053 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -51,7 +51,7 @@ func MustCreateEvent(t *testing.T, roomID string, prevs []gomatrixserverlib.Head } func MustCreateDatabase(t *testing.T) storage.Database { - db, err := sqlite3.NewSyncServerDatasource("file::memory:") + db, err := sqlite3.NewDatabase("file::memory:") if err != nil { t.Fatalf("NewSyncServerDatasource returned %s", err) } diff --git a/syncapi/storage/storage_wasm.go b/syncapi/storage/storage_wasm.go index 84bd9df96..666179afb 100644 --- a/syncapi/storage/storage_wasm.go +++ b/syncapi/storage/storage_wasm.go @@ -35,7 +35,7 @@ func NewSyncServerDatasource( case "postgres": return nil, fmt.Errorf("Cannot use postgres implementation") case "file": - return sqlite3.NewSyncServerDatasource(dataSourceName) + return sqlite3.NewDatabase(dataSourceName) default: return nil, fmt.Errorf("Cannot use postgres implementation") }