mirror of
https://github.com/matrix-org/dendrite.git
synced 2024-11-22 14:21:55 -06:00
Speed up start up time by batch querying ACL events (#3334)
This should significantly speed up start up times on servers with many rooms.
This commit is contained in:
parent
8f944f6434
commit
f4e77453cb
|
@ -958,7 +958,8 @@ func TestCapabilities(t *testing.T) {
|
||||||
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
||||||
|
|
||||||
// Needed to create accounts
|
// Needed to create accounts
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, caching.DisableMetrics)
|
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
||||||
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
// We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc.
|
// We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc.
|
||||||
|
@ -1005,7 +1006,8 @@ func TestTurnserver(t *testing.T) {
|
||||||
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
||||||
|
|
||||||
// Needed to create accounts
|
// Needed to create accounts
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, caching.DisableMetrics)
|
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
||||||
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
//rsAPI.SetUserAPI(userAPI)
|
//rsAPI.SetUserAPI(userAPI)
|
||||||
|
@ -1103,7 +1105,8 @@ func Test3PID(t *testing.T) {
|
||||||
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
||||||
|
|
||||||
// Needed to create accounts
|
// Needed to create accounts
|
||||||
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, caching.DisableMetrics)
|
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
|
||||||
|
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
|
||||||
// We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc.
|
// We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc.
|
||||||
|
|
|
@ -23,7 +23,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/roomserver/types"
|
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
@ -34,10 +34,10 @@ const MRoomServerACL = "m.room.server_acl"
|
||||||
type ServerACLDatabase interface {
|
type ServerACLDatabase interface {
|
||||||
// GetKnownRooms returns a list of all rooms we know about.
|
// GetKnownRooms returns a list of all rooms we know about.
|
||||||
GetKnownRooms(ctx context.Context) ([]string, error)
|
GetKnownRooms(ctx context.Context) ([]string, error)
|
||||||
// GetStateEvent returns the state event of a given type for a given room with a given state key
|
|
||||||
// If no event could be found, returns nil
|
// GetBulkStateContent returns all state events which match a given room ID and a given state key tuple. Both must be satisfied for a match.
|
||||||
// If there was an issue during the retrieval, returns an error
|
// If a tuple has the StateKey of '*' and allowWildcards=true then all state events with the EventType should be returned.
|
||||||
GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*types.HeaderedEvent, error)
|
GetBulkStateContent(ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool) ([]tables.StrippedEvent, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type ServerACLs struct {
|
type ServerACLs struct {
|
||||||
|
@ -58,15 +58,14 @@ func NewServerACLs(db ServerACLDatabase) *ServerACLs {
|
||||||
// For each room, let's see if we have a server ACL state event. If we
|
// For each room, let's see if we have a server ACL state event. If we
|
||||||
// do then we'll process it into memory so that we have the regexes to
|
// do then we'll process it into memory so that we have the regexes to
|
||||||
// hand.
|
// hand.
|
||||||
for _, room := range rooms {
|
|
||||||
state, err := db.GetStateEvent(ctx, room, MRoomServerACL, "")
|
events, err := db.GetBulkStateContent(ctx, rooms, []gomatrixserverlib.StateKeyTuple{{EventType: MRoomServerACL, StateKey: ""}}, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Errorf("Failed to get server ACLs for room %q", room)
|
logrus.WithError(err).Errorf("Failed to get server ACLs for all rooms: %q", err)
|
||||||
continue
|
|
||||||
}
|
|
||||||
if state != nil {
|
|
||||||
acls.OnServerACLUpdate(state.PDU)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, event := range events {
|
||||||
|
acls.OnServerACLUpdate(event)
|
||||||
}
|
}
|
||||||
return acls
|
return acls
|
||||||
}
|
}
|
||||||
|
@ -90,9 +89,9 @@ func compileACLRegex(orig string) (*regexp.Regexp, error) {
|
||||||
return regexp.Compile(escaped)
|
return regexp.Compile(escaped)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ServerACLs) OnServerACLUpdate(state gomatrixserverlib.PDU) {
|
func (s *ServerACLs) OnServerACLUpdate(strippedEvent tables.StrippedEvent) {
|
||||||
acls := &serverACL{}
|
acls := &serverACL{}
|
||||||
if err := json.Unmarshal(state.Content(), &acls.ServerACL); err != nil {
|
if err := json.Unmarshal([]byte(strippedEvent.ContentValue), &acls.ServerACL); err != nil {
|
||||||
logrus.WithError(err).Errorf("Failed to unmarshal state content for server ACLs")
|
logrus.WithError(err).Errorf("Failed to unmarshal state content for server ACLs")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -118,10 +117,10 @@ func (s *ServerACLs) OnServerACLUpdate(state gomatrixserverlib.PDU) {
|
||||||
"allow_ip_literals": acls.AllowIPLiterals,
|
"allow_ip_literals": acls.AllowIPLiterals,
|
||||||
"num_allowed": len(acls.allowedRegexes),
|
"num_allowed": len(acls.allowedRegexes),
|
||||||
"num_denied": len(acls.deniedRegexes),
|
"num_denied": len(acls.deniedRegexes),
|
||||||
}).Debugf("Updating server ACLs for %q", state.RoomID())
|
}).Debugf("Updating server ACLs for %q", strippedEvent.RoomID)
|
||||||
s.aclsMutex.Lock()
|
s.aclsMutex.Lock()
|
||||||
defer s.aclsMutex.Unlock()
|
defer s.aclsMutex.Unlock()
|
||||||
s.acls[state.RoomID().String()] = acls
|
s.acls[strippedEvent.RoomID] = acls
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ServerACLs) IsServerBannedFromRoom(serverName spec.ServerName, roomID string) bool {
|
func (s *ServerACLs) IsServerBannedFromRoom(serverName spec.ServerName, roomID string) bool {
|
||||||
|
|
|
@ -24,6 +24,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||||
"github.com/tidwall/gjson"
|
"github.com/tidwall/gjson"
|
||||||
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
@ -509,7 +510,13 @@ func (r *Inputer) processRoomEvent(
|
||||||
logrus.WithError(err).Error("failed to get server ACLs")
|
logrus.WithError(err).Error("failed to get server ACLs")
|
||||||
}
|
}
|
||||||
if aclEvent != nil {
|
if aclEvent != nil {
|
||||||
r.ACLs.OnServerACLUpdate(aclEvent)
|
strippedEvent := tables.StrippedEvent{
|
||||||
|
RoomID: aclEvent.RoomID().String(),
|
||||||
|
EventType: aclEvent.Type(),
|
||||||
|
StateKey: *aclEvent.StateKey(),
|
||||||
|
ContentValue: string(aclEvent.Content()),
|
||||||
|
}
|
||||||
|
r.ACLs.OnServerACLUpdate(strippedEvent)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@ package producers
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/tidwall/gjson"
|
"github.com/tidwall/gjson"
|
||||||
|
@ -75,7 +76,13 @@ func (r *RoomEventProducer) ProduceRoomEvents(roomID string, updates []api.Outpu
|
||||||
|
|
||||||
if eventType == acls.MRoomServerACL && update.NewRoomEvent.Event.StateKeyEquals("") {
|
if eventType == acls.MRoomServerACL && update.NewRoomEvent.Event.StateKeyEquals("") {
|
||||||
ev := update.NewRoomEvent.Event.PDU
|
ev := update.NewRoomEvent.Event.PDU
|
||||||
defer r.ACLs.OnServerACLUpdate(ev)
|
strippedEvent := tables.StrippedEvent{
|
||||||
|
RoomID: ev.RoomID().String(),
|
||||||
|
EventType: ev.Type(),
|
||||||
|
StateKey: *ev.StateKey(),
|
||||||
|
ContentValue: string(ev.Content()),
|
||||||
|
}
|
||||||
|
defer r.ACLs.OnServerACLUpdate(strippedEvent)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
logger.Tracef("Producing to topic '%s'", r.Topic)
|
logger.Tracef("Producing to topic '%s'", r.Topic)
|
||||||
|
|
|
@ -235,6 +235,10 @@ func ExtractContentValue(ev *types.HeaderedEvent) string {
|
||||||
key = "topic"
|
key = "topic"
|
||||||
case "m.room.guest_access":
|
case "m.room.guest_access":
|
||||||
key = "guest_access"
|
key = "guest_access"
|
||||||
|
case "m.room.server_acl":
|
||||||
|
// We need the entire content and not only one key, so we can use it
|
||||||
|
// on startup to generate the ACLs. This is merely a workaround.
|
||||||
|
return string(content)
|
||||||
}
|
}
|
||||||
result := gjson.GetBytes(content, key)
|
result := gjson.GetBytes(content, key)
|
||||||
if !result.Exists() {
|
if !result.Exists() {
|
||||||
|
|
Loading…
Reference in a new issue