Protect processEventWithMissingState with per-room mutex, to prevent mass CPU burn/RAM usage
Squashed commit of the following: commit 7fad77c10e3c1c78feddb37351812b209d9c0f25 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Mon Jun 28 15:06:52 2021 +0100 Fix processEventWithMissingStateMutexes commit 138cddcac7b8373a8e1816a232f84a7bda6adcdf Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Mon Jun 28 13:59:44 2021 +0100 Use internal.MutexByRoom commit 6e6f026cfad31da391ad261cfec16d41dff1b15b Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Mon Jun 28 13:50:18 2021 +0100 Try to slow things down per room commit b97d406dff2e11769a9202fbf58b138a541ca449 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Mon Jun 28 13:41:27 2021 +0100 Try to slow things down commit 8866120ebf880b4fd8a456937f69903e233c19a2 Merge: 9f2de8a24a37b19a
Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Mon Jun 28 13:40:33 2021 +0100 Merge branch 'neilalexander/rsinputfifo' into neilalexander/rsinputfifo2 commit4a37b19a8f
Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Mon Jun 28 13:34:54 2021 +0100 Add comments commitf9ab3f4b81
Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Mon Jun 28 13:31:21 2021 +0100 Tweaks commit 9f2de8a29cadec4c785d9c2e4e74c1138305f759 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Mon Jun 28 13:15:59 2021 +0100 Ask origin only for missing things for now commit8fd878c75a
Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Mon Jun 28 11:18:11 2021 +0100 Make sure someone wakes up commitb63f699f1b
Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Mon Jun 28 11:12:58 2021 +0100 Use a FIFO queue instead of a channel to reduce backpressure
This commit is contained in:
parent
7c3991ee2f
commit
4417f24678
|
@ -486,14 +486,16 @@ func (t *txnReq) getServers(ctx context.Context, roomID string) []gomatrixserver
|
||||||
return t.servers
|
return t.servers
|
||||||
}
|
}
|
||||||
t.servers = []gomatrixserverlib.ServerName{t.Origin}
|
t.servers = []gomatrixserverlib.ServerName{t.Origin}
|
||||||
serverReq := &api.QueryServerJoinedToRoomRequest{
|
/*
|
||||||
RoomID: roomID,
|
serverReq := &api.QueryServerJoinedToRoomRequest{
|
||||||
}
|
RoomID: roomID,
|
||||||
serverRes := &api.QueryServerJoinedToRoomResponse{}
|
}
|
||||||
if err := t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil {
|
serverRes := &api.QueryServerJoinedToRoomResponse{}
|
||||||
t.servers = append(t.servers, serverRes.ServerNames...)
|
if err := t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil {
|
||||||
util.GetLogger(ctx).Infof("Found %d server(s) to query for missing events in %q", len(t.servers), roomID)
|
t.servers = append(t.servers, serverRes.ServerNames...)
|
||||||
}
|
util.GetLogger(ctx).Infof("Found %d server(s) to query for missing events in %q", len(t.servers), roomID)
|
||||||
|
}
|
||||||
|
*/
|
||||||
return t.servers
|
return t.servers
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -618,13 +620,18 @@ func checkAllowedByState(e *gomatrixserverlib.Event, stateEvents []*gomatrixserv
|
||||||
return gomatrixserverlib.Allowed(e, &authUsingState)
|
return gomatrixserverlib.Allowed(e, &authUsingState)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var processEventWithMissingStateMutexes = internal.NewMutexByRoom()
|
||||||
|
|
||||||
func (t *txnReq) processEventWithMissingState(
|
func (t *txnReq) processEventWithMissingState(
|
||||||
ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion,
|
ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion,
|
||||||
) error {
|
) error {
|
||||||
|
processEventWithMissingStateMutexes.Lock(e.RoomID())
|
||||||
|
defer processEventWithMissingStateMutexes.Unlock(e.RoomID())
|
||||||
|
|
||||||
// Do this with a fresh context, so that we keep working even if the
|
// Do this with a fresh context, so that we keep working even if the
|
||||||
// original request times out. With any luck, by the time the remote
|
// original request times out. With any luck, by the time the remote
|
||||||
// side retries, we'll have fetched the missing state.
|
// side retries, we'll have fetched the missing state.
|
||||||
gmectx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
|
gmectx, cancel := context.WithTimeout(context.Background(), time.Minute)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
// We are missing the previous events for this events.
|
// We are missing the previous events for this events.
|
||||||
// This means that there is a gap in our view of the history of the
|
// This means that there is a gap in our view of the history of the
|
||||||
|
|
Loading…
Reference in a new issue