mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-28 09:13:09 -06:00
Don't drop errors
This commit is contained in:
parent
89e0d0f124
commit
875cb2c379
|
|
@ -16,7 +16,6 @@ package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
|
||||||
|
|
||||||
eduAPI "github.com/matrix-org/dendrite/eduserver/api"
|
eduAPI "github.com/matrix-org/dendrite/eduserver/api"
|
||||||
|
|
||||||
|
|
@ -30,24 +29,22 @@ import (
|
||||||
type Database interface {
|
type Database interface {
|
||||||
internal.PartitionStorer
|
internal.PartitionStorer
|
||||||
|
|
||||||
ReadOnlySnapshot(ctx context.Context) (*sql.Tx, error)
|
|
||||||
|
|
||||||
MaxStreamTokenForPDUs(ctx context.Context) (types.StreamPosition, error)
|
MaxStreamTokenForPDUs(ctx context.Context) (types.StreamPosition, error)
|
||||||
MaxStreamTokenForReceipts(ctx context.Context) (types.StreamPosition, error)
|
MaxStreamTokenForReceipts(ctx context.Context) (types.StreamPosition, error)
|
||||||
MaxStreamTokenForInvites(ctx context.Context) (types.StreamPosition, error)
|
MaxStreamTokenForInvites(ctx context.Context) (types.StreamPosition, error)
|
||||||
|
|
||||||
CurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
|
CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
|
||||||
GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, txn *sql.Tx, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
|
GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
|
||||||
GetStateDeltas(ctx context.Context, device *userapi.Device, txn *sql.Tx, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
|
GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
|
||||||
RoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membership string) ([]string, error)
|
RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error)
|
||||||
|
|
||||||
RecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
|
RecentEvents(ctx context.Context, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
|
||||||
|
|
||||||
GetBackwardTopologyPos(ctx context.Context, txn *sql.Tx, events []types.StreamEvent) (types.TopologyToken, error)
|
GetBackwardTopologyPos(ctx context.Context, events []types.StreamEvent) (types.TopologyToken, error)
|
||||||
PositionInTopology(ctx context.Context, txn *sql.Tx, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error)
|
PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error)
|
||||||
|
|
||||||
InviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error)
|
InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error)
|
||||||
PeeksInRange(ctx context.Context, txn *sql.Tx, userID, deviceID string, r types.Range) (peeks []types.Peek, err error)
|
PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error)
|
||||||
RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error)
|
RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error)
|
||||||
|
|
||||||
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
|
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
|
||||||
|
|
|
||||||
|
|
@ -49,7 +49,7 @@ type Database struct {
|
||||||
Receipts tables.Receipts
|
Receipts tables.Receipts
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) ReadOnlySnapshot(ctx context.Context) (*sql.Tx, error) {
|
func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) {
|
||||||
return d.DB.BeginTx(ctx, &sql.TxOptions{
|
return d.DB.BeginTx(ctx, &sql.TxOptions{
|
||||||
// Set the isolation level so that we see a snapshot of the database.
|
// Set the isolation level so that we see a snapshot of the database.
|
||||||
// In PostgreSQL repeatable read transactions will see a snapshot taken
|
// In PostgreSQL repeatable read transactions will see a snapshot taken
|
||||||
|
|
@ -85,28 +85,28 @@ func (d *Database) MaxStreamTokenForInvites(ctx context.Context) (types.StreamPo
|
||||||
return types.StreamPosition(id), nil
|
return types.StreamPosition(id), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) CurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
func (d *Database) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
||||||
return d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilterPart)
|
return d.CurrentRoomState.SelectCurrentState(ctx, nil, roomID, stateFilterPart)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) RoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membership string) ([]string, error) {
|
func (d *Database) RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error) {
|
||||||
return d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, membership)
|
return d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, nil, userID, membership)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) RecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) {
|
func (d *Database) RecentEvents(ctx context.Context, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) {
|
||||||
return d.OutputEvents.SelectRecentEvents(ctx, txn, roomID, r, limit, chronologicalOrder, onlySyncEvents)
|
return d.OutputEvents.SelectRecentEvents(ctx, nil, roomID, r, limit, chronologicalOrder, onlySyncEvents)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) PositionInTopology(ctx context.Context, txn *sql.Tx, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) {
|
func (d *Database) PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) {
|
||||||
return d.Topology.SelectPositionInTopology(ctx, txn, eventID)
|
return d.Topology.SelectPositionInTopology(ctx, nil, eventID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) InviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error) {
|
func (d *Database) InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error) {
|
||||||
return d.Invites.SelectInviteEventsInRange(ctx, txn, targetUserID, r)
|
return d.Invites.SelectInviteEventsInRange(ctx, nil, targetUserID, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) PeeksInRange(ctx context.Context, txn *sql.Tx, userID, deviceID string, r types.Range) (peeks []types.Peek, err error) {
|
func (d *Database) PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error) {
|
||||||
return d.Peeks.SelectPeeksInRange(ctx, txn, userID, deviceID, r)
|
return d.Peeks.SelectPeeksInRange(ctx, nil, userID, deviceID, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error) {
|
func (d *Database) RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error) {
|
||||||
|
|
@ -551,14 +551,14 @@ func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, reda
|
||||||
// Retrieve the backward topology position, i.e. the position of the
|
// Retrieve the backward topology position, i.e. the position of the
|
||||||
// oldest event in the room's topology.
|
// oldest event in the room's topology.
|
||||||
func (d *Database) GetBackwardTopologyPos(
|
func (d *Database) GetBackwardTopologyPos(
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context,
|
||||||
events []types.StreamEvent,
|
events []types.StreamEvent,
|
||||||
) (types.TopologyToken, error) {
|
) (types.TopologyToken, error) {
|
||||||
zeroToken := types.TopologyToken{}
|
zeroToken := types.TopologyToken{}
|
||||||
if len(events) == 0 {
|
if len(events) == 0 {
|
||||||
return zeroToken, nil
|
return zeroToken, nil
|
||||||
}
|
}
|
||||||
pos, spos, err := d.Topology.SelectPositionInTopology(ctx, txn, events[0].EventID())
|
pos, spos, err := d.Topology.SelectPositionInTopology(ctx, nil, events[0].EventID())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return zeroToken, err
|
return zeroToken, err
|
||||||
}
|
}
|
||||||
|
|
@ -567,80 +567,6 @@ func (d *Database) GetBackwardTopologyPos(
|
||||||
return tok, nil
|
return tok, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// addRoomDeltaToResponse adds a room state delta to a sync response
|
|
||||||
/*
|
|
||||||
func (d *Database) addRoomDeltaToResponse(
|
|
||||||
ctx context.Context,
|
|
||||||
device *userapi.Device,
|
|
||||||
txn *sql.Tx,
|
|
||||||
r types.Range,
|
|
||||||
delta types.StateDelta,
|
|
||||||
numRecentEventsPerRoom int,
|
|
||||||
res *types.Response,
|
|
||||||
) error {
|
|
||||||
if delta.membershipPos > 0 && delta.membership == gomatrixserverlib.Leave {
|
|
||||||
// make sure we don't leak recent events after the leave event.
|
|
||||||
// TODO: History visibility makes this somewhat complex to handle correctly. For example:
|
|
||||||
// TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
|
|
||||||
// TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
|
|
||||||
// in a single /sync request
|
|
||||||
// This is all "okay" assuming history_visibility == "shared" which it is by default.
|
|
||||||
r.To = delta.membershipPos
|
|
||||||
}
|
|
||||||
recentStreamEvents, limited, err := d.OutputEvents.SelectRecentEvents(
|
|
||||||
ctx, txn, delta.roomID, r,
|
|
||||||
numRecentEventsPerRoom, true, true,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
recentEvents := d.StreamEventsToEvents(device, recentStreamEvents)
|
|
||||||
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
|
|
||||||
prevBatch, err := d.getBackwardTopologyPos(ctx, txn, recentStreamEvents)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// XXX: should we ever get this far if we have no recent events or state in this room?
|
|
||||||
// in practice we do for peeks, but possibly not joins?
|
|
||||||
if len(recentEvents) == 0 && len(delta.stateEvents) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
switch delta.membership {
|
|
||||||
case gomatrixserverlib.Join:
|
|
||||||
jr := types.NewJoinResponse()
|
|
||||||
|
|
||||||
jr.Timeline.PrevBatch = &prevBatch
|
|
||||||
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
|
||||||
jr.Timeline.Limited = limited
|
|
||||||
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
|
||||||
res.Rooms.Join[delta.roomID] = *jr
|
|
||||||
case gomatrixserverlib.Peek:
|
|
||||||
jr := types.NewJoinResponse()
|
|
||||||
|
|
||||||
jr.Timeline.PrevBatch = &prevBatch
|
|
||||||
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
|
||||||
jr.Timeline.Limited = limited
|
|
||||||
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
|
||||||
res.Rooms.Peek[delta.roomID] = *jr
|
|
||||||
case gomatrixserverlib.Leave:
|
|
||||||
fallthrough // transitions to leave are the same as ban
|
|
||||||
case gomatrixserverlib.Ban:
|
|
||||||
// TODO: recentEvents may contain events that this user is not allowed to see because they are
|
|
||||||
// no longer in the room.
|
|
||||||
lr := types.NewLeaveResponse()
|
|
||||||
lr.Timeline.PrevBatch = &prevBatch
|
|
||||||
lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
|
||||||
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
|
||||||
lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
|
||||||
res.Rooms.Leave[delta.roomID] = *lr
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
// fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database.
|
// fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database.
|
||||||
// Returns a map of room ID to list of events.
|
// Returns a map of room ID to list of events.
|
||||||
func (d *Database) fetchStateEvents(
|
func (d *Database) fetchStateEvents(
|
||||||
|
|
@ -739,7 +665,7 @@ func (d *Database) fetchMissingStateEvents(
|
||||||
// A list of joined room IDs is also returned in case the caller needs it.
|
// A list of joined room IDs is also returned in case the caller needs it.
|
||||||
// nolint:gocyclo
|
// nolint:gocyclo
|
||||||
func (d *Database) GetStateDeltas(
|
func (d *Database) GetStateDeltas(
|
||||||
ctx context.Context, device *userapi.Device, txn *sql.Tx,
|
ctx context.Context, device *userapi.Device,
|
||||||
r types.Range, userID string,
|
r types.Range, userID string,
|
||||||
stateFilter *gomatrixserverlib.StateFilter,
|
stateFilter *gomatrixserverlib.StateFilter,
|
||||||
) ([]types.StateDelta, []string, error) {
|
) ([]types.StateDelta, []string, error) {
|
||||||
|
|
@ -751,6 +677,13 @@ func (d *Database) GetStateDeltas(
|
||||||
// * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
|
// * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
|
||||||
// * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block.
|
// * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block.
|
||||||
// - Get all CURRENTLY joined rooms, and add them to 'joined' block.
|
// - Get all CURRENTLY joined rooms, and add them to 'joined' block.
|
||||||
|
txn, err := d.readOnlySnapshot(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("d.readOnlySnapshot: %w", err)
|
||||||
|
}
|
||||||
|
var succeeded bool
|
||||||
|
defer sqlutil.EndTransactionWithCheck(txn, &succeeded, &err)
|
||||||
|
|
||||||
var deltas []types.StateDelta
|
var deltas []types.StateDelta
|
||||||
|
|
||||||
// get all the state events ever (i.e. for all available rooms) between these two positions
|
// get all the state events ever (i.e. for all available rooms) between these two positions
|
||||||
|
|
@ -834,6 +767,7 @@ func (d *Database) GetStateDeltas(
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
succeeded = true
|
||||||
return deltas, joinedRoomIDs, nil
|
return deltas, joinedRoomIDs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -843,10 +777,17 @@ func (d *Database) GetStateDeltas(
|
||||||
// updates for other rooms.
|
// updates for other rooms.
|
||||||
// nolint:gocyclo
|
// nolint:gocyclo
|
||||||
func (d *Database) GetStateDeltasForFullStateSync(
|
func (d *Database) GetStateDeltasForFullStateSync(
|
||||||
ctx context.Context, device *userapi.Device, txn *sql.Tx,
|
ctx context.Context, device *userapi.Device,
|
||||||
r types.Range, userID string,
|
r types.Range, userID string,
|
||||||
stateFilter *gomatrixserverlib.StateFilter,
|
stateFilter *gomatrixserverlib.StateFilter,
|
||||||
) ([]types.StateDelta, []string, error) {
|
) ([]types.StateDelta, []string, error) {
|
||||||
|
txn, err := d.readOnlySnapshot(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("d.readOnlySnapshot: %w", err)
|
||||||
|
}
|
||||||
|
var succeeded bool
|
||||||
|
defer sqlutil.EndTransactionWithCheck(txn, &succeeded, &err)
|
||||||
|
|
||||||
// Use a reasonable initial capacity
|
// Use a reasonable initial capacity
|
||||||
deltas := make(map[string]types.StateDelta)
|
deltas := make(map[string]types.StateDelta)
|
||||||
|
|
||||||
|
|
@ -923,6 +864,7 @@ func (d *Database) GetStateDeltasForFullStateSync(
|
||||||
i++
|
i++
|
||||||
}
|
}
|
||||||
|
|
||||||
|
succeeded = true
|
||||||
return result, joinedRoomIDs, nil
|
return result, joinedRoomIDs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -26,7 +26,8 @@ func (p *AccountDataStreamProvider) CompleteSync(
|
||||||
}
|
}
|
||||||
dataRes := &userapi.QueryAccountDataResponse{}
|
dataRes := &userapi.QueryAccountDataResponse{}
|
||||||
if err := p.userAPI.QueryAccountData(ctx, dataReq, dataRes); err != nil {
|
if err := p.userAPI.QueryAccountData(ctx, dataReq, dataRes); err != nil {
|
||||||
return p.LatestPosition(ctx) // nil, err
|
req.Log.WithError(err).Error("p.userAPI.QueryAccountData failed")
|
||||||
|
return p.LatestPosition(ctx)
|
||||||
}
|
}
|
||||||
for datatype, databody := range dataRes.GlobalAccountData {
|
for datatype, databody := range dataRes.GlobalAccountData {
|
||||||
req.Response.AccountData.Events = append(
|
req.Response.AccountData.Events = append(
|
||||||
|
|
@ -68,7 +69,8 @@ func (p *AccountDataStreamProvider) IncrementalSync(
|
||||||
ctx, req.Device.UserID, r, &accountDataFilter,
|
ctx, req.Device.UserID, r, &accountDataFilter,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return to // nil, fmt.Errorf("rp.db.GetAccountDataInRange: %w", err)
|
req.Log.WithError(err).Error("p.DB.GetAccountDataInRange failed")
|
||||||
|
return to
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(dataTypes) == 0 {
|
if len(dataTypes) == 0 {
|
||||||
|
|
@ -88,6 +90,7 @@ func (p *AccountDataStreamProvider) IncrementalSync(
|
||||||
dataRes := userapi.QueryAccountDataResponse{}
|
dataRes := userapi.QueryAccountDataResponse{}
|
||||||
err = p.userAPI.QueryAccountData(ctx, &dataReq, &dataRes)
|
err = p.userAPI.QueryAccountData(ctx, &dataReq, &dataRes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
req.Log.WithError(err).Error("p.userAPI.QueryAccountData failed")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if roomID == "" {
|
if roomID == "" {
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,7 @@ func (p *InviteStreamProvider) Setup() {
|
||||||
|
|
||||||
id, err := p.DB.MaxStreamTokenForInvites(context.Background())
|
id, err := p.DB.MaxStreamTokenForInvites(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
panic(err)
|
||||||
}
|
}
|
||||||
p.latest = id
|
p.latest = id
|
||||||
}
|
}
|
||||||
|
|
@ -41,10 +41,11 @@ func (p *InviteStreamProvider) IncrementalSync(
|
||||||
}
|
}
|
||||||
|
|
||||||
invites, retiredInvites, err := p.DB.InviteEventsInRange(
|
invites, retiredInvites, err := p.DB.InviteEventsInRange(
|
||||||
ctx, nil, req.Device.UserID, r,
|
ctx, req.Device.UserID, r,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return to // fmt.Errorf("d.Invites.SelectInviteEventsInRange: %w", err)
|
req.Log.WithError(err).Error("p.DB.InviteEventsInRange failed")
|
||||||
|
return to
|
||||||
}
|
}
|
||||||
|
|
||||||
for roomID, inviteEvent := range invites {
|
for roomID, inviteEvent := range invites {
|
||||||
|
|
|
||||||
|
|
@ -2,9 +2,7 @@ package streams
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
@ -22,7 +20,7 @@ func (p *PDUStreamProvider) Setup() {
|
||||||
|
|
||||||
id, err := p.DB.MaxStreamTokenForPDUs(context.Background())
|
id, err := p.DB.MaxStreamTokenForPDUs(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
panic(err)
|
||||||
}
|
}
|
||||||
p.latest = id
|
p.latest = id
|
||||||
}
|
}
|
||||||
|
|
@ -33,17 +31,6 @@ func (p *PDUStreamProvider) CompleteSync(
|
||||||
) types.StreamPosition {
|
) types.StreamPosition {
|
||||||
to := p.LatestPosition(ctx)
|
to := p.LatestPosition(ctx)
|
||||||
|
|
||||||
// This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have
|
|
||||||
// a consistent view of the database throughout. This does have the unfortunate side-effect that all
|
|
||||||
// the matrixy logic resides in this function, but it's better to not hide the fact that this is
|
|
||||||
// being done in a transaction.
|
|
||||||
txn, err := p.DB.ReadOnlySnapshot(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return to
|
|
||||||
}
|
|
||||||
succeeded := false
|
|
||||||
defer sqlutil.EndTransactionWithCheck(txn, &succeeded, &err)
|
|
||||||
|
|
||||||
// Get the current sync position which we will base the sync response on.
|
// Get the current sync position which we will base the sync response on.
|
||||||
r := types.Range{
|
r := types.Range{
|
||||||
From: 0,
|
From: 0,
|
||||||
|
|
@ -51,8 +38,9 @@ func (p *PDUStreamProvider) CompleteSync(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Extract room state and recent events for all rooms the user is joined to.
|
// Extract room state and recent events for all rooms the user is joined to.
|
||||||
joinedRoomIDs, err := p.DB.RoomIDsWithMembership(ctx, txn, req.Device.UserID, gomatrixserverlib.Join)
|
joinedRoomIDs, err := p.DB.RoomIDsWithMembership(ctx, req.Device.UserID, gomatrixserverlib.Join)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
req.Log.WithError(err).Error("p.DB.RoomIDsWithMembership failed")
|
||||||
return to
|
return to
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -62,9 +50,10 @@ func (p *PDUStreamProvider) CompleteSync(
|
||||||
for _, roomID := range joinedRoomIDs {
|
for _, roomID := range joinedRoomIDs {
|
||||||
var jr *types.JoinResponse
|
var jr *types.JoinResponse
|
||||||
jr, err = p.getJoinResponseForCompleteSync(
|
jr, err = p.getJoinResponseForCompleteSync(
|
||||||
ctx, txn, roomID, r, &stateFilter, 20, req.Device,
|
ctx, roomID, r, &stateFilter, 20, req.Device,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
|
||||||
return to
|
return to
|
||||||
}
|
}
|
||||||
req.Response.Rooms.Join[roomID] = *jr
|
req.Response.Rooms.Join[roomID] = *jr
|
||||||
|
|
@ -72,24 +61,25 @@ func (p *PDUStreamProvider) CompleteSync(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add peeked rooms.
|
// Add peeked rooms.
|
||||||
peeks, err := p.DB.PeeksInRange(ctx, txn, req.Device.UserID, req.Device.ID, r)
|
peeks, err := p.DB.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
req.Log.WithError(err).Error("p.DB.PeeksInRange failed")
|
||||||
return to
|
return to
|
||||||
}
|
}
|
||||||
for _, peek := range peeks {
|
for _, peek := range peeks {
|
||||||
if !peek.Deleted {
|
if !peek.Deleted {
|
||||||
var jr *types.JoinResponse
|
var jr *types.JoinResponse
|
||||||
jr, err = p.getJoinResponseForCompleteSync(
|
jr, err = p.getJoinResponseForCompleteSync(
|
||||||
ctx, txn, peek.RoomID, r, &stateFilter, 20, req.Device,
|
ctx, peek.RoomID, r, &stateFilter, 20, req.Device,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
|
||||||
return to
|
return to
|
||||||
}
|
}
|
||||||
req.Response.Rooms.Peek[peek.RoomID] = *jr
|
req.Response.Rooms.Peek[peek.RoomID] = *jr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
succeeded = true
|
|
||||||
return p.LatestPosition(ctx)
|
return p.LatestPosition(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -115,11 +105,11 @@ func (p *PDUStreamProvider) IncrementalSync(
|
||||||
stateFilter := gomatrixserverlib.DefaultStateFilter()
|
stateFilter := gomatrixserverlib.DefaultStateFilter()
|
||||||
|
|
||||||
if req.WantFullState {
|
if req.WantFullState {
|
||||||
if stateDeltas, joinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, nil, r, req.Device.UserID, &stateFilter); err != nil {
|
if stateDeltas, joinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if stateDeltas, joinedRooms, err = p.DB.GetStateDeltas(ctx, req.Device, nil, r, req.Device.UserID, &stateFilter); err != nil {
|
if stateDeltas, joinedRooms, err = p.DB.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -129,7 +119,7 @@ func (p *PDUStreamProvider) IncrementalSync(
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, delta := range stateDeltas {
|
for _, delta := range stateDeltas {
|
||||||
err = p.addRoomDeltaToResponse(ctx, req.Device, nil, r, delta, 20, req.Response)
|
err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, 20, req.Response)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return newPos // nil, fmt.Errorf("d.addRoomDeltaToResponse: %w", err)
|
return newPos // nil, fmt.Errorf("d.addRoomDeltaToResponse: %w", err)
|
||||||
}
|
}
|
||||||
|
|
@ -141,7 +131,6 @@ func (p *PDUStreamProvider) IncrementalSync(
|
||||||
func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
device *userapi.Device,
|
device *userapi.Device,
|
||||||
txn *sql.Tx,
|
|
||||||
r types.Range,
|
r types.Range,
|
||||||
delta types.StateDelta,
|
delta types.StateDelta,
|
||||||
numRecentEventsPerRoom int,
|
numRecentEventsPerRoom int,
|
||||||
|
|
@ -157,7 +146,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||||
r.To = delta.MembershipPos
|
r.To = delta.MembershipPos
|
||||||
}
|
}
|
||||||
recentStreamEvents, limited, err := p.DB.RecentEvents(
|
recentStreamEvents, limited, err := p.DB.RecentEvents(
|
||||||
ctx, txn, delta.RoomID, r,
|
ctx, delta.RoomID, r,
|
||||||
numRecentEventsPerRoom, true, true,
|
numRecentEventsPerRoom, true, true,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -165,7 +154,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||||
}
|
}
|
||||||
recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
|
recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
|
||||||
delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back
|
delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back
|
||||||
prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, txn, recentStreamEvents)
|
prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, recentStreamEvents)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -210,14 +199,14 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context,
|
||||||
roomID string,
|
roomID string,
|
||||||
r types.Range,
|
r types.Range,
|
||||||
stateFilter *gomatrixserverlib.StateFilter,
|
stateFilter *gomatrixserverlib.StateFilter,
|
||||||
numRecentEventsPerRoom int, device *userapi.Device,
|
numRecentEventsPerRoom int, device *userapi.Device,
|
||||||
) (jr *types.JoinResponse, err error) {
|
) (jr *types.JoinResponse, err error) {
|
||||||
var stateEvents []*gomatrixserverlib.HeaderedEvent
|
var stateEvents []*gomatrixserverlib.HeaderedEvent
|
||||||
stateEvents, err = p.DB.CurrentState(ctx, txn, roomID, stateFilter)
|
stateEvents, err = p.DB.CurrentState(ctx, roomID, stateFilter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -226,7 +215,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
||||||
var recentStreamEvents []types.StreamEvent
|
var recentStreamEvents []types.StreamEvent
|
||||||
var limited bool
|
var limited bool
|
||||||
recentStreamEvents, limited, err = p.DB.RecentEvents(
|
recentStreamEvents, limited, err = p.DB.RecentEvents(
|
||||||
ctx, txn, roomID, r, numRecentEventsPerRoom, true, true,
|
ctx, roomID, r, numRecentEventsPerRoom, true, true,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
|
@ -264,7 +253,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
||||||
var prevBatch *types.TopologyToken
|
var prevBatch *types.TopologyToken
|
||||||
if len(recentStreamEvents) > 0 {
|
if len(recentStreamEvents) > 0 {
|
||||||
var backwardTopologyPos, backwardStreamPos types.StreamPosition
|
var backwardTopologyPos, backwardStreamPos types.StreamPosition
|
||||||
backwardTopologyPos, backwardStreamPos, err = p.DB.PositionInTopology(ctx, txn, recentStreamEvents[0].EventID())
|
backwardTopologyPos, backwardStreamPos, err = p.DB.PositionInTopology(ctx, recentStreamEvents[0].EventID())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,7 @@ func (p *ReceiptStreamProvider) Setup() {
|
||||||
|
|
||||||
id, err := p.DB.MaxStreamTokenForReceipts(context.Background())
|
id, err := p.DB.MaxStreamTokenForReceipts(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
panic(err)
|
||||||
}
|
}
|
||||||
p.latest = id
|
p.latest = id
|
||||||
}
|
}
|
||||||
|
|
@ -44,7 +44,8 @@ func (p *ReceiptStreamProvider) IncrementalSync(
|
||||||
|
|
||||||
lastPos, receipts, err := p.DB.RoomReceiptsAfter(ctx, joinedRooms, from)
|
lastPos, receipts, err := p.DB.RoomReceiptsAfter(ctx, joinedRooms, from)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return to //fmt.Errorf("unable to select receipts for rooms: %w", err)
|
req.Log.WithError(err).Error("p.DB.RoomReceiptsAfter failed")
|
||||||
|
return to
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(receipts) == 0 || lastPos == 0 {
|
if len(receipts) == 0 || lastPos == 0 {
|
||||||
|
|
@ -78,7 +79,8 @@ func (p *ReceiptStreamProvider) IncrementalSync(
|
||||||
}
|
}
|
||||||
ev.Content, err = json.Marshal(content)
|
ev.Content, err = json.Marshal(content)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return to // err
|
req.Log.WithError(err).Error("json.Marshal failed")
|
||||||
|
return to
|
||||||
}
|
}
|
||||||
|
|
||||||
jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
|
jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,8 @@ func (p *SendToDeviceStreamProvider) IncrementalSync(
|
||||||
// See if we have any new tasks to do for the send-to-device messaging.
|
// See if we have any new tasks to do for the send-to-device messaging.
|
||||||
lastPos, events, updates, deletions, err := p.DB.SendToDeviceUpdatesForSync(req.Context, req.Device.UserID, req.Device.ID, req.Since)
|
lastPos, events, updates, deletions, err := p.DB.SendToDeviceUpdatesForSync(req.Context, req.Device.UserID, req.Device.ID, req.Since)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return to // nil, fmt.Errorf("rp.db.SendToDeviceUpdatesForSync: %w", err)
|
req.Log.WithError(err).Error("p.DB.SendToDeviceUpdatesForSync failed")
|
||||||
|
return to
|
||||||
}
|
}
|
||||||
|
|
||||||
// Before we return the sync response, make sure that we take action on
|
// Before we return the sync response, make sure that we take action on
|
||||||
|
|
@ -35,7 +36,8 @@ func (p *SendToDeviceStreamProvider) IncrementalSync(
|
||||||
// Handle the updates and deletions in the database.
|
// Handle the updates and deletions in the database.
|
||||||
err = p.DB.CleanSendToDeviceUpdates(context.Background(), updates, deletions, req.Since)
|
err = p.DB.CleanSendToDeviceUpdates(context.Background(), updates, deletions, req.Since)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return to // res, fmt.Errorf("rp.db.CleanSendToDeviceUpdates: %w", err)
|
req.Log.WithError(err).Error("p.DB.CleanSendToDeviceUpdates failed")
|
||||||
|
return to
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(events) > 0 {
|
if len(events) > 0 {
|
||||||
|
|
|
||||||
|
|
@ -47,6 +47,7 @@ func (p *TypingStreamProvider) IncrementalSync(
|
||||||
"user_ids": users,
|
"user_ids": users,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
req.Log.WithError(err).Error("json.Marshal failed")
|
||||||
return to
|
return to
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue