mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-07 06:03:09 -06:00
Refactor a bit, include in output events
This commit is contained in:
parent
9d70c70e11
commit
6d04ae10e4
|
|
@ -161,6 +161,8 @@ type OutputNewRoomEvent struct {
|
||||||
// The transaction ID of the send request if sent by a local user and one
|
// The transaction ID of the send request if sent by a local user and one
|
||||||
// was specified
|
// was specified
|
||||||
TransactionID *TransactionID `json:"transaction_id,omitempty"`
|
TransactionID *TransactionID `json:"transaction_id,omitempty"`
|
||||||
|
// The history visibility of the event.
|
||||||
|
HistoryVisibility string `json:"history_visibility"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o *OutputNewRoomEvent) NeededStateEventIDs() ([]*gomatrixserverlib.HeaderedEvent, []string) {
|
func (o *OutputNewRoomEvent) NeededStateEventIDs() ([]*gomatrixserverlib.HeaderedEvent, []string) {
|
||||||
|
|
@ -187,7 +189,8 @@ func (o *OutputNewRoomEvent) NeededStateEventIDs() ([]*gomatrixserverlib.Headere
|
||||||
// should build their current room state up from OutputNewRoomEvents only.
|
// should build their current room state up from OutputNewRoomEvents only.
|
||||||
type OutputOldRoomEvent struct {
|
type OutputOldRoomEvent struct {
|
||||||
// The Event.
|
// The Event.
|
||||||
Event *gomatrixserverlib.HeaderedEvent `json:"event"`
|
Event *gomatrixserverlib.HeaderedEvent `json:"event"`
|
||||||
|
HistoryVisibility string `json:"history_visibility"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// An OutputNewInviteEvent is written whenever an invite becomes active.
|
// An OutputNewInviteEvent is written whenever an invite becomes active.
|
||||||
|
|
|
||||||
|
|
@ -296,58 +296,18 @@ func (r *Inputer) processRoomEvent(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the state before the event so that we can work out if the event was
|
// Get the state before the event so that we can work out if the event was
|
||||||
// allowed at the time. Don't bother doing this if the event is already
|
// allowed at the time, and also to get the history visibility. We won't
|
||||||
// rejected since it's just wasting CPU time.
|
// bother doing this if the event was already rejected as it just ends up
|
||||||
|
// burning CPU time.
|
||||||
|
historyVisibility := "joined" // Default to restrictive.
|
||||||
if rejectionErr == nil {
|
if rejectionErr == nil {
|
||||||
var stateBeforeEvent []*gomatrixserverlib.Event
|
var err error
|
||||||
if input.HasState {
|
historyVisibility, rejectionErr, err = r.processStateBefore(ctx, input, missingPrev)
|
||||||
// If we're overriding the state then we need to go and retrieve
|
if err != nil {
|
||||||
// them from the database. It's a hard error if they are missing.
|
return fmt.Errorf("r.processStateBefore: %w", err)
|
||||||
stateEvents, err := r.DB.EventsFromIDs(ctx, input.StateEventIDs)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("r.DB.EventsFromIDs: %w", err)
|
|
||||||
}
|
|
||||||
stateBeforeEvent = make([]*gomatrixserverlib.Event, 0, len(stateEvents))
|
|
||||||
for _, entry := range stateEvents {
|
|
||||||
stateBeforeEvent = append(stateBeforeEvent, entry.Event)
|
|
||||||
}
|
|
||||||
} else if event.Type() != gomatrixserverlib.MRoomCreate && len(event.PrevEventIDs()) > 0 {
|
|
||||||
// For all non-create events, there must be prev events, so we'll
|
|
||||||
// ask the query API for the relevant tuples needed for auth.
|
|
||||||
tuplesNeeded := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{event}).Tuples()
|
|
||||||
stateBeforeReq := &api.QueryStateAfterEventsRequest{
|
|
||||||
RoomID: event.RoomID(),
|
|
||||||
PrevEventIDs: event.PrevEventIDs(),
|
|
||||||
StateToFetch: tuplesNeeded,
|
|
||||||
}
|
|
||||||
stateBeforeRes := &api.QueryStateAfterEventsResponse{}
|
|
||||||
if err := r.Queryer.QueryStateAfterEvents(ctx, stateBeforeReq, stateBeforeRes); err != nil {
|
|
||||||
return fmt.Errorf("r.Queryer.QueryStateAfterEvents: %w", err)
|
|
||||||
}
|
|
||||||
switch {
|
|
||||||
case !stateBeforeRes.RoomExists:
|
|
||||||
rejectionErr = fmt.Errorf("room %q does not exist", event.RoomID())
|
|
||||||
isRejected = true
|
|
||||||
case !stateBeforeRes.PrevEventsExist:
|
|
||||||
rejectionErr = fmt.Errorf("prev events of %q are not known", event.EventID())
|
|
||||||
isRejected = true
|
|
||||||
default:
|
|
||||||
stateBeforeEvent = gomatrixserverlib.UnwrapEventHeaders(stateBeforeRes.StateEvents)
|
|
||||||
}
|
|
||||||
} else if event.Type() != gomatrixserverlib.MRoomCreate && len(event.PrevEventIDs()) == 0 {
|
|
||||||
// Non-create events that don't have any prev event IDs are
|
|
||||||
// impossible in theory, so reject them.
|
|
||||||
rejectionErr = fmt.Errorf("event %q should have prev events", event.EventID())
|
|
||||||
isRejected = true
|
|
||||||
}
|
}
|
||||||
if rejectionErr == nil {
|
if rejectionErr != nil {
|
||||||
// If we haven't rejected the event for some other reason by now,
|
isRejected = true
|
||||||
// see whether the event is allowed against the state at the time.
|
|
||||||
stateBeforeAuth := gomatrixserverlib.NewAuthEvents(stateBeforeEvent)
|
|
||||||
if rejectionErr = gomatrixserverlib.Allowed(event, &stateBeforeAuth); rejectionErr != nil {
|
|
||||||
isRejected = true
|
|
||||||
logger.WithError(rejectionErr).Warnf("Event %s not allowed at the time of the prev events", event.EventID())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -416,6 +376,7 @@ func (r *Inputer) processRoomEvent(
|
||||||
input.SendAsServer, // send as server
|
input.SendAsServer, // send as server
|
||||||
input.TransactionID, // transaction ID
|
input.TransactionID, // transaction ID
|
||||||
input.HasState, // rewrites state?
|
input.HasState, // rewrites state?
|
||||||
|
historyVisibility, // the history visibility
|
||||||
); err != nil {
|
); err != nil {
|
||||||
return fmt.Errorf("r.updateLatestEvents: %w", err)
|
return fmt.Errorf("r.updateLatestEvents: %w", err)
|
||||||
}
|
}
|
||||||
|
|
@ -424,7 +385,8 @@ func (r *Inputer) processRoomEvent(
|
||||||
{
|
{
|
||||||
Type: api.OutputTypeOldRoomEvent,
|
Type: api.OutputTypeOldRoomEvent,
|
||||||
OldRoomEvent: &api.OutputOldRoomEvent{
|
OldRoomEvent: &api.OutputOldRoomEvent{
|
||||||
Event: headered,
|
Event: headered,
|
||||||
|
HistoryVisibility: historyVisibility,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
@ -458,6 +420,94 @@ func (r *Inputer) processRoomEvent(
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// processStateBefore works out what the state is before the event and
|
||||||
|
// then checks the event auths against the state at the time. It also
|
||||||
|
// tries to determine what the history visibility was of the event at
|
||||||
|
// the time, so that it can be sent in the output event to downstream
|
||||||
|
// components.
|
||||||
|
// nolint:nakedret
|
||||||
|
func (r *Inputer) processStateBefore(
|
||||||
|
ctx context.Context,
|
||||||
|
input *api.InputRoomEvent,
|
||||||
|
missingPrev bool,
|
||||||
|
) (historyVisibility string, rejectionErr error, err error) {
|
||||||
|
historyVisibility = "joined" // Default to restrictive.
|
||||||
|
event := input.Event.Unwrap()
|
||||||
|
var stateBeforeEvent []*gomatrixserverlib.Event
|
||||||
|
if input.HasState {
|
||||||
|
// If we're overriding the state then we need to go and retrieve
|
||||||
|
// them from the database. It's a hard error if they are missing.
|
||||||
|
stateEvents, err := r.DB.EventsFromIDs(ctx, input.StateEventIDs)
|
||||||
|
if err != nil {
|
||||||
|
return "", nil, fmt.Errorf("r.DB.EventsFromIDs: %w", err)
|
||||||
|
}
|
||||||
|
stateBeforeEvent = make([]*gomatrixserverlib.Event, 0, len(stateEvents))
|
||||||
|
for _, entry := range stateEvents {
|
||||||
|
stateBeforeEvent = append(stateBeforeEvent, entry.Event)
|
||||||
|
}
|
||||||
|
} else if missingPrev {
|
||||||
|
// If we're missing prev events and still failed to fetch them
|
||||||
|
// before then we're stuck.
|
||||||
|
rejectionErr = fmt.Errorf("event %q should have prev events", event.EventID())
|
||||||
|
return
|
||||||
|
} else if event.Type() != gomatrixserverlib.MRoomCreate && len(event.PrevEventIDs()) > 0 {
|
||||||
|
// For all non-create events, there must be prev events, so we'll
|
||||||
|
// ask the query API for the relevant tuples needed for auth. We
|
||||||
|
// will include the history visibility here even though we don't
|
||||||
|
// actually need it for auth, because we want to send it in the
|
||||||
|
// output events.
|
||||||
|
tuplesNeeded := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{event}).Tuples()
|
||||||
|
tuplesNeeded = append(tuplesNeeded, gomatrixserverlib.StateKeyTuple{
|
||||||
|
EventType: gomatrixserverlib.MRoomHistoryVisibility,
|
||||||
|
StateKey: "",
|
||||||
|
})
|
||||||
|
stateBeforeReq := &api.QueryStateAfterEventsRequest{
|
||||||
|
RoomID: event.RoomID(),
|
||||||
|
PrevEventIDs: event.PrevEventIDs(),
|
||||||
|
StateToFetch: tuplesNeeded,
|
||||||
|
}
|
||||||
|
stateBeforeRes := &api.QueryStateAfterEventsResponse{}
|
||||||
|
if err := r.Queryer.QueryStateAfterEvents(ctx, stateBeforeReq, stateBeforeRes); err != nil {
|
||||||
|
return "", nil, fmt.Errorf("r.Queryer.QueryStateAfterEvents: %w", err)
|
||||||
|
}
|
||||||
|
switch {
|
||||||
|
case !stateBeforeRes.RoomExists:
|
||||||
|
rejectionErr = fmt.Errorf("room %q does not exist", event.RoomID())
|
||||||
|
return
|
||||||
|
case !stateBeforeRes.PrevEventsExist:
|
||||||
|
rejectionErr = fmt.Errorf("prev events of %q are not known", event.EventID())
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
stateBeforeEvent = gomatrixserverlib.UnwrapEventHeaders(stateBeforeRes.StateEvents)
|
||||||
|
}
|
||||||
|
} else if event.Type() != gomatrixserverlib.MRoomCreate && len(event.PrevEventIDs()) == 0 {
|
||||||
|
// Non-create events that don't have any prev event IDs are
|
||||||
|
// impossible in theory, so reject them.
|
||||||
|
rejectionErr = fmt.Errorf("event %q should have prev events", event.EventID())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if rejectionErr == nil {
|
||||||
|
// If we haven't rejected the event for some other reason by now,
|
||||||
|
// see whether the event is allowed against the state at the time.
|
||||||
|
stateBeforeAuth := gomatrixserverlib.NewAuthEvents(stateBeforeEvent)
|
||||||
|
if rejectionErr = gomatrixserverlib.Allowed(event, &stateBeforeAuth); rejectionErr != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Work out what the history visibility was at the time of the
|
||||||
|
// event.
|
||||||
|
for _, event := range stateBeforeEvent {
|
||||||
|
if event.Type() != gomatrixserverlib.MRoomHistoryVisibility || !event.StateKeyEquals("") {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if hisVis, err := event.HistoryVisibility(); err == nil {
|
||||||
|
historyVisibility = hisVis
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// fetchAuthEvents will check to see if any of the
|
// fetchAuthEvents will check to see if any of the
|
||||||
// auth events specified by the given event are unknown. If they are
|
// auth events specified by the given event are unknown. If they are
|
||||||
// then we will go off and request them from the federation and then
|
// then we will go off and request them from the federation and then
|
||||||
|
|
|
||||||
|
|
@ -56,6 +56,7 @@ func (r *Inputer) updateLatestEvents(
|
||||||
sendAsServer string,
|
sendAsServer string,
|
||||||
transactionID *api.TransactionID,
|
transactionID *api.TransactionID,
|
||||||
rewritesState bool,
|
rewritesState bool,
|
||||||
|
historyVisibility string,
|
||||||
) (err error) {
|
) (err error) {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "updateLatestEvents")
|
span, ctx := opentracing.StartSpanFromContext(ctx, "updateLatestEvents")
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
@ -69,15 +70,16 @@ func (r *Inputer) updateLatestEvents(
|
||||||
defer sqlutil.EndTransactionWithCheck(updater, &succeeded, &err)
|
defer sqlutil.EndTransactionWithCheck(updater, &succeeded, &err)
|
||||||
|
|
||||||
u := latestEventsUpdater{
|
u := latestEventsUpdater{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
api: r,
|
api: r,
|
||||||
updater: updater,
|
updater: updater,
|
||||||
roomInfo: roomInfo,
|
roomInfo: roomInfo,
|
||||||
stateAtEvent: stateAtEvent,
|
stateAtEvent: stateAtEvent,
|
||||||
event: event,
|
event: event,
|
||||||
sendAsServer: sendAsServer,
|
sendAsServer: sendAsServer,
|
||||||
transactionID: transactionID,
|
transactionID: transactionID,
|
||||||
rewritesState: rewritesState,
|
rewritesState: rewritesState,
|
||||||
|
historyVisibility: historyVisibility,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = u.doUpdateLatestEvents(); err != nil {
|
if err = u.doUpdateLatestEvents(); err != nil {
|
||||||
|
|
@ -117,8 +119,9 @@ type latestEventsUpdater struct {
|
||||||
stateBeforeEventRemoves []types.StateEntry
|
stateBeforeEventRemoves []types.StateEntry
|
||||||
stateBeforeEventAdds []types.StateEntry
|
stateBeforeEventAdds []types.StateEntry
|
||||||
// The snapshots of current state before and after processing this event
|
// The snapshots of current state before and after processing this event
|
||||||
oldStateNID types.StateSnapshotNID
|
oldStateNID types.StateSnapshotNID
|
||||||
newStateNID types.StateSnapshotNID
|
newStateNID types.StateSnapshotNID
|
||||||
|
historyVisibility string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *latestEventsUpdater) doUpdateLatestEvents() error {
|
func (u *latestEventsUpdater) doUpdateLatestEvents() error {
|
||||||
|
|
@ -365,12 +368,13 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
ore := api.OutputNewRoomEvent{
|
ore := api.OutputNewRoomEvent{
|
||||||
Event: u.event.Headered(u.roomInfo.RoomVersion),
|
Event: u.event.Headered(u.roomInfo.RoomVersion),
|
||||||
RewritesState: u.rewritesState,
|
RewritesState: u.rewritesState,
|
||||||
LastSentEventID: u.lastEventIDSent,
|
LastSentEventID: u.lastEventIDSent,
|
||||||
LatestEventIDs: latestEventIDs,
|
LatestEventIDs: latestEventIDs,
|
||||||
TransactionID: u.transactionID,
|
TransactionID: u.transactionID,
|
||||||
SendAsServer: u.sendAsServer,
|
SendAsServer: u.sendAsServer,
|
||||||
|
HistoryVisibility: u.historyVisibility,
|
||||||
}
|
}
|
||||||
|
|
||||||
eventIDMap, err := u.stateEventMap()
|
eventIDMap, err := u.stateEventMap()
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue