diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 14af66c2f..db5264ce6 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -16,7 +16,6 @@ package storage import ( "context" - "database/sql" eduAPI "github.com/matrix-org/dendrite/eduserver/api" @@ -30,24 +29,22 @@ import ( type Database interface { internal.PartitionStorer - ReadOnlySnapshot(ctx context.Context) (*sql.Tx, error) - MaxStreamTokenForPDUs(ctx context.Context) (types.StreamPosition, error) MaxStreamTokenForReceipts(ctx context.Context) (types.StreamPosition, error) MaxStreamTokenForInvites(ctx context.Context) (types.StreamPosition, error) - CurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error) - GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, txn *sql.Tx, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) - GetStateDeltas(ctx context.Context, device *userapi.Device, txn *sql.Tx, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) - RoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membership string) ([]string, error) + CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error) + GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) + GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) + RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error) - RecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) + RecentEvents(ctx context.Context, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) - GetBackwardTopologyPos(ctx context.Context, txn *sql.Tx, events []types.StreamEvent) (types.TopologyToken, error) - PositionInTopology(ctx context.Context, txn *sql.Tx, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) + GetBackwardTopologyPos(ctx context.Context, events []types.StreamEvent) (types.TopologyToken, error) + PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) - InviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error) - PeeksInRange(ctx context.Context, txn *sql.Tx, userID, deviceID string, r types.Range) (peeks []types.Peek, err error) + InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error) + PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error) RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error) // AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index c0e593464..af8fc1974 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -49,7 +49,7 @@ type Database struct { Receipts tables.Receipts } -func (d *Database) ReadOnlySnapshot(ctx context.Context) (*sql.Tx, error) { +func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) { return d.DB.BeginTx(ctx, &sql.TxOptions{ // Set the isolation level so that we see a snapshot of the database. // In PostgreSQL repeatable read transactions will see a snapshot taken @@ -85,28 +85,28 @@ func (d *Database) MaxStreamTokenForInvites(ctx context.Context) (types.StreamPo return types.StreamPosition(id), nil } -func (d *Database) CurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error) { - return d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilterPart) +func (d *Database) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error) { + return d.CurrentRoomState.SelectCurrentState(ctx, nil, roomID, stateFilterPart) } -func (d *Database) RoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membership string) ([]string, error) { - return d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, membership) +func (d *Database) RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error) { + return d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, nil, userID, membership) } -func (d *Database) RecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) { - return d.OutputEvents.SelectRecentEvents(ctx, txn, roomID, r, limit, chronologicalOrder, onlySyncEvents) +func (d *Database) RecentEvents(ctx context.Context, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) { + return d.OutputEvents.SelectRecentEvents(ctx, nil, roomID, r, limit, chronologicalOrder, onlySyncEvents) } -func (d *Database) PositionInTopology(ctx context.Context, txn *sql.Tx, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) { - return d.Topology.SelectPositionInTopology(ctx, txn, eventID) +func (d *Database) PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) { + return d.Topology.SelectPositionInTopology(ctx, nil, eventID) } -func (d *Database) InviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error) { - return d.Invites.SelectInviteEventsInRange(ctx, txn, targetUserID, r) +func (d *Database) InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error) { + return d.Invites.SelectInviteEventsInRange(ctx, nil, targetUserID, r) } -func (d *Database) PeeksInRange(ctx context.Context, txn *sql.Tx, userID, deviceID string, r types.Range) (peeks []types.Peek, err error) { - return d.Peeks.SelectPeeksInRange(ctx, txn, userID, deviceID, r) +func (d *Database) PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error) { + return d.Peeks.SelectPeeksInRange(ctx, nil, userID, deviceID, r) } func (d *Database) RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error) { @@ -551,14 +551,14 @@ func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, reda // Retrieve the backward topology position, i.e. the position of the // oldest event in the room's topology. func (d *Database) GetBackwardTopologyPos( - ctx context.Context, txn *sql.Tx, + ctx context.Context, events []types.StreamEvent, ) (types.TopologyToken, error) { zeroToken := types.TopologyToken{} if len(events) == 0 { return zeroToken, nil } - pos, spos, err := d.Topology.SelectPositionInTopology(ctx, txn, events[0].EventID()) + pos, spos, err := d.Topology.SelectPositionInTopology(ctx, nil, events[0].EventID()) if err != nil { return zeroToken, err } @@ -567,80 +567,6 @@ func (d *Database) GetBackwardTopologyPos( return tok, nil } -// addRoomDeltaToResponse adds a room state delta to a sync response -/* -func (d *Database) addRoomDeltaToResponse( - ctx context.Context, - device *userapi.Device, - txn *sql.Tx, - r types.Range, - delta types.StateDelta, - numRecentEventsPerRoom int, - res *types.Response, -) error { - if delta.membershipPos > 0 && delta.membership == gomatrixserverlib.Leave { - // make sure we don't leak recent events after the leave event. - // TODO: History visibility makes this somewhat complex to handle correctly. For example: - // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join). - // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave - // in a single /sync request - // This is all "okay" assuming history_visibility == "shared" which it is by default. - r.To = delta.membershipPos - } - recentStreamEvents, limited, err := d.OutputEvents.SelectRecentEvents( - ctx, txn, delta.roomID, r, - numRecentEventsPerRoom, true, true, - ) - if err != nil { - return err - } - recentEvents := d.StreamEventsToEvents(device, recentStreamEvents) - delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back - prevBatch, err := d.getBackwardTopologyPos(ctx, txn, recentStreamEvents) - if err != nil { - return err - } - - // XXX: should we ever get this far if we have no recent events or state in this room? - // in practice we do for peeks, but possibly not joins? - if len(recentEvents) == 0 && len(delta.stateEvents) == 0 { - return nil - } - - switch delta.membership { - case gomatrixserverlib.Join: - jr := types.NewJoinResponse() - - jr.Timeline.PrevBatch = &prevBatch - jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = limited - jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) - res.Rooms.Join[delta.roomID] = *jr - case gomatrixserverlib.Peek: - jr := types.NewJoinResponse() - - jr.Timeline.PrevBatch = &prevBatch - jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = limited - jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) - res.Rooms.Peek[delta.roomID] = *jr - case gomatrixserverlib.Leave: - fallthrough // transitions to leave are the same as ban - case gomatrixserverlib.Ban: - // TODO: recentEvents may contain events that this user is not allowed to see because they are - // no longer in the room. - lr := types.NewLeaveResponse() - lr.Timeline.PrevBatch = &prevBatch - lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true - lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) - res.Rooms.Leave[delta.roomID] = *lr - } - - return nil -} -*/ - // fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database. // Returns a map of room ID to list of events. func (d *Database) fetchStateEvents( @@ -739,7 +665,7 @@ func (d *Database) fetchMissingStateEvents( // A list of joined room IDs is also returned in case the caller needs it. // nolint:gocyclo func (d *Database) GetStateDeltas( - ctx context.Context, device *userapi.Device, txn *sql.Tx, + ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter, ) ([]types.StateDelta, []string, error) { @@ -751,6 +677,13 @@ func (d *Database) GetStateDeltas( // * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block. // * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block. // - Get all CURRENTLY joined rooms, and add them to 'joined' block. + txn, err := d.readOnlySnapshot(ctx) + if err != nil { + return nil, nil, fmt.Errorf("d.readOnlySnapshot: %w", err) + } + var succeeded bool + defer sqlutil.EndTransactionWithCheck(txn, &succeeded, &err) + var deltas []types.StateDelta // get all the state events ever (i.e. for all available rooms) between these two positions @@ -834,6 +767,7 @@ func (d *Database) GetStateDeltas( }) } + succeeded = true return deltas, joinedRoomIDs, nil } @@ -843,10 +777,17 @@ func (d *Database) GetStateDeltas( // updates for other rooms. // nolint:gocyclo func (d *Database) GetStateDeltasForFullStateSync( - ctx context.Context, device *userapi.Device, txn *sql.Tx, + ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter, ) ([]types.StateDelta, []string, error) { + txn, err := d.readOnlySnapshot(ctx) + if err != nil { + return nil, nil, fmt.Errorf("d.readOnlySnapshot: %w", err) + } + var succeeded bool + defer sqlutil.EndTransactionWithCheck(txn, &succeeded, &err) + // Use a reasonable initial capacity deltas := make(map[string]types.StateDelta) @@ -923,6 +864,7 @@ func (d *Database) GetStateDeltasForFullStateSync( i++ } + succeeded = true return result, joinedRoomIDs, nil } diff --git a/syncapi/streams/stream_accountdata.go b/syncapi/streams/stream_accountdata.go index 169dc8b02..3de6ed15b 100644 --- a/syncapi/streams/stream_accountdata.go +++ b/syncapi/streams/stream_accountdata.go @@ -26,7 +26,8 @@ func (p *AccountDataStreamProvider) CompleteSync( } dataRes := &userapi.QueryAccountDataResponse{} if err := p.userAPI.QueryAccountData(ctx, dataReq, dataRes); err != nil { - return p.LatestPosition(ctx) // nil, err + req.Log.WithError(err).Error("p.userAPI.QueryAccountData failed") + return p.LatestPosition(ctx) } for datatype, databody := range dataRes.GlobalAccountData { req.Response.AccountData.Events = append( @@ -68,7 +69,8 @@ func (p *AccountDataStreamProvider) IncrementalSync( ctx, req.Device.UserID, r, &accountDataFilter, ) if err != nil { - return to // nil, fmt.Errorf("rp.db.GetAccountDataInRange: %w", err) + req.Log.WithError(err).Error("p.DB.GetAccountDataInRange failed") + return to } if len(dataTypes) == 0 { @@ -88,6 +90,7 @@ func (p *AccountDataStreamProvider) IncrementalSync( dataRes := userapi.QueryAccountDataResponse{} err = p.userAPI.QueryAccountData(ctx, &dataReq, &dataRes) if err != nil { + req.Log.WithError(err).Error("p.userAPI.QueryAccountData failed") continue } if roomID == "" { diff --git a/syncapi/streams/stream_invite.go b/syncapi/streams/stream_invite.go index 351348369..636dc22ca 100644 --- a/syncapi/streams/stream_invite.go +++ b/syncapi/streams/stream_invite.go @@ -18,7 +18,7 @@ func (p *InviteStreamProvider) Setup() { id, err := p.DB.MaxStreamTokenForInvites(context.Background()) if err != nil { - return + panic(err) } p.latest = id } @@ -41,10 +41,11 @@ func (p *InviteStreamProvider) IncrementalSync( } invites, retiredInvites, err := p.DB.InviteEventsInRange( - ctx, nil, req.Device.UserID, r, + ctx, req.Device.UserID, r, ) if err != nil { - return to // fmt.Errorf("d.Invites.SelectInviteEventsInRange: %w", err) + req.Log.WithError(err).Error("p.DB.InviteEventsInRange failed") + return to } for roomID, inviteEvent := range invites { diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 42e2b053b..84d535b9f 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -2,9 +2,7 @@ package streams import ( "context" - "database/sql" - "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -22,7 +20,7 @@ func (p *PDUStreamProvider) Setup() { id, err := p.DB.MaxStreamTokenForPDUs(context.Background()) if err != nil { - return + panic(err) } p.latest = id } @@ -33,17 +31,6 @@ func (p *PDUStreamProvider) CompleteSync( ) types.StreamPosition { to := p.LatestPosition(ctx) - // This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have - // a consistent view of the database throughout. This does have the unfortunate side-effect that all - // the matrixy logic resides in this function, but it's better to not hide the fact that this is - // being done in a transaction. - txn, err := p.DB.ReadOnlySnapshot(ctx) - if err != nil { - return to - } - succeeded := false - defer sqlutil.EndTransactionWithCheck(txn, &succeeded, &err) - // Get the current sync position which we will base the sync response on. r := types.Range{ From: 0, @@ -51,8 +38,9 @@ func (p *PDUStreamProvider) CompleteSync( } // Extract room state and recent events for all rooms the user is joined to. - joinedRoomIDs, err := p.DB.RoomIDsWithMembership(ctx, txn, req.Device.UserID, gomatrixserverlib.Join) + joinedRoomIDs, err := p.DB.RoomIDsWithMembership(ctx, req.Device.UserID, gomatrixserverlib.Join) if err != nil { + req.Log.WithError(err).Error("p.DB.RoomIDsWithMembership failed") return to } @@ -62,9 +50,10 @@ func (p *PDUStreamProvider) CompleteSync( for _, roomID := range joinedRoomIDs { var jr *types.JoinResponse jr, err = p.getJoinResponseForCompleteSync( - ctx, txn, roomID, r, &stateFilter, 20, req.Device, + ctx, roomID, r, &stateFilter, 20, req.Device, ) if err != nil { + req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") return to } req.Response.Rooms.Join[roomID] = *jr @@ -72,24 +61,25 @@ func (p *PDUStreamProvider) CompleteSync( } // Add peeked rooms. - peeks, err := p.DB.PeeksInRange(ctx, txn, req.Device.UserID, req.Device.ID, r) + peeks, err := p.DB.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r) if err != nil { + req.Log.WithError(err).Error("p.DB.PeeksInRange failed") return to } for _, peek := range peeks { if !peek.Deleted { var jr *types.JoinResponse jr, err = p.getJoinResponseForCompleteSync( - ctx, txn, peek.RoomID, r, &stateFilter, 20, req.Device, + ctx, peek.RoomID, r, &stateFilter, 20, req.Device, ) if err != nil { + req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") return to } req.Response.Rooms.Peek[peek.RoomID] = *jr } } - succeeded = true return p.LatestPosition(ctx) } @@ -115,11 +105,11 @@ func (p *PDUStreamProvider) IncrementalSync( stateFilter := gomatrixserverlib.DefaultStateFilter() if req.WantFullState { - if stateDeltas, joinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, nil, r, req.Device.UserID, &stateFilter); err != nil { + if stateDeltas, joinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil { return } } else { - if stateDeltas, joinedRooms, err = p.DB.GetStateDeltas(ctx, req.Device, nil, r, req.Device.UserID, &stateFilter); err != nil { + if stateDeltas, joinedRooms, err = p.DB.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil { return } } @@ -129,7 +119,7 @@ func (p *PDUStreamProvider) IncrementalSync( } for _, delta := range stateDeltas { - err = p.addRoomDeltaToResponse(ctx, req.Device, nil, r, delta, 20, req.Response) + err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, 20, req.Response) if err != nil { return newPos // nil, fmt.Errorf("d.addRoomDeltaToResponse: %w", err) } @@ -141,7 +131,6 @@ func (p *PDUStreamProvider) IncrementalSync( func (p *PDUStreamProvider) addRoomDeltaToResponse( ctx context.Context, device *userapi.Device, - txn *sql.Tx, r types.Range, delta types.StateDelta, numRecentEventsPerRoom int, @@ -157,7 +146,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( r.To = delta.MembershipPos } recentStreamEvents, limited, err := p.DB.RecentEvents( - ctx, txn, delta.RoomID, r, + ctx, delta.RoomID, r, numRecentEventsPerRoom, true, true, ) if err != nil { @@ -165,7 +154,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( } recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents) delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back - prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, txn, recentStreamEvents) + prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, recentStreamEvents) if err != nil { return err } @@ -210,14 +199,14 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( } func (p *PDUStreamProvider) getJoinResponseForCompleteSync( - ctx context.Context, txn *sql.Tx, + ctx context.Context, roomID string, r types.Range, stateFilter *gomatrixserverlib.StateFilter, numRecentEventsPerRoom int, device *userapi.Device, ) (jr *types.JoinResponse, err error) { var stateEvents []*gomatrixserverlib.HeaderedEvent - stateEvents, err = p.DB.CurrentState(ctx, txn, roomID, stateFilter) + stateEvents, err = p.DB.CurrentState(ctx, roomID, stateFilter) if err != nil { return } @@ -226,7 +215,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( var recentStreamEvents []types.StreamEvent var limited bool recentStreamEvents, limited, err = p.DB.RecentEvents( - ctx, txn, roomID, r, numRecentEventsPerRoom, true, true, + ctx, roomID, r, numRecentEventsPerRoom, true, true, ) if err != nil { return @@ -264,7 +253,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( var prevBatch *types.TopologyToken if len(recentStreamEvents) > 0 { var backwardTopologyPos, backwardStreamPos types.StreamPosition - backwardTopologyPos, backwardStreamPos, err = p.DB.PositionInTopology(ctx, txn, recentStreamEvents[0].EventID()) + backwardTopologyPos, backwardStreamPos, err = p.DB.PositionInTopology(ctx, recentStreamEvents[0].EventID()) if err != nil { return } diff --git a/syncapi/streams/stream_receipt.go b/syncapi/streams/stream_receipt.go index cd345c312..876e2b1db 100644 --- a/syncapi/streams/stream_receipt.go +++ b/syncapi/streams/stream_receipt.go @@ -18,7 +18,7 @@ func (p *ReceiptStreamProvider) Setup() { id, err := p.DB.MaxStreamTokenForReceipts(context.Background()) if err != nil { - return + panic(err) } p.latest = id } @@ -44,7 +44,8 @@ func (p *ReceiptStreamProvider) IncrementalSync( lastPos, receipts, err := p.DB.RoomReceiptsAfter(ctx, joinedRooms, from) if err != nil { - return to //fmt.Errorf("unable to select receipts for rooms: %w", err) + req.Log.WithError(err).Error("p.DB.RoomReceiptsAfter failed") + return to } if len(receipts) == 0 || lastPos == 0 { @@ -78,7 +79,8 @@ func (p *ReceiptStreamProvider) IncrementalSync( } ev.Content, err = json.Marshal(content) if err != nil { - return to // err + req.Log.WithError(err).Error("json.Marshal failed") + return to } jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev) diff --git a/syncapi/streams/stream_sendtodevice.go b/syncapi/streams/stream_sendtodevice.go index 3627e47fa..73be8304e 100644 --- a/syncapi/streams/stream_sendtodevice.go +++ b/syncapi/streams/stream_sendtodevice.go @@ -25,7 +25,8 @@ func (p *SendToDeviceStreamProvider) IncrementalSync( // See if we have any new tasks to do for the send-to-device messaging. lastPos, events, updates, deletions, err := p.DB.SendToDeviceUpdatesForSync(req.Context, req.Device.UserID, req.Device.ID, req.Since) if err != nil { - return to // nil, fmt.Errorf("rp.db.SendToDeviceUpdatesForSync: %w", err) + req.Log.WithError(err).Error("p.DB.SendToDeviceUpdatesForSync failed") + return to } // Before we return the sync response, make sure that we take action on @@ -35,7 +36,8 @@ func (p *SendToDeviceStreamProvider) IncrementalSync( // Handle the updates and deletions in the database. err = p.DB.CleanSendToDeviceUpdates(context.Background(), updates, deletions, req.Since) if err != nil { - return to // res, fmt.Errorf("rp.db.CleanSendToDeviceUpdates: %w", err) + req.Log.WithError(err).Error("p.DB.CleanSendToDeviceUpdates failed") + return to } } if len(events) > 0 { diff --git a/syncapi/streams/stream_typing.go b/syncapi/streams/stream_typing.go index 8100c0d92..779e4da5e 100644 --- a/syncapi/streams/stream_typing.go +++ b/syncapi/streams/stream_typing.go @@ -47,6 +47,7 @@ func (p *TypingStreamProvider) IncrementalSync( "user_ids": users, }) if err != nil { + req.Log.WithError(err).Error("json.Marshal failed") return to }