diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 7fda5c91e..8a65be970 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -466,7 +466,7 @@ func (d *Database) addPDUDeltaToResponse( } // TODO: This should be done in getStateDeltas - if err = d.addInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil { + if err = d.AddInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil { return nil, err } @@ -510,7 +510,8 @@ func (d *Database) addTypingDeltaToResponse( // addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if // the positions of that type are not equal in fromPos and toPos. -func (d *Database) addEDUDeltaToResponse( +// TODO FIXME TEMPORARY PUBLIC +func (d *Database) AddEDUDeltaToResponse( fromPos, toPos types.StreamingToken, joinedRoomIDs []string, res *types.Response, @@ -550,7 +551,7 @@ func (d *Database) IncrementalSync( return nil, err } - err = d.addEDUDeltaToResponse( + err = d.AddEDUDeltaToResponse( fromPos, toPos, joinedRoomIDs, res, ) if err != nil { @@ -627,7 +628,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync( var prevBatchStr string if len(recentStreamEvents) > 0 { var backwardTopologyPos, backwardStreamPos types.StreamPosition - backwardTopologyPos, backwardStreamPos, err = d.Topology.SelectPositionInTopology(ctx, nil, recentStreamEvents[0].EventID()) + backwardTopologyPos, backwardStreamPos, err = d.Topology.SelectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID()) if err != nil { return } @@ -648,7 +649,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync( res.Rooms.Join[roomID] = *jr } - if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition(), res); err != nil { + if err = d.AddInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition(), res); err != nil { return } @@ -667,7 +668,7 @@ func (d *Database) CompleteSync( } // Use a zero value SyncPosition for fromPos so all EDU states are added. - err = d.addEDUDeltaToResponse( + err = d.AddEDUDeltaToResponse( types.NewStreamToken(0, 0), toPos, joinedRoomIDs, res, ) if err != nil { @@ -687,7 +688,8 @@ var txReadOnlySnapshot = sql.TxOptions{ ReadOnly: true, } -func (d *Database) addInvitesToResponse( +// TODO FIXME temporary public +func (d *Database) AddInvitesToResponse( ctx context.Context, txn *sql.Tx, userID string, fromPos, toPos types.StreamPosition, diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index df02c39de..29b225829 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -18,7 +18,6 @@ package sqlite3 import ( "context" "database/sql" - "encoding/json" "errors" "fmt" "net/url" @@ -173,7 +172,7 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse( } // TODO: This should be done in getStateDeltas - if err = d.addInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil { + if err = d.Database.AddInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil { return nil, err } @@ -181,57 +180,6 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse( return joinedRoomIDs, nil } -// addTypingDeltaToResponse adds all typing notifications to a sync response -// since the specified position. -func (d *SyncServerDatasource) addTypingDeltaToResponse( - since types.StreamingToken, - joinedRoomIDs []string, - res *types.Response, -) error { - var jr types.JoinResponse - var ok bool - var err error - for _, roomID := range joinedRoomIDs { - if typingUsers, updated := d.Database.EDUCache.GetTypingUsersIfUpdatedAfter( - roomID, int64(since.EDUPosition()), - ); updated { - ev := gomatrixserverlib.ClientEvent{ - Type: gomatrixserverlib.MTyping, - } - ev.Content, err = json.Marshal(map[string]interface{}{ - "user_ids": typingUsers, - }) - if err != nil { - return err - } - - if jr, ok = res.Rooms.Join[roomID]; !ok { - jr = *types.NewJoinResponse() - } - jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev) - res.Rooms.Join[roomID] = jr - } - } - return nil -} - -// addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if -// the positions of that type are not equal in fromPos and toPos. -func (d *SyncServerDatasource) addEDUDeltaToResponse( - fromPos, toPos types.StreamingToken, - joinedRoomIDs []string, - res *types.Response, -) (err error) { - - if fromPos.EDUPosition() != toPos.EDUPosition() { - err = d.addTypingDeltaToResponse( - fromPos, joinedRoomIDs, res, - ) - } - - return -} - // IncrementalSync returns all the data needed in order to create an incremental // sync response for the given user. Events returned will include any client // transaction IDs associated with the given device. These transaction IDs come @@ -263,7 +211,7 @@ func (d *SyncServerDatasource) IncrementalSync( return nil, err } - err = d.addEDUDeltaToResponse( + err = d.Database.AddEDUDeltaToResponse( fromPos, toPos, joinedRoomIDs, res, ) if err != nil { @@ -273,126 +221,6 @@ func (d *SyncServerDatasource) IncrementalSync( return res, nil } -// getResponseWithPDUsForCompleteSync creates a response and adds all PDUs needed -// to it. It returns toPos and joinedRoomIDs for use of adding EDUs. -func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( - ctx context.Context, - userID string, - numRecentEventsPerRoom int, -) ( - res *types.Response, - toPos types.StreamingToken, - joinedRoomIDs []string, - err error, -) { - // This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have - // a consistent view of the database throughout. This includes extracting the sync position. - // This does have the unfortunate side-effect that all the matrixy logic resides in this function, - // but it's better to not hide the fact that this is being done in a transaction. - txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot) - if err != nil { - return - } - var succeeded bool - defer func() { - txerr := common.EndTransaction(txn, &succeeded) - if err == nil && txerr != nil { - err = txerr - } - }() - - // Get the current sync position which we will base the sync response on. - toPos, err = d.Database.SyncPositionTx(ctx, txn) - if err != nil { - return - } - - res = types.NewResponse(toPos) - - // Extract room state and recent events for all rooms the user is joined to. - joinedRoomIDs, err = d.Database.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join) - if err != nil { - return - } - - stateFilterPart := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request - - // Build up a /sync response. Add joined rooms. - for _, roomID := range joinedRoomIDs { - var stateEvents []gomatrixserverlib.HeaderedEvent - stateEvents, err = d.Database.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, &stateFilterPart) - if err != nil { - return - } - //fmt.Println("State events:", stateEvents) - // TODO: When filters are added, we may need to call this multiple times to get enough events. - // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 - var recentStreamEvents []types.StreamEvent - recentStreamEvents, err = d.Database.OutputEvents.SelectRecentEvents( - ctx, txn, roomID, types.StreamPosition(0), toPos.PDUPosition(), - numRecentEventsPerRoom, true, true, - ) - if err != nil { - return - } - //fmt.Println("Recent stream events:", recentStreamEvents) - - // Retrieve the backward topology position, i.e. the position of the - // oldest event in the room's topology. - var prevBatchStr string - if len(recentStreamEvents) > 0 { - var backwardTopologyPos, backwardStreamPos types.StreamPosition - backwardTopologyPos, backwardStreamPos, err = d.Database.Topology.SelectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID()) - if err != nil { - return - } - prevBatch := types.NewTopologyToken(backwardTopologyPos, backwardStreamPos) - prevBatch.Decrement() - prevBatchStr = prevBatch.String() - } - - // We don't include a device here as we don't need to send down - // transaction IDs for complete syncs - recentEvents := d.StreamEventsToEvents(nil, recentStreamEvents) - stateEvents = removeDuplicates(stateEvents, recentEvents) - jr := types.NewJoinResponse() - jr.Timeline.PrevBatch = prevBatchStr - jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = true - jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync) - res.Rooms.Join[roomID] = *jr - } - - if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition(), res); err != nil { - return - } - - succeeded = true - return res, toPos, joinedRoomIDs, err -} - -// CompleteSync returns a complete /sync API response for the given user. -func (d *SyncServerDatasource) CompleteSync( - ctx context.Context, userID string, numRecentEventsPerRoom int, -) (*types.Response, error) { - res, toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync( - ctx, userID, numRecentEventsPerRoom, - ) - if err != nil { - return nil, err - } - - // Use a zero value SyncPosition for fromPos so all EDU states are added. - err = d.addEDUDeltaToResponse( - types.NewStreamToken(0, 0), toPos, joinedRoomIDs, res, - ) - if err != nil { - return nil, err - } - - return res, nil -} - var txReadOnlySnapshot = sql.TxOptions{ // Set the isolation level so that we see a snapshot of the database. // In PostgreSQL repeatable read transactions will see a snapshot taken @@ -403,25 +231,6 @@ var txReadOnlySnapshot = sql.TxOptions{ ReadOnly: true, } -func (d *SyncServerDatasource) addInvitesToResponse( - ctx context.Context, txn *sql.Tx, - userID string, - fromPos, toPos types.StreamPosition, - res *types.Response, -) error { - invites, err := d.Database.Invites.SelectInviteEventsInRange( - ctx, txn, userID, fromPos, toPos, - ) - if err != nil { - return err - } - for roomID, inviteEvent := range invites { - ir := types.NewInviteResponse(inviteEvent) - res.Rooms.Invite[roomID] = *ir - } - return nil -} - // Retrieve the backward topology position, i.e. the position of the // oldest event in the room's topology. func (d *SyncServerDatasource) getBackwardTopologyPos(