Don't get stuck on mutexes:

This commit is contained in:
Neil Alexander 2021-12-15 15:45:53 +00:00
parent 2a46752d0b
commit df6a60f35e
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
4 changed files with 93 additions and 313 deletions

View file

@ -133,8 +133,6 @@ func fillInRooms(ctx context.Context, roomIDs []string, rsAPI roomserverAPI.Room
util.GetLogger(ctx).WithError(err).Error("QueryBulkStateContent failed")
return nil, err
}
util.GetLogger(ctx).Infof("room IDs: %+v", roomIDs)
util.GetLogger(ctx).Infof("State res: %+v", stateRes.Rooms)
chunk := make([]gomatrixserverlib.PublicRoom, len(roomIDs))
i := 0
for roomID, data := range stateRes.Rooms {

View file

@ -136,7 +136,7 @@ func (q *sendFIFOQueue) pop() (*inputTask, bool) {
type inputTask struct {
ctx context.Context
t *txnReq
event *gomatrixserverlib.Event
event *gomatrixserverlib.HeaderedEvent
wg *sync.WaitGroup
err error // written back by worker, only safe to read when all tasks are done
duration time.Duration // written back by worker, only safe to read when all tasks are done
@ -338,7 +338,7 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
task := &inputTask{
ctx: ctx,
t: t,
event: event,
event: event.Headered(verRes.RoomVersion),
wg: &wg,
}
tasks = append(tasks, task)
@ -420,12 +420,6 @@ func (t *inputWorker) run() {
}
}
type roomNotFoundError struct {
roomID string
}
func (e roomNotFoundError) Error() string { return fmt.Sprintf("room %q not found", e.roomID) }
func (t *txnReq) processEDUs(ctx context.Context) {
for _, e := range t.EDUs {
eduCountTotal.Inc()
@ -568,41 +562,43 @@ func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverli
}
}
func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) error {
func (t *txnReq) processEvent(_ context.Context, e *gomatrixserverlib.HeaderedEvent) error {
t.work = "" // reset from previous event
// Ask the roomserver if we know about the room and/or if we're joined
// to it. If we aren't then we won't bother processing the event.
joinedReq := api.QueryServerJoinedToRoomRequest{
RoomID: e.RoomID(),
}
var joinedRes api.QueryServerJoinedToRoomResponse
if err := t.rsAPI.QueryServerJoinedToRoom(ctx, &joinedReq, &joinedRes); err != nil {
return fmt.Errorf("t.rsAPI.QueryServerJoinedToRoom: %w", err)
}
/*
// Ask the roomserver if we know about the room and/or if we're joined
// to it. If we aren't then we won't bother processing the event.
joinedReq := api.QueryServerJoinedToRoomRequest{
RoomID: e.RoomID(),
}
var joinedRes api.QueryServerJoinedToRoomResponse
if err := t.rsAPI.QueryServerJoinedToRoom(ctx, &joinedReq, &joinedRes); err != nil {
return fmt.Errorf("t.rsAPI.QueryServerJoinedToRoom: %w", err)
}
if !joinedRes.RoomExists || !joinedRes.IsInRoom {
// We don't believe we're a member of this room, therefore there's
// no point in wasting work trying to figure out what to do with
// missing auth or prev events. Drop the event.
return roomNotFoundError{e.RoomID()}
}
if !joinedRes.RoomExists || !joinedRes.IsInRoom {
// We don't believe we're a member of this room, therefore there's
// no point in wasting work trying to figure out what to do with
// missing auth or prev events. Drop the event.
return roomNotFoundError{e.RoomID()}
}
// Work out if the roomserver knows everything it needs to know to auth
// the event. This includes the prev_events and auth_events.
// NOTE! This is going to include prev_events that have an empty state
// snapshot. This is because we will need to re-request the event, and
// it's /state_ids, in order for it to exist in the roomserver correctly
// before the roomserver tries to work out
stateReq := api.QueryMissingAuthPrevEventsRequest{
RoomID: e.RoomID(),
AuthEventIDs: nil, //e.AuthEventIDs(),
PrevEventIDs: nil, //e.PrevEventIDs(),
}
var stateResp api.QueryMissingAuthPrevEventsResponse
if err := t.rsAPI.QueryMissingAuthPrevEvents(ctx, &stateReq, &stateResp); err != nil {
return fmt.Errorf("t.rsAPI.QueryMissingAuthPrevEvents: %w", err)
}
// Work out if the roomserver knows everything it needs to know to auth
// the event. This includes the prev_events and auth_events.
// NOTE! This is going to include prev_events that have an empty state
// snapshot. This is because we will need to re-request the event, and
// it's /state_ids, in order for it to exist in the roomserver correctly
// before the roomserver tries to work out
stateReq := api.QueryMissingAuthPrevEventsRequest{
RoomID: e.RoomID(),
AuthEventIDs: nil, //e.AuthEventIDs(),
PrevEventIDs: nil, //e.PrevEventIDs(),
}
var stateResp api.QueryMissingAuthPrevEventsResponse
if err := t.rsAPI.QueryMissingAuthPrevEvents(ctx, &stateReq, &stateResp); err != nil {
return fmt.Errorf("t.rsAPI.QueryMissingAuthPrevEvents: %w", err)
}
*/
// pass the event to the roomserver which will do auth checks
// If the event fail auth checks, gmsl.NotAllowed error will be returned which we be silently
@ -611,9 +607,7 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e
context.Background(),
t.rsAPI,
api.KindNew,
[]*gomatrixserverlib.HeaderedEvent{
e.Headered(stateResp.RoomVersion),
},
[]*gomatrixserverlib.HeaderedEvent{e},
t.Origin,
api.DoNotSendToOtherServers,
nil,

View file

@ -64,7 +64,7 @@ var processRoomEventDuration = prometheus.NewHistogramVec(
func (r *Inputer) processRoomEvent(
ctx context.Context,
input *api.InputRoomEvent,
) (eventID string, err error) {
) (string, error) {
// Measure how long it takes to process this event.
started := time.Now()
defer func() {
@ -105,26 +105,37 @@ func (r *Inputer) processRoomEvent(
}
}
missingReq := &api.QueryMissingAuthPrevEventsRequest{
RoomID: event.RoomID(),
AuthEventIDs: event.AuthEventIDs(),
PrevEventIDs: event.PrevEventIDs(),
}
missingRes := &api.QueryMissingAuthPrevEventsResponse{}
serverRes := &fedapi.QueryJoinedHostServerNamesInRoomResponse{}
if event.Type() != gomatrixserverlib.MRoomCreate {
if err = r.Queryer.QueryMissingAuthPrevEvents(ctx, missingReq, missingRes); err != nil {
missingReq := &api.QueryMissingAuthPrevEventsRequest{
RoomID: event.RoomID(),
AuthEventIDs: event.AuthEventIDs(),
PrevEventIDs: event.PrevEventIDs(),
}
if err := r.Queryer.QueryMissingAuthPrevEvents(ctx, missingReq, missingRes); err != nil {
return "", fmt.Errorf("r.Queryer.QueryMissingAuthPrevEvents: %w", err)
}
}
if len(missingRes.MissingAuthEventIDs) > 0 || len(missingRes.MissingPrevEventIDs) > 0 {
serverReq := &fedapi.QueryJoinedHostServerNamesInRoomRequest{
RoomID: event.RoomID(),
}
if err := r.FSAPI.QueryJoinedHostServerNamesInRoom(ctx, serverReq, serverRes); err != nil {
return "", fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err)
}
}
// First of all, check that the auth events of the event are known.
// If they aren't then we will ask the federation API for them.
isRejected := false
authEvents := gomatrixserverlib.NewAuthEvents(nil)
knownEvents := map[string]*types.Event{}
if err = r.checkForMissingAuthEvents(ctx, logger, input.Event, &authEvents, knownEvents); err != nil {
logger.Println("Starting to check for missing auth events")
if err := r.checkForMissingAuthEvents(ctx, logger, input.Event, &authEvents, knownEvents, serverRes.ServerNames); err != nil {
return "", fmt.Errorf("r.checkForMissingAuthEvents: %w", err)
}
logger.Println("Checked for missing auth events")
// Check if the event is allowed by its auth events. If it isn't then
// we consider the event to be "rejected" — it will still be persisted.
@ -138,6 +149,9 @@ func (r *Inputer) processRoomEvent(
authEventIDs := event.AuthEventIDs()
authEventNIDs := make([]types.EventNID, 0, len(authEventIDs))
for _, authEventID := range authEventIDs {
if _, ok := knownEvents[authEventID]; !ok {
return "", fmt.Errorf("missing auth event %s", authEventID)
}
authEventNIDs = append(authEventNIDs, knownEvents[authEventID].EventNID)
}
@ -145,6 +159,7 @@ func (r *Inputer) processRoomEvent(
if input.Kind == api.KindNew {
// Check that the event passes authentication checks based on the
// current room state.
var err error
softfail, err = helpers.CheckForSoftFail(ctx, r.DB, headered, input.StateEventIDs)
if err != nil {
logger.WithError(err).Info("Error authing soft-failed event")
@ -187,6 +202,7 @@ func (r *Inputer) processRoomEvent(
}
if len(missingRes.MissingPrevEventIDs) > 0 {
logger.Println("Starting to check for missing prev events")
missingState := missingStateReq{
origin: input.Origin,
inputer: r,
@ -201,6 +217,7 @@ func (r *Inputer) processRoomEvent(
if err = missingState.processEventWithMissingState(ctx, input.Event.Unwrap(), roomInfo.RoomVersion); err != nil {
return "", fmt.Errorf("r.checkForMissingPrevEvents: %w", err)
}
logger.Println("Checked for missing prev events")
}
if stateAtEvent.BeforeStateSnapshotNID == 0 {
@ -274,6 +291,7 @@ func (r *Inputer) checkForMissingAuthEvents(
event *gomatrixserverlib.HeaderedEvent,
auth *gomatrixserverlib.AuthEvents,
known map[string]*types.Event,
servers []gomatrixserverlib.ServerName,
) error {
authEventIDs := event.AuthEventIDs()
if len(authEventIDs) == 0 {
@ -300,20 +318,11 @@ func (r *Inputer) checkForMissingAuthEvents(
if len(unknown) > 0 {
logger.Printf("XXX: There are %d missing auth events", len(unknown))
serverReq := &fedapi.QueryJoinedHostServerNamesInRoomRequest{
RoomID: event.RoomID(),
}
serverRes := &fedapi.QueryJoinedHostServerNamesInRoomResponse{}
if err = r.FSAPI.QueryJoinedHostServerNamesInRoom(ctx, serverReq, serverRes); err != nil {
return fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err)
}
logger.Printf("XXX: Asking servers %+v", serverRes.ServerNames)
logger.Printf("XXX: Asking servers %+v", servers)
var res gomatrixserverlib.RespEventAuth
var found bool
for _, serverName := range serverRes.ServerNames {
for _, serverName := range servers {
res, err = r.FSAPI.GetEventAuth(ctx, serverName, event.RoomID(), event.EventID())
if err != nil {
logger.WithError(err).Warnf("Failed to get event auth from federation for %q: %s", event.EventID(), err)
@ -324,7 +333,7 @@ func (r *Inputer) checkForMissingAuthEvents(
break
}
if !found {
logger.Printf("XXX: None of the %d servers provided us with auth events", len(serverRes.ServerNames))
logger.Printf("XXX: None of the %d servers provided us with auth events", len(servers))
return fmt.Errorf("no servers provided event auth")
}
@ -334,7 +343,7 @@ func (r *Inputer) checkForMissingAuthEvents(
) {
// If we already know about this event then we don't need to store
// it or do anything further with it.
if _, ok := known[event.EventID()]; ok {
if ev, ok := known[event.EventID()]; ok && ev != nil {
continue
}
@ -358,7 +367,6 @@ func (r *Inputer) checkForMissingAuthEvents(
}
// Let's take a note of the fact that we now know about this event.
known[event.EventID()] = nil
if err := auth.AddEvent(event); err != nil {
return fmt.Errorf("auth.AddEvent: %w", err)
}
@ -387,228 +395,6 @@ func (r *Inputer) checkForMissingAuthEvents(
return nil
}
/*
func (r *Inputer) checkForMissingPrevEvents(
ctx context.Context,
logger *logrus.Entry,
event *gomatrixserverlib.HeaderedEvent,
roomInfo *types.RoomInfo,
known map[string]*types.Event,
) error {
prevStates := map[string]*types.StateAtEvent{}
prevEventIDs := event.PrevEventIDs()
if len(prevEventIDs) == 0 && event.Type() != gomatrixserverlib.MRoomCreate {
return fmt.Errorf("expected to find some prev events for event type %q", event.Type())
}
for _, eventID := range prevEventIDs {
state, err := r.DB.StateAtEventIDs(ctx, []string{eventID})
if err != nil {
if _, ok := err.(types.MissingEventError); ok {
continue
}
return fmt.Errorf("r.DB.StateAtEventIDs: %w", err)
}
if len(state) == 1 {
prevStates[eventID] = &state[0]
continue
}
}
// If we know all of the states of the previous events then there is nothing more to
// do here, as the state across them will be resolved later.
if len(prevStates) == len(prevEventIDs) {
return nil
}
if r.FSAPI == nil {
return fmt.Errorf("cannot satisfy missing events without federation")
}
// Ask the federation API which servers we should ask. In theory the roomserver
// doesn't need the help of the federation API to do this because we already know
// all of the membership states, it's just that the federation API tracks this in
// a table for this purpose. TODO: Work out what makes most sense here.
serverReq := &fedapi.QueryJoinedHostServerNamesInRoomRequest{
RoomID: event.RoomID(),
}
serverRes := &fedapi.QueryJoinedHostServerNamesInRoomResponse{}
if err := r.FSAPI.QueryJoinedHostServerNamesInRoom(ctx, serverReq, serverRes); err != nil {
return fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err)
}
// Attempt to fill in the gap using /get_missing_events
// This will either:
// - fill in the gap completely then process event `e` returning no backwards extremity
// - fail to fill in the gap and tell us to terminate the transaction err=not nil
// - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction
newEvents, err := r.getMissingEvents(ctx, logger, event, roomInfo, serverRes.ServerNames, known)
if err != nil {
return err
}
if len(newEvents) == 0 {
return fmt.Errorf("/get_missing_events returned no new events")
}
return nil
}
func (r *Inputer) getMissingEvents(
ctx context.Context,
logger *logrus.Entry,
event *gomatrixserverlib.HeaderedEvent,
roomInfo *types.RoomInfo,
servers []gomatrixserverlib.ServerName,
known map[string]*types.Event,
) (newEvents []*gomatrixserverlib.Event, err error) {
logger.Printf("XXX: get_missing_events called")
needed := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{event.Unwrap()})
// Ask the roomserver for our current forward extremities. These will form
// the "earliest" part of the `/get_missing_events` request.
req := &api.QueryLatestEventsAndStateRequest{
RoomID: event.RoomID(),
StateToFetch: needed.Tuples(),
}
res := &api.QueryLatestEventsAndStateResponse{}
if err = r.Queryer.QueryLatestEventsAndState(ctx, req, res); err != nil {
logger.WithError(err).Warn("Failed to query latest events")
return nil, err
}
// Accumulate the event IDs of our forward extremities for use in the request.
latestEvents := make([]string, len(res.LatestEvents))
for i := range res.LatestEvents {
latestEvents[i] = res.LatestEvents[i].EventID
}
var missingResp *gomatrixserverlib.RespMissingEvents
for _, server := range servers {
logger.Printf("XXX: Calling /get_missing_events via %q", server)
var m gomatrixserverlib.RespMissingEvents
if m, err = r.FSAPI.LookupMissingEvents(ctx, server, event.RoomID(), gomatrixserverlib.MissingEvents{
Limit: 20,
EarliestEvents: latestEvents,
LatestEvents: []string{event.EventID()},
}, event.RoomVersion); err == nil {
missingResp = &m
break
} else if errors.Is(err, context.DeadlineExceeded) {
break
}
}
if missingResp == nil {
return nil, fmt.Errorf("/get_missing_events failed via all candidate servers")
}
if len(missingResp.Events) == 0 {
return nil, fmt.Errorf("/get_missing_events returned no events")
}
// security: how we handle failures depends on whether or not this event will become the new forward extremity for the room.
// There's 2 scenarios to consider:
// - Case A: We got pushed an event and are now fetching missing prev_events. (isInboundTxn=true)
// - Case B: We are fetching missing prev_events already and now fetching some more (isInboundTxn=false)
// In Case B, we know for sure that the event we are currently processing will not become the new forward extremity for the room,
// as it was called in response to an inbound txn which had it as a prev_event.
// In Case A, the event is a forward extremity, and could eventually become the _only_ forward extremity in the room. This is bad
// because it means we would trust the state at that event to be the state for the entire room, and allows rooms to be hijacked.
// https://github.com/matrix-org/synapse/pull/3456
// https://github.com/matrix-org/synapse/blob/229eb81498b0fe1da81e9b5b333a0285acde9446/synapse/handlers/federation.py#L335
// For now, we do not allow Case B, so reject the event.
logger.Printf("XXX: get_missing_events returned %d events", len(missingResp.Events))
newEvents = gomatrixserverlib.ReverseTopologicalOrdering(
missingResp.Events,
gomatrixserverlib.TopologicalOrderByPrevEvents,
)
for _, pe := range event.PrevEventIDs() {
hasPrevEvent := false
for _, ev := range newEvents {
if ev.EventID() == pe {
hasPrevEvent = true
break
}
}
if !hasPrevEvent {
logger.Errorf("Prev event %q is still missing after /get_missing_events", pe)
}
}
backwardExtremity := newEvents[0]
fastForwardEvents := newEvents[1:]
// Do we know about the state of the backward extremity already?
if _, err := r.DB.StateAtEventIDs(ctx, []string{backwardExtremity.EventID()}); err == nil {
// Yes, we do, so we don't need to store that event.
} else {
// No, we don't, so let's go find it.
// r.FSAPI.LookupStateIDs()
}
for _, ev := range fastForwardEvents {
if _, err := r.processRoomEvent(ctx, &api.InputRoomEvent{
Kind: api.KindOld,
Event: ev.Headered(event.RoomVersion),
AuthEventIDs: ev.AuthEventIDs(),
}); err != nil {
return nil, fmt.Errorf("r.processRoomEvent (prev event): %w", err)
}
}
return newEvents, nil
}
func (r *Inputer) lookupStateBeforeEvent(
ctx context.Context,
logger *logrus.Entry,
event *gomatrixserverlib.HeaderedEvent,
roomInfo *types.RoomInfo,
servers []gomatrixserverlib.ServerName,
) error {
knownPrevStates := map[string]types.StateAtEvent{}
unknownPrevStates := map[string]struct{}{}
neededStateEvents := map[string]struct{}{}
for _, prevEventID := range event.PrevEventIDs() {
if state, err := r.DB.StateAtEventIDs(ctx, []string{prevEventID}); err == nil && len(state) == 1 {
knownPrevStates[prevEventID] = state[0]
} else {
unknownPrevStates[prevEventID] = struct{}{}
}
}
for prevEventID := range unknownPrevStates {
stateIDs, err := r.FSAPI.LookupStateIDs(ctx, "TODO: SERVER", event.RoomID(), prevEventID)
if err != nil {
return fmt.Errorf("r.FSAPI.LookupStateIDs: %w", err)
}
events, err := r.DB.EventsFromIDs(ctx, stateIDs.StateEventIDs)
if err != nil {
return fmt.Errorf("r.DB.EventsFromIDs: %w", err)
}
for i, eventID := range stateIDs.StateEventIDs {
if events[i].Event == nil || events[i].EventNID == 0 {
neededStateEvents[eventID] = struct{}{}
}
}
if len(neededStateEvents) > (len(stateIDs.StateEventIDs) / 2) {
// More than 50% of the state events are missing, so let's just
// call `/state` instead of fetching the events individually.
state, err := r.FSAPI.LookupState(ctx, "", event.RoomID(), prevEventID, roomInfo.RoomVersion)
if err != nil {
return fmt.Errorf("r.FSAPI.LookupState: %w", err)
}
knownPrevStates[prevEventID] = types.StateAtEvent{
StateEntry: types.StateEntry{},
}
}
}
return nil
}
*/
func (r *Inputer) calculateAndSetState(
ctx context.Context,
input *api.InputRoomEvent,

View file

@ -146,37 +146,39 @@ func (t *missingStateReq) processEventWithMissingState(
}
t.hadEventsMutex.Unlock()
err = api.SendEventWithState(
context.Background(),
t.inputer,
api.KindOld,
resolvedState,
backwardsExtremity.Headered(roomVersion),
t.origin,
hadEvents,
)
stateIDs := make([]string, len(resolvedState.StateEvents))
for _, event := range resolvedState.StateEvents {
stateIDs = append(stateIDs, event.EventID())
}
_, err = t.inputer.processRoomEvent(ctx, &api.InputRoomEvent{
Kind: api.KindOld,
Event: backwardsExtremity.Headered(roomVersion),
Origin: t.origin,
AuthEventIDs: backwardsExtremity.AuthEventIDs(),
HasState: true,
StateEventIDs: stateIDs,
SendAsServer: api.DoNotSendToOtherServers,
})
if err != nil {
return fmt.Errorf("api.SendEventWithState: %w", err)
return fmt.Errorf("t.inputer.processRoomEvent: %w", err)
}
// Then send all of the newer backfilled events, of which will all be newer
// than the backward extremity, into the roomserver without state. This way
// they will automatically fast-forward based on the room state at the
// extremity in the last step.
headeredNewEvents := make([]*gomatrixserverlib.HeaderedEvent, len(newEvents))
for i, newEvent := range newEvents {
headeredNewEvents[i] = newEvent.Headered(roomVersion)
}
if err = api.SendEvents(
context.Background(),
t.inputer,
api.KindOld,
append(headeredNewEvents, e.Headered(roomVersion)),
t.origin,
api.DoNotSendToOtherServers,
nil,
); err != nil {
return fmt.Errorf("api.SendEvents: %w", err)
for _, newEvent := range newEvents {
_, err = t.inputer.processRoomEvent(ctx, &api.InputRoomEvent{
Kind: api.KindOld,
Event: newEvent.Headered(roomVersion),
Origin: t.origin,
AuthEventIDs: backwardsExtremity.AuthEventIDs(),
SendAsServer: api.DoNotSendToOtherServers,
})
if err != nil {
return fmt.Errorf("t.inputer.processRoomEvent: %w", err)
}
}
return nil
@ -627,7 +629,7 @@ func (t *missingStateReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib
return &respState, nil
}
func (t *missingStateReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, missingEventID string, localFirst bool) (*gomatrixserverlib.HeaderedEvent, error) {
func (t *missingStateReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, _, missingEventID string, localFirst bool) (*gomatrixserverlib.HeaderedEvent, error) {
if localFirst {
// fetch from the roomserver
queryReq := api.QueryEventsByIDRequest{