Implement rejected events

Critically, rejected events CAN cause state resolution to happen
as it can merge forks in the DAG. This is fine, _provided_ we
do not add the rejected event when performing state resolution,
which is what this PR does. It also fixes the error handling
when NotAllowed happens, as we were checking too early and needlessly
handling NotAllowed in more than one place.
This commit is contained in:
Kegan Dougal 2020-09-15 17:50:19 +01:00
parent 6649528a7a
commit d9acb07326
8 changed files with 34 additions and 32 deletions

View file

@ -17,7 +17,6 @@ package routing
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"time"
@ -373,13 +372,10 @@ func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event, is
return t.processEventWithMissingState(ctx, e, stateResp.RoomVersion, isInboundTxn)
}
// Check that the event is allowed by the state at the event.
if err := checkAllowedByState(e, gomatrixserverlib.UnwrapEventHeaders(stateResp.StateEvents)); err != nil {
return err
}
// pass the event to the roomserver
err := api.SendEvents(
// pass the event to the roomserver which will do auth checks
// If the event fail auth checks, gmsl.NotAllowed error will be returned which we be silently
// discarded by the caller of this function
return api.SendEvents(
context.Background(),
t.rsAPI,
[]gomatrixserverlib.HeaderedEvent{
@ -388,12 +384,6 @@ func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event, is
api.DoNotSendToOtherServers,
nil,
)
var notAllowed *gomatrixserverlib.NotAllowed
if errors.As(err, &notAllowed) {
// the event was rejected, silently ignore.
err = nil
}
return err
}
func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserverlib.Event) error {

View file

@ -70,17 +70,9 @@ func (r *Inputer) processRoomEvent(
if err != nil {
return "", fmt.Errorf("r.DB.StoreEvent: %w", err)
}
// We stop here if the event is rejected: We've stored it but won't update forward extremities or notify anyone about it.
if isRejected {
logrus.WithFields(logrus.Fields{
"event_id": event.EventID(),
"type": event.Type(),
"room": event.RoomID(),
}).Debug("Stored rejected event")
return event.EventID(), rejectionErr
}
// if storing this event results in it being redacted then do so.
if redactedEventID == event.EventID() {
if !isRejected && redactedEventID == event.EventID() {
r, rerr := eventutil.RedactEvent(redactionEvent, &event)
if rerr != nil {
return "", fmt.Errorf("eventutil.RedactEvent: %w", rerr)
@ -111,12 +103,22 @@ func (r *Inputer) processRoomEvent(
if stateAtEvent.BeforeStateSnapshotNID == 0 {
// We haven't calculated a state for this event yet.
// Lets calculate one.
err = r.calculateAndSetState(ctx, input, *roomInfo, &stateAtEvent, event)
err = r.calculateAndSetState(ctx, input, *roomInfo, &stateAtEvent, event, isRejected)
if err != nil {
return "", fmt.Errorf("r.calculateAndSetState: %w", err)
}
}
// We stop here if the event is rejected: We've stored it but won't update forward extremities or notify anyone about it.
if isRejected {
logrus.WithFields(logrus.Fields{
"event_id": event.EventID(),
"type": event.Type(),
"room": event.RoomID(),
}).Debug("Stored rejected event")
return event.EventID(), rejectionErr
}
if input.Kind == api.KindRewrite {
logrus.WithFields(logrus.Fields{
"event_id": event.EventID(),
@ -167,11 +169,12 @@ func (r *Inputer) calculateAndSetState(
roomInfo types.RoomInfo,
stateAtEvent *types.StateAtEvent,
event gomatrixserverlib.Event,
isRejected bool,
) error {
var err error
roomState := state.NewStateResolution(r.DB, roomInfo)
if input.HasState {
if input.HasState && !isRejected {
// Check here if we think we're in the room already.
stateAtEvent.Overwrite = true
var joinEventNIDs []types.EventNID
@ -198,7 +201,7 @@ func (r *Inputer) calculateAndSetState(
stateAtEvent.Overwrite = false
// We haven't been told what the state at the event is so we need to calculate it from the prev_events
if stateAtEvent.BeforeStateSnapshotNID, err = roomState.CalculateAndStoreStateBeforeEvent(ctx, event); err != nil {
if stateAtEvent.BeforeStateSnapshotNID, err = roomState.CalculateAndStoreStateBeforeEvent(ctx, event, isRejected); err != nil {
return fmt.Errorf("roomState.CalculateAndStoreStateBeforeEvent: %w", err)
}
}

View file

@ -70,6 +70,7 @@ func (r *Queryer) QueryStateAfterEvents(
if err != nil {
switch err.(type) {
case types.MissingEventError:
util.GetLogger(ctx).Errorf("QueryStateAfterEvents: MissingEventError: %s", err)
return nil
default:
return err

View file

@ -159,7 +159,7 @@ func (v StateResolution) LoadCombinedStateAfterEvents(
}
fullState = append(fullState, entries...)
}
if prevState.IsStateEvent() {
if prevState.IsStateEvent() && !prevState.IsRejected {
// If the prev event was a state event then add an entry for the event itself
// so that we get the state after the event rather than the state before.
fullState = append(fullState, prevState.StateEntry)
@ -523,6 +523,7 @@ func init() {
func (v StateResolution) CalculateAndStoreStateBeforeEvent(
ctx context.Context,
event gomatrixserverlib.Event,
isRejected bool,
) (types.StateSnapshotNID, error) {
// Load the state at the prev events.
prevEventRefs := event.PrevEvents()
@ -561,7 +562,7 @@ func (v StateResolution) CalculateAndStoreStateAfterEvents(
if len(prevStates) == 1 {
prevState := prevStates[0]
if prevState.EventStateKeyNID == 0 {
if prevState.EventStateKeyNID == 0 || prevState.IsRejected {
// 3) None of the previous events were state events and they all
// have the same state, so this event has exactly the same state
// as the previous events.

View file

@ -89,7 +89,7 @@ const bulkSelectStateEventByIDSQL = "" +
" ORDER BY event_type_nid, event_state_key_nid ASC"
const bulkSelectStateAtEventByIDSQL = "" +
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid FROM roomserver_events" +
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, is_rejected FROM roomserver_events" +
" WHERE event_id = ANY($1)"
const updateEventStateSQL = "" +
@ -258,6 +258,7 @@ func (s *eventStatements) BulkSelectStateAtEventByID(
&result.EventStateKeyNID,
&result.EventNID,
&result.BeforeStateSnapshotNID,
&result.IsRejected,
); err != nil {
return nil, err
}

View file

@ -64,7 +64,7 @@ const bulkSelectStateEventByIDSQL = "" +
" ORDER BY event_type_nid, event_state_key_nid ASC"
const bulkSelectStateAtEventByIDSQL = "" +
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid FROM roomserver_events" +
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, is_rejected FROM roomserver_events" +
" WHERE event_id IN ($1)"
const updateEventStateSQL = "" +
@ -263,6 +263,7 @@ func (s *eventStatements) BulkSelectStateAtEventByID(
&result.EventStateKeyNID,
&result.EventNID,
&result.BeforeStateSnapshotNID,
&result.IsRejected,
); err != nil {
return nil, err
}

View file

@ -101,6 +101,9 @@ type StateAtEvent struct {
Overwrite bool
// The state before the event.
BeforeStateSnapshotNID StateSnapshotNID
// True if this StateEntry is rejected. State resolution should then treat this
// StateEntry as being a message event (not a state event).
IsRejected bool
// The state entry for the event itself, allows us to calculate the state after the event.
StateEntry
}

View file

@ -471,3 +471,5 @@ We can't peek into rooms with invited history_visibility
We can't peek into rooms with joined history_visibility
Local users can peek by room alias
Peeked rooms only turn up in the sync for the device who peeked them
Room state at a rejected message event is the same as its predecessor
Room state at a rejected state event is the same as its predecessor