mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-02-14 06:54:27 -06:00
Pass through errors properly
This commit is contained in:
parent
15038eb2e7
commit
2ca972ef76
|
@ -155,14 +155,18 @@ func (r *Inputer) processRoomEventUsingUpdater(
|
|||
if err != nil {
|
||||
return true, fmt.Errorf("r.DB.GetRoomUpdater: %w", err)
|
||||
}
|
||||
if err = r.processRoomEvent(ctx, updater, inputRoomEvent); err != nil {
|
||||
if rerr := updater.Rollback(); rerr != nil {
|
||||
return false, fmt.Errorf("r.processRoomEvent: %w (with additional error rolling back transaction: %s)", err, rerr)
|
||||
commit, err := r.processRoomEvent(ctx, updater, inputRoomEvent)
|
||||
if commit {
|
||||
if err = updater.Commit(); err != nil {
|
||||
return false, fmt.Errorf("updater.Commit: %w", err)
|
||||
}
|
||||
} else {
|
||||
if rerr := updater.Rollback(); rerr != nil {
|
||||
return true, fmt.Errorf("updater.Rollback: %w", err)
|
||||
}
|
||||
return false, fmt.Errorf("r.processRoomEvent: %w", err)
|
||||
}
|
||||
if err = updater.Commit(); err != nil {
|
||||
return true, fmt.Errorf("updater.Commit: %w", err)
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
|
|
@ -70,13 +70,13 @@ func (r *Inputer) processRoomEvent(
|
|||
ctx context.Context,
|
||||
updater *shared.RoomUpdater,
|
||||
input *api.InputRoomEvent,
|
||||
) (err error) {
|
||||
) (commit bool, err error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// Before we do anything, make sure the context hasn't expired for this pending task.
|
||||
// If it has then we'll give up straight away — it's probably a synchronous input
|
||||
// request and the caller has already given up, but the inbox task was still queued.
|
||||
return context.DeadlineExceeded
|
||||
return false, context.DeadlineExceeded
|
||||
default:
|
||||
}
|
||||
|
||||
|
@ -118,11 +118,11 @@ func (r *Inputer) processRoomEvent(
|
|||
case gomatrixserverlib.EventIDFormatV1:
|
||||
if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) {
|
||||
logger.Debugf("Already processed event; ignoring")
|
||||
return nil
|
||||
return false, nil
|
||||
}
|
||||
default:
|
||||
logger.Debugf("Already processed event; ignoring")
|
||||
return nil
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -137,7 +137,7 @@ func (r *Inputer) processRoomEvent(
|
|||
PrevEventIDs: event.PrevEventIDs(),
|
||||
}
|
||||
if err = r.Queryer.QueryMissingAuthPrevEvents(ctx, missingReq, missingRes); err != nil {
|
||||
return fmt.Errorf("r.Queryer.QueryMissingAuthPrevEvents: %w", err)
|
||||
return false, fmt.Errorf("r.Queryer.QueryMissingAuthPrevEvents: %w", err)
|
||||
}
|
||||
}
|
||||
missingAuth := len(missingRes.MissingAuthEventIDs) > 0
|
||||
|
@ -149,7 +149,7 @@ func (r *Inputer) processRoomEvent(
|
|||
ExcludeSelf: true,
|
||||
}
|
||||
if err = r.FSAPI.QueryJoinedHostServerNamesInRoom(ctx, serverReq, serverRes); err != nil {
|
||||
return fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err)
|
||||
return false, fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err)
|
||||
}
|
||||
// Sort all of the servers into a map so that we can randomise
|
||||
// their order. Then make sure that the input origin and the
|
||||
|
@ -179,7 +179,7 @@ func (r *Inputer) processRoomEvent(
|
|||
authEvents := gomatrixserverlib.NewAuthEvents(nil)
|
||||
knownEvents := map[string]*types.Event{}
|
||||
if err = r.fetchAuthEvents(ctx, updater, logger, headered, &authEvents, knownEvents, serverRes.ServerNames); err != nil {
|
||||
return fmt.Errorf("r.fetchAuthEvents: %w", err)
|
||||
return false, fmt.Errorf("r.fetchAuthEvents: %w", err)
|
||||
}
|
||||
|
||||
// Check if the event is allowed by its auth events. If it isn't then
|
||||
|
@ -195,7 +195,7 @@ func (r *Inputer) processRoomEvent(
|
|||
authEventNIDs := make([]types.EventNID, 0, len(authEventIDs))
|
||||
for _, authEventID := range authEventIDs {
|
||||
if _, ok := knownEvents[authEventID]; !ok {
|
||||
return fmt.Errorf("missing auth event %s", authEventID)
|
||||
return false, fmt.Errorf("missing auth event %s", authEventID)
|
||||
}
|
||||
authEventNIDs = append(authEventNIDs, knownEvents[authEventID].EventNID)
|
||||
}
|
||||
|
@ -252,14 +252,14 @@ func (r *Inputer) processRoomEvent(
|
|||
// Store the event.
|
||||
_, _, stateAtEvent, redactionEvent, redactedEventID, err := updater.StoreEvent(ctx, event, authEventNIDs, isRejected)
|
||||
if err != nil {
|
||||
return fmt.Errorf("updater.StoreEvent: %w", err)
|
||||
return false, fmt.Errorf("updater.StoreEvent: %w", err)
|
||||
}
|
||||
|
||||
// if storing this event results in it being redacted then do so.
|
||||
if !isRejected && redactedEventID == event.EventID() {
|
||||
r, rerr := eventutil.RedactEvent(redactionEvent, event)
|
||||
if rerr != nil {
|
||||
return fmt.Errorf("eventutil.RedactEvent: %w", rerr)
|
||||
return false, fmt.Errorf("eventutil.RedactEvent: %w", rerr)
|
||||
}
|
||||
event = r
|
||||
}
|
||||
|
@ -270,15 +270,15 @@ func (r *Inputer) processRoomEvent(
|
|||
if input.Kind == api.KindOutlier {
|
||||
logger.Debug("Stored outlier")
|
||||
hooks.Run(hooks.KindNewEventPersisted, headered)
|
||||
return nil
|
||||
return true, nil
|
||||
}
|
||||
|
||||
roomInfo, err := updater.RoomInfo(ctx, event.RoomID())
|
||||
if err != nil {
|
||||
return fmt.Errorf("updater.RoomInfo: %w", err)
|
||||
return false, fmt.Errorf("updater.RoomInfo: %w", err)
|
||||
}
|
||||
if roomInfo == nil {
|
||||
return fmt.Errorf("updater.RoomInfo missing for room %s", event.RoomID())
|
||||
return false, fmt.Errorf("updater.RoomInfo missing for room %s", event.RoomID())
|
||||
}
|
||||
|
||||
if !missingPrev && stateAtEvent.BeforeStateSnapshotNID == 0 {
|
||||
|
@ -286,7 +286,7 @@ func (r *Inputer) processRoomEvent(
|
|||
// Lets calculate one.
|
||||
err = r.calculateAndSetState(ctx, updater, input, roomInfo, &stateAtEvent, event, isRejected)
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.calculateAndSetState: %w", err)
|
||||
return false, fmt.Errorf("r.calculateAndSetState: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -296,7 +296,7 @@ func (r *Inputer) processRoomEvent(
|
|||
"soft_fail": softfail,
|
||||
"missing_prev": missingPrev,
|
||||
}).Warn("Stored rejected event")
|
||||
return nil
|
||||
return true, rejectionErr
|
||||
}
|
||||
|
||||
switch input.Kind {
|
||||
|
@ -311,7 +311,7 @@ func (r *Inputer) processRoomEvent(
|
|||
input.TransactionID, // transaction ID
|
||||
input.HasState, // rewrites state?
|
||||
); err != nil {
|
||||
return fmt.Errorf("r.updateLatestEvents: %w", err)
|
||||
return false, fmt.Errorf("r.updateLatestEvents: %w", err)
|
||||
}
|
||||
case api.KindOld:
|
||||
err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{
|
||||
|
@ -323,7 +323,7 @@ func (r *Inputer) processRoomEvent(
|
|||
},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.WriteOutputEvents (old): %w", err)
|
||||
return false, fmt.Errorf("r.WriteOutputEvents (old): %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -342,14 +342,14 @@ func (r *Inputer) processRoomEvent(
|
|||
},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.WriteOutputEvents (redactions): %w", err)
|
||||
return false, fmt.Errorf("r.WriteOutputEvents (redactions): %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Everything was OK — the latest events updater didn't error and
|
||||
// we've sent output events. Finally, generate a hook call.
|
||||
hooks.Run(hooks.KindNewEventPersisted, headered)
|
||||
return nil
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// fetchAuthEvents will check to see if any of the
|
||||
|
|
|
@ -79,7 +79,7 @@ func (t *missingStateReq) processEventWithMissingState(
|
|||
// we can just inject all the newEvents as new as we may have only missed 1 or 2 events and have filled
|
||||
// in the gap in the DAG
|
||||
for _, newEvent := range newEvents {
|
||||
err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{
|
||||
_, err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{
|
||||
Kind: api.KindNew,
|
||||
Event: newEvent.Headered(roomVersion),
|
||||
Origin: t.origin,
|
||||
|
@ -188,7 +188,7 @@ func (t *missingStateReq) processEventWithMissingState(
|
|||
}
|
||||
// TODO: we could do this concurrently?
|
||||
for _, ire := range outlierRoomEvents {
|
||||
if err = t.inputer.processRoomEvent(ctx, t.db, &ire); err != nil {
|
||||
if _, err = t.inputer.processRoomEvent(ctx, t.db, &ire); err != nil {
|
||||
return fmt.Errorf("t.inputer.processRoomEvent[outlier]: %w", err)
|
||||
}
|
||||
}
|
||||
|
@ -201,7 +201,7 @@ func (t *missingStateReq) processEventWithMissingState(
|
|||
stateIDs = append(stateIDs, event.EventID())
|
||||
}
|
||||
|
||||
err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{
|
||||
_, err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{
|
||||
Kind: api.KindOld,
|
||||
Event: backwardsExtremity.Headered(roomVersion),
|
||||
Origin: t.origin,
|
||||
|
@ -218,7 +218,7 @@ func (t *missingStateReq) processEventWithMissingState(
|
|||
// they will automatically fast-forward based on the room state at the
|
||||
// extremity in the last step.
|
||||
for _, newEvent := range newEvents {
|
||||
err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{
|
||||
_, err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{
|
||||
Kind: api.KindOld,
|
||||
Event: newEvent.Headered(roomVersion),
|
||||
Origin: t.origin,
|
||||
|
|
Loading…
Reference in a new issue