mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-16 18:43:10 -06:00
Pull out creation of several NIDs; Add more caching
This commit is contained in:
parent
d16cb1edf9
commit
6ade9ed7a4
|
|
@ -7,6 +7,7 @@ import "github.com/matrix-org/dendrite/roomserver/types"
|
|||
type EventStateKeyCache interface {
|
||||
GetEventStateKey(eventStateKeyNID types.EventStateKeyNID) (string, bool)
|
||||
StoreEventStateKey(eventStateKeyNID types.EventStateKeyNID, eventStateKey string)
|
||||
GetEventStateKeyNID(eventStateKey string) (types.EventStateKeyNID, bool)
|
||||
}
|
||||
|
||||
func (c Caches) GetEventStateKey(eventStateKeyNID types.EventStateKeyNID) (string, bool) {
|
||||
|
|
@ -15,4 +16,26 @@ func (c Caches) GetEventStateKey(eventStateKeyNID types.EventStateKeyNID) (strin
|
|||
|
||||
func (c Caches) StoreEventStateKey(eventStateKeyNID types.EventStateKeyNID, eventStateKey string) {
|
||||
c.RoomServerStateKeys.Set(eventStateKeyNID, eventStateKey)
|
||||
c.RoomServerStateKeyNIDs.Set(eventStateKey, eventStateKeyNID)
|
||||
}
|
||||
|
||||
func (c Caches) GetEventStateKeyNID(eventStateKey string) (types.EventStateKeyNID, bool) {
|
||||
if eventStateKey == "" {
|
||||
return 1, true // 1 is the empty statekey as per the default value in the database
|
||||
}
|
||||
return c.RoomServerStateKeyNIDs.Get(eventStateKey)
|
||||
}
|
||||
|
||||
type EventTypeCache interface {
|
||||
GetEventTypeKey(eventType string) (types.EventTypeNID, bool)
|
||||
StoreEventTypeKey(eventTypeNID types.EventTypeNID, eventType string)
|
||||
}
|
||||
|
||||
func (c Caches) StoreEventTypeKey(eventTypeNID types.EventTypeNID, eventType string) {
|
||||
c.RoomServerEventTypeNIDs.Set(eventType, eventTypeNID)
|
||||
c.RoomServerEventTypes.Set(eventTypeNID, eventType)
|
||||
}
|
||||
|
||||
func (c Caches) GetEventTypeKey(eventType string) (types.EventTypeNID, bool) {
|
||||
return c.RoomServerEventTypeNIDs.Get(eventType)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,19 +9,28 @@ type RoomServerCaches interface {
|
|||
RoomVersionCache
|
||||
RoomServerEventsCache
|
||||
EventStateKeyCache
|
||||
EventTypeCache
|
||||
}
|
||||
|
||||
// RoomServerNIDsCache contains the subset of functions needed for
|
||||
// a roomserver NID cache.
|
||||
type RoomServerNIDsCache interface {
|
||||
GetRoomServerRoomID(roomNID types.RoomNID) (string, bool)
|
||||
// StoreRoomServerRoomID stores roomNID -> roomID and roomID -> roomNID
|
||||
StoreRoomServerRoomID(roomNID types.RoomNID, roomID string)
|
||||
GetRoomServerRoomNID(roomID string) (types.RoomNID, bool)
|
||||
}
|
||||
|
||||
func (c Caches) GetRoomServerRoomID(roomNID types.RoomNID) (string, bool) {
|
||||
return c.RoomServerRoomIDs.Get(roomNID)
|
||||
}
|
||||
|
||||
// StoreRoomServerRoomID stores roomNID -> roomID and roomID -> roomNID
|
||||
func (c Caches) StoreRoomServerRoomID(roomNID types.RoomNID, roomID string) {
|
||||
c.RoomServerRoomNIDs.Set(roomID, roomNID)
|
||||
c.RoomServerRoomIDs.Set(roomNID, roomID)
|
||||
}
|
||||
|
||||
func (c Caches) GetRoomServerRoomNID(roomID string) (types.RoomNID, bool) {
|
||||
return c.RoomServerRoomNIDs.Get(roomID)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,16 +23,19 @@ import (
|
|||
// different implementations as long as they satisfy the Cache
|
||||
// interface.
|
||||
type Caches struct {
|
||||
RoomVersions Cache[string, gomatrixserverlib.RoomVersion] // room ID -> room version
|
||||
ServerKeys Cache[string, gomatrixserverlib.PublicKeyLookupResult] // server name -> server keys
|
||||
RoomServerRoomNIDs Cache[string, types.RoomNID] // room ID -> room NID
|
||||
RoomServerRoomIDs Cache[types.RoomNID, string] // room NID -> room ID
|
||||
RoomServerEvents Cache[int64, *gomatrixserverlib.Event] // event NID -> event
|
||||
RoomServerStateKeys Cache[types.EventStateKeyNID, string] // event NID -> event state key
|
||||
FederationPDUs Cache[int64, *gomatrixserverlib.HeaderedEvent] // queue NID -> PDU
|
||||
FederationEDUs Cache[int64, *gomatrixserverlib.EDU] // queue NID -> EDU
|
||||
SpaceSummaryRooms Cache[string, gomatrixserverlib.MSC2946SpacesResponse] // room ID -> space response
|
||||
LazyLoading Cache[lazyLoadingCacheKey, string] // composite key -> event ID
|
||||
RoomVersions Cache[string, gomatrixserverlib.RoomVersion] // room ID -> room version
|
||||
ServerKeys Cache[string, gomatrixserverlib.PublicKeyLookupResult] // server name -> server keys
|
||||
RoomServerRoomNIDs Cache[string, types.RoomNID] // room ID -> room NID
|
||||
RoomServerRoomIDs Cache[types.RoomNID, string] // room NID -> room ID
|
||||
RoomServerEvents Cache[int64, *gomatrixserverlib.Event] // event NID -> event
|
||||
RoomServerStateKeys Cache[types.EventStateKeyNID, string] // eventStateKey NID -> event state key
|
||||
RoomServerStateKeyNIDs Cache[string, types.EventStateKeyNID] // event state key -> eventStateKey NID
|
||||
RoomServerEventTypeNIDs Cache[string, types.EventTypeNID] // eventType -> eventType NID
|
||||
RoomServerEventTypes Cache[types.EventTypeNID, string] // eventType NID -> eventType
|
||||
FederationPDUs Cache[int64, *gomatrixserverlib.HeaderedEvent] // queue NID -> PDU
|
||||
FederationEDUs Cache[int64, *gomatrixserverlib.EDU] // queue NID -> EDU
|
||||
SpaceSummaryRooms Cache[string, gomatrixserverlib.MSC2946SpacesResponse] // room ID -> space response
|
||||
LazyLoading Cache[lazyLoadingCacheKey, string] // composite key -> event ID
|
||||
}
|
||||
|
||||
// Cache is the interface that an implementation must satisfy.
|
||||
|
|
|
|||
|
|
@ -40,6 +40,9 @@ const (
|
|||
spaceSummaryRoomsCache
|
||||
lazyLoadingCache
|
||||
eventStateKeyCache
|
||||
eventTypeCache
|
||||
eventTypeNIDCache
|
||||
eventStateKeyNIDCache
|
||||
)
|
||||
|
||||
func NewRistrettoCache(maxCost config.DataUnit, maxAge time.Duration, enablePrometheus bool) *Caches {
|
||||
|
|
@ -105,6 +108,21 @@ func NewRistrettoCache(maxCost config.DataUnit, maxAge time.Duration, enableProm
|
|||
Prefix: eventStateKeyCache,
|
||||
MaxAge: maxAge,
|
||||
},
|
||||
RoomServerStateKeyNIDs: &RistrettoCachePartition[string, types.EventStateKeyNID]{ // eventStateKey -> eventStateKey NID
|
||||
cache: cache,
|
||||
Prefix: eventStateKeyNIDCache,
|
||||
MaxAge: maxAge,
|
||||
},
|
||||
RoomServerEventTypeNIDs: &RistrettoCachePartition[string, types.EventTypeNID]{ // eventType -> eventType NID
|
||||
cache: cache,
|
||||
Prefix: eventTypeCache,
|
||||
MaxAge: maxAge,
|
||||
},
|
||||
RoomServerEventTypes: &RistrettoCachePartition[types.EventTypeNID, string]{ // eventType NID -> eventType
|
||||
cache: cache,
|
||||
Prefix: eventTypeNIDCache,
|
||||
MaxAge: maxAge,
|
||||
},
|
||||
FederationPDUs: &RistrettoCostedCachePartition[int64, *gomatrixserverlib.HeaderedEvent]{ // queue NID -> PDU
|
||||
&RistrettoCachePartition[int64, *gomatrixserverlib.HeaderedEvent]{
|
||||
cache: cache,
|
||||
|
|
|
|||
|
|
@ -38,7 +38,18 @@ func TestIsInvitePendingWithoutNID(t *testing.T) {
|
|||
var authNIDs []types.EventNID
|
||||
for _, x := range room.Events() {
|
||||
|
||||
evNID, _, _, _, _, err := db.StoreEvent(context.Background(), x.Event, authNIDs, false)
|
||||
roomNID, err := db.GetOrCreateRoomNID(context.Background(), x.Unwrap())
|
||||
assert.NoError(t, err)
|
||||
assert.Greater(t, roomNID, types.RoomNID(0))
|
||||
|
||||
eventTypeNID, err := db.GetOrCreateEventTypeNID(context.Background(), x.Type())
|
||||
assert.NoError(t, err)
|
||||
assert.Greater(t, eventTypeNID, types.EventTypeNID(0))
|
||||
|
||||
eventStateKeyNID, err := db.GetOrCreateEventStateKeyNID(context.Background(), x.StateKey())
|
||||
assert.NoError(t, err)
|
||||
|
||||
evNID, _, _, _, err := db.StoreEvent(context.Background(), x.Event, roomNID, eventTypeNID, eventStateKeyNID, authNIDs, false)
|
||||
assert.NoError(t, err)
|
||||
authNIDs = append(authNIDs, evNID)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -332,8 +332,23 @@ func (r *Inputer) processRoomEvent(
|
|||
}
|
||||
}
|
||||
|
||||
roomNID, err := r.DB.GetOrCreateRoomNID(ctx, event)
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.DB.GetOrCreateRoomNID: %w", err)
|
||||
}
|
||||
|
||||
eventTypeNID, err := r.DB.GetOrCreateEventTypeNID(ctx, event.Type())
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.DB.GetOrCreateEventTypeNID: %w", err)
|
||||
}
|
||||
|
||||
eventStateKeyNID, err := r.DB.GetOrCreateEventStateKeyNID(ctx, event.StateKey())
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.DB.GetOrCreateEventStateKeyNID: %w", err)
|
||||
}
|
||||
|
||||
// Store the event.
|
||||
_, _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected)
|
||||
_, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, roomNID, eventTypeNID, eventStateKeyNID, authEventNIDs, isRejected)
|
||||
if err != nil {
|
||||
return fmt.Errorf("updater.StoreEvent: %w", err)
|
||||
}
|
||||
|
|
@ -568,6 +583,7 @@ func (r *Inputer) processStateBefore(
|
|||
// we've failed to retrieve the auth chain altogether (in which case
|
||||
// an error is returned) or we've successfully retrieved them all and
|
||||
// they are now in the database.
|
||||
// nolint: gocyclo
|
||||
func (r *Inputer) fetchAuthEvents(
|
||||
ctx context.Context,
|
||||
logger *logrus.Entry,
|
||||
|
|
@ -674,8 +690,23 @@ nextAuthEvent:
|
|||
logger.WithError(err).Warnf("Auth event %s rejected", authEvent.EventID())
|
||||
}
|
||||
|
||||
roomNID, err := r.DB.GetOrCreateRoomNID(ctx, authEvent)
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.DB.GetOrCreateRoomNID: %w", err)
|
||||
}
|
||||
|
||||
eventTypeNID, err := r.DB.GetOrCreateEventTypeNID(ctx, authEvent.Type())
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.DB.GetOrCreateEventTypeNID: %w", err)
|
||||
}
|
||||
|
||||
eventStateKeyNID, err := r.DB.GetOrCreateEventStateKeyNID(ctx, event.StateKey())
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.DB.GetOrCreateEventStateKeyNID: %w", err)
|
||||
}
|
||||
|
||||
// Finally, store the event in the database.
|
||||
eventNID, _, _, _, _, err := r.DB.StoreEvent(ctx, authEvent, authEventNIDs, isRejected)
|
||||
eventNID, _, _, _, err := r.DB.StoreEvent(ctx, authEvent, roomNID, eventTypeNID, eventStateKeyNID, authEventNIDs, isRejected)
|
||||
if err != nil {
|
||||
return fmt.Errorf("updater.StoreEvent: %w", err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -605,9 +605,28 @@ func persistEvents(ctx context.Context, db storage.Database, events []*gomatrixs
|
|||
authNids[i] = nid.EventNID
|
||||
i++
|
||||
}
|
||||
|
||||
roomNID, err = db.GetOrCreateRoomNID(ctx, ev.Unwrap())
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("failed to get or create roomNID")
|
||||
continue
|
||||
}
|
||||
|
||||
eventTypeNID, err := db.GetOrCreateEventTypeNID(ctx, ev.Type())
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("failed to get or create eventType NID")
|
||||
continue
|
||||
}
|
||||
|
||||
eventStateKeyNID, err := db.GetOrCreateEventStateKeyNID(ctx, ev.StateKey())
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("failed to get or create eventStateKey NID")
|
||||
continue
|
||||
}
|
||||
|
||||
var redactedEventID string
|
||||
var redactionEvent *gomatrixserverlib.Event
|
||||
eventNID, roomNID, _, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), authNids, false)
|
||||
eventNID, _, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), roomNID, eventTypeNID, eventStateKeyNID, authNids, false)
|
||||
if err != nil {
|
||||
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to persist event")
|
||||
continue
|
||||
|
|
|
|||
|
|
@ -74,10 +74,7 @@ type Database interface {
|
|||
SnapshotNIDFromEventID(ctx context.Context, eventID string) (types.StateSnapshotNID, error)
|
||||
BulkSelectSnapshotsFromEventIDs(ctx context.Context, eventIDs []string) (map[types.StateSnapshotNID][]string, error)
|
||||
// Stores a matrix room event in the database. Returns the room NID, the state snapshot and the redacted event ID if any, or an error.
|
||||
StoreEvent(
|
||||
ctx context.Context, event *gomatrixserverlib.Event, authEventNIDs []types.EventNID,
|
||||
isRejected bool,
|
||||
) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error)
|
||||
StoreEvent(ctx context.Context, event *gomatrixserverlib.Event, roomNID types.RoomNID, eventTypeNID types.EventTypeNID, eventStateKeyNID types.EventStateKeyNID, authEventNIDs []types.EventNID, isRejected bool) (types.EventNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error)
|
||||
// Look up the state entries for a list of string event IDs
|
||||
// Returns an error if the there is an error talking to the database
|
||||
// Returns a types.MissingEventError if the event IDs aren't in the database.
|
||||
|
|
@ -182,24 +179,11 @@ type Database interface {
|
|||
GetMembershipForHistoryVisibility(
|
||||
ctx context.Context, userNID types.EventStateKeyNID, info *types.RoomInfo, eventIDs ...string,
|
||||
) (map[string]*gomatrixserverlib.HeaderedEvent, error)
|
||||
GetOrCreateRoomNID(ctx context.Context, event *gomatrixserverlib.Event) (types.RoomNID, error)
|
||||
GetOrCreateEventTypeNID(ctx context.Context, eventType string) (eventTypeNID types.EventTypeNID, err error)
|
||||
GetOrCreateEventStateKeyNID(ctx context.Context, eventStateKey *string) (types.EventStateKeyNID, error)
|
||||
}
|
||||
|
||||
/*
|
||||
RoomsTable
|
||||
EventsTable
|
||||
EventJSONTable
|
||||
MembershipTable
|
||||
StateSnapshotTable
|
||||
StateBlockTable
|
||||
EventTypesTable
|
||||
EventStateKeysTable
|
||||
|
||||
|
||||
PublishedTable?
|
||||
RoomAliases?
|
||||
|
||||
*/
|
||||
|
||||
type RoomDatabase interface {
|
||||
// RoomInfo returns room information for the given room ID, or nil if there is no room.
|
||||
RoomInfo(ctx context.Context, roomID string) (*types.RoomInfo, error)
|
||||
|
|
@ -207,7 +191,7 @@ type RoomDatabase interface {
|
|||
IsEventRejected(ctx context.Context, roomNID types.RoomNID, eventID string) (rejected bool, err error)
|
||||
MissingAuthPrevEvents(ctx context.Context, e *gomatrixserverlib.Event) (missingAuth, missingPrev []string, err error)
|
||||
// Stores a matrix room event in the database. Returns the room NID, the state snapshot and the redacted event ID if any, or an error.
|
||||
StoreEvent(ctx context.Context, event *gomatrixserverlib.Event, authEventNIDs []types.EventNID, isRejected bool) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error)
|
||||
StoreEvent(ctx context.Context, event *gomatrixserverlib.Event, roomNID types.RoomNID, eventTypeNID types.EventTypeNID, eventStateKeyNID types.EventStateKeyNID, authEventNIDs []types.EventNID, isRejected bool) (types.EventNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error)
|
||||
UpgradeRoom(ctx context.Context, oldRoomID, newRoomID, eventSender string) error
|
||||
GetRoomUpdater(ctx context.Context, roomInfo *types.RoomInfo) (*shared.RoomUpdater, error)
|
||||
StateEntriesForEventIDs(ctx context.Context, eventIDs []string, excludeRejected bool) ([]types.StateEntry, error)
|
||||
|
|
@ -224,4 +208,7 @@ type RoomDatabase interface {
|
|||
LatestEventIDs(ctx context.Context, roomNID types.RoomNID) ([]gomatrixserverlib.EventReference, types.StateSnapshotNID, int64, error)
|
||||
EventTypeNIDs(ctx context.Context, eventTypes []string) (map[string]types.EventTypeNID, error)
|
||||
EventStateKeyNIDs(ctx context.Context, eventStateKeys []string) (map[string]types.EventStateKeyNID, error)
|
||||
GetOrCreateRoomNID(ctx context.Context, event *gomatrixserverlib.Event) (types.RoomNID, error)
|
||||
GetOrCreateEventTypeNID(ctx context.Context, eventType string) (eventTypeNID types.EventTypeNID, err error)
|
||||
GetOrCreateEventStateKeyNID(ctx context.Context, eventStateKey *string) (types.EventStateKeyNID, error)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -112,16 +112,31 @@ func (d *Database) eventStateKeyNIDs(
|
|||
) (map[string]types.EventStateKeyNID, error) {
|
||||
result := make(map[string]types.EventStateKeyNID)
|
||||
eventStateKeys = util.UniqueStrings(eventStateKeys)
|
||||
nids, err := d.EventStateKeysTable.BulkSelectEventStateKeyNID(ctx, txn, eventStateKeys)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// first ask the cache about these keys
|
||||
fetchEventStateKeys := make([]string, 0, len(eventStateKeys))
|
||||
for _, eventStateKey := range eventStateKeys {
|
||||
eventStateKeyNID, ok := d.Cache.GetEventStateKeyNID(eventStateKey)
|
||||
if ok {
|
||||
result[eventStateKey] = eventStateKeyNID
|
||||
continue
|
||||
}
|
||||
fetchEventStateKeys = append(fetchEventStateKeys, eventStateKey)
|
||||
}
|
||||
for eventStateKey, nid := range nids {
|
||||
result[eventStateKey] = nid
|
||||
|
||||
if len(fetchEventStateKeys) > 0 {
|
||||
nids, err := d.EventStateKeysTable.BulkSelectEventStateKeyNID(ctx, txn, fetchEventStateKeys)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for eventStateKey, nid := range nids {
|
||||
result[eventStateKey] = nid
|
||||
}
|
||||
}
|
||||
|
||||
// We received some nids, but are still missing some, work out which and create them
|
||||
if len(eventStateKeys) > len(result) {
|
||||
var nid types.EventStateKeyNID
|
||||
var err error
|
||||
err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error {
|
||||
for _, eventStateKey := range eventStateKeys {
|
||||
if _, ok := result[eventStateKey]; ok {
|
||||
|
|
@ -629,73 +644,71 @@ func (d *Database) IsEventRejected(ctx context.Context, roomNID types.RoomNID, e
|
|||
return d.EventsTable.SelectEventRejected(ctx, nil, roomNID, eventID)
|
||||
}
|
||||
|
||||
func (d *Database) StoreEvent(
|
||||
ctx context.Context, event *gomatrixserverlib.Event,
|
||||
authEventNIDs []types.EventNID, isRejected bool,
|
||||
) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) {
|
||||
return d.storeEvent(ctx, nil, event, authEventNIDs, isRejected)
|
||||
// GetOrCreateRoomNID gets or creates a new roomNID for the given event
|
||||
func (d *Database) GetOrCreateRoomNID(ctx context.Context, event *gomatrixserverlib.Event) (roomNID types.RoomNID, err error) {
|
||||
// Get the default room version. If the client doesn't supply a room_version
|
||||
// then we will use our configured default to create the room.
|
||||
// https://matrix.org/docs/spec/client_server/r0.6.0#post-matrix-client-r0-createroom
|
||||
// Note that the below logic depends on the m.room.create event being the
|
||||
// first event that is persisted to the database when creating or joining a
|
||||
// room.
|
||||
var roomVersion gomatrixserverlib.RoomVersion
|
||||
if roomVersion, err = extractRoomVersionFromCreateEvent(event); err != nil {
|
||||
return 0, fmt.Errorf("extractRoomVersionFromCreateEvent: %w", err)
|
||||
}
|
||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
roomNID, err = d.assignRoomNID(ctx, txn, event.RoomID(), roomVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return roomNID, err
|
||||
}
|
||||
|
||||
func (d *Database) storeEvent(
|
||||
ctx context.Context, updater *RoomUpdater, event *gomatrixserverlib.Event,
|
||||
authEventNIDs []types.EventNID, isRejected bool,
|
||||
) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) {
|
||||
var (
|
||||
roomNID types.RoomNID
|
||||
eventTypeNID types.EventTypeNID
|
||||
eventStateKeyNID types.EventStateKeyNID
|
||||
eventNID types.EventNID
|
||||
stateNID types.StateSnapshotNID
|
||||
redactionEvent *gomatrixserverlib.Event
|
||||
redactedEventID string
|
||||
err error
|
||||
)
|
||||
var txn *sql.Tx
|
||||
if updater != nil && updater.txn != nil {
|
||||
txn = updater.txn
|
||||
}
|
||||
// First writer is with a database-provided transaction, so that NIDs are assigned
|
||||
// globally outside of the updater context, to help avoid races.
|
||||
func (d *Database) GetOrCreateEventTypeNID(ctx context.Context, eventType string) (eventTypeNID types.EventTypeNID, err error) {
|
||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
// TODO: Here we should aim to have two different code paths for new rooms
|
||||
// vs existing ones.
|
||||
|
||||
// Get the default room version. If the client doesn't supply a room_version
|
||||
// then we will use our configured default to create the room.
|
||||
// https://matrix.org/docs/spec/client_server/r0.6.0#post-matrix-client-r0-createroom
|
||||
// Note that the below logic depends on the m.room.create event being the
|
||||
// first event that is persisted to the database when creating or joining a
|
||||
// room.
|
||||
var roomVersion gomatrixserverlib.RoomVersion
|
||||
if roomVersion, err = extractRoomVersionFromCreateEvent(event); err != nil {
|
||||
return fmt.Errorf("extractRoomVersionFromCreateEvent: %w", err)
|
||||
}
|
||||
|
||||
if roomNID, err = d.assignRoomNID(ctx, txn, event.RoomID(), roomVersion); err != nil {
|
||||
return fmt.Errorf("d.assignRoomNID: %w", err)
|
||||
}
|
||||
|
||||
if eventTypeNID, err = d.assignEventTypeNID(ctx, txn, event.Type()); err != nil {
|
||||
if eventTypeNID, err = d.assignEventTypeNID(ctx, txn, eventType); err != nil {
|
||||
return fmt.Errorf("d.assignEventTypeNID: %w", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return eventTypeNID, err
|
||||
}
|
||||
|
||||
eventStateKey := event.StateKey()
|
||||
// Assigned a numeric ID for the state_key if there is one present.
|
||||
// Otherwise set the numeric ID for the state_key to 0.
|
||||
if eventStateKey != nil {
|
||||
if eventStateKeyNID, err = d.assignStateKeyNID(ctx, txn, *eventStateKey); err != nil {
|
||||
return fmt.Errorf("d.assignStateKeyNID: %w", err)
|
||||
}
|
||||
func (d *Database) GetOrCreateEventStateKeyNID(ctx context.Context, eventStateKey *string) (eventStateKeyNID types.EventStateKeyNID, err error) {
|
||||
if eventStateKey == nil {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
if eventStateKeyNID, err = d.assignStateKeyNID(ctx, txn, *eventStateKey); err != nil {
|
||||
return fmt.Errorf("d.assignStateKeyNID: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.Writer.Do: %w", err)
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return eventStateKeyNID, nil
|
||||
}
|
||||
|
||||
func (d *Database) StoreEvent(
|
||||
ctx context.Context, event *gomatrixserverlib.Event,
|
||||
roomNID types.RoomNID, eventTypeNID types.EventTypeNID, eventStateKeyNID types.EventStateKeyNID,
|
||||
authEventNIDs []types.EventNID, isRejected bool,
|
||||
) (types.EventNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) {
|
||||
var (
|
||||
eventNID types.EventNID
|
||||
stateNID types.StateSnapshotNID
|
||||
redactionEvent *gomatrixserverlib.Event
|
||||
redactedEventID string
|
||||
err error
|
||||
)
|
||||
// Second writer is using the database-provided transaction, probably from the
|
||||
// room updater, for easy roll-back if required.
|
||||
err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error {
|
||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
if eventNID, stateNID, err = d.EventsTable.InsertEvent(
|
||||
ctx,
|
||||
txn,
|
||||
|
|
@ -731,7 +744,7 @@ func (d *Database) storeEvent(
|
|||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.Writer.Do: %w", err)
|
||||
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.Writer.Do: %w", err)
|
||||
}
|
||||
|
||||
// We should attempt to update the previous events table with any
|
||||
|
|
@ -746,28 +759,28 @@ func (d *Database) storeEvent(
|
|||
// any other so this is fine. If we ever update GetLatestEventsForUpdate or NewLatestEventsUpdater
|
||||
// to do writes however then this will need to go inside `Writer.Do`.
|
||||
succeeded := false
|
||||
if updater == nil {
|
||||
var roomInfo *types.RoomInfo
|
||||
roomInfo, err = d.roomInfo(ctx, txn, event.RoomID())
|
||||
if err != nil {
|
||||
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.RoomInfo: %w", err)
|
||||
}
|
||||
if roomInfo == nil && len(prevEvents) > 0 {
|
||||
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("expected room %q to exist", event.RoomID())
|
||||
}
|
||||
updater, err = d.GetRoomUpdater(ctx, roomInfo)
|
||||
if err != nil {
|
||||
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("GetRoomUpdater: %w", err)
|
||||
}
|
||||
defer sqlutil.EndTransactionWithCheck(updater, &succeeded, &err)
|
||||
var roomInfo *types.RoomInfo
|
||||
roomInfo, err = d.roomInfo(ctx, nil, event.RoomID())
|
||||
if err != nil {
|
||||
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.RoomInfo: %w", err)
|
||||
}
|
||||
if roomInfo == nil && len(prevEvents) > 0 {
|
||||
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("expected room %q to exist", event.RoomID())
|
||||
}
|
||||
var updater *RoomUpdater
|
||||
updater, err = d.GetRoomUpdater(ctx, roomInfo)
|
||||
if err != nil {
|
||||
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("GetRoomUpdater: %w", err)
|
||||
}
|
||||
defer sqlutil.EndTransactionWithCheck(updater, &succeeded, &err)
|
||||
|
||||
if err = updater.StorePreviousEvents(eventNID, prevEvents); err != nil {
|
||||
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("updater.StorePreviousEvents: %w", err)
|
||||
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("updater.StorePreviousEvents: %w", err)
|
||||
}
|
||||
succeeded = true
|
||||
}
|
||||
|
||||
return eventNID, roomNID, types.StateAtEvent{
|
||||
return eventNID, types.StateAtEvent{
|
||||
BeforeStateSnapshotNID: stateNID,
|
||||
StateEntry: types.StateEntry{
|
||||
StateKeyTuple: types.StateKeyTuple{
|
||||
|
|
@ -819,6 +832,10 @@ func (d *Database) MissingAuthPrevEvents(
|
|||
func (d *Database) assignRoomNID(
|
||||
ctx context.Context, txn *sql.Tx, roomID string, roomVersion gomatrixserverlib.RoomVersion,
|
||||
) (types.RoomNID, error) {
|
||||
roomNID, ok := d.Cache.GetRoomServerRoomNID(roomID)
|
||||
if ok {
|
||||
return roomNID, nil
|
||||
}
|
||||
// Check if we already have a numeric ID in the database.
|
||||
roomNID, err := d.RoomsTable.SelectRoomNID(ctx, txn, roomID)
|
||||
if err == sql.ErrNoRows {
|
||||
|
|
@ -829,12 +846,20 @@ func (d *Database) assignRoomNID(
|
|||
roomNID, err = d.RoomsTable.SelectRoomNID(ctx, txn, roomID)
|
||||
}
|
||||
}
|
||||
return roomNID, err
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
d.Cache.StoreRoomServerRoomID(roomNID, roomID)
|
||||
return roomNID, nil
|
||||
}
|
||||
|
||||
func (d *Database) assignEventTypeNID(
|
||||
ctx context.Context, txn *sql.Tx, eventType string,
|
||||
) (types.EventTypeNID, error) {
|
||||
eventTypeNID, ok := d.Cache.GetEventTypeKey(eventType)
|
||||
if ok {
|
||||
return eventTypeNID, nil
|
||||
}
|
||||
// Check if we already have a numeric ID in the database.
|
||||
eventTypeNID, err := d.EventTypesTable.SelectEventTypeNID(ctx, txn, eventType)
|
||||
if err == sql.ErrNoRows {
|
||||
|
|
@ -845,12 +870,20 @@ func (d *Database) assignEventTypeNID(
|
|||
eventTypeNID, err = d.EventTypesTable.SelectEventTypeNID(ctx, txn, eventType)
|
||||
}
|
||||
}
|
||||
return eventTypeNID, err
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
d.Cache.StoreEventTypeKey(eventTypeNID, eventType)
|
||||
return eventTypeNID, nil
|
||||
}
|
||||
|
||||
func (d *Database) assignStateKeyNID(
|
||||
ctx context.Context, txn *sql.Tx, eventStateKey string,
|
||||
) (types.EventStateKeyNID, error) {
|
||||
eventStateKeyNID, ok := d.Cache.GetEventStateKeyNID(eventStateKey)
|
||||
if ok {
|
||||
return eventStateKeyNID, nil
|
||||
}
|
||||
// Check if we already have a numeric ID in the database.
|
||||
eventStateKeyNID, err := d.EventStateKeysTable.SelectEventStateKeyNID(ctx, txn, eventStateKey)
|
||||
if err == sql.ErrNoRows {
|
||||
|
|
@ -861,6 +894,7 @@ func (d *Database) assignStateKeyNID(
|
|||
eventStateKeyNID, err = d.EventStateKeysTable.SelectEventStateKeyNID(ctx, txn, eventStateKey)
|
||||
}
|
||||
}
|
||||
d.Cache.StoreEventStateKey(eventStateKeyNID, eventStateKey)
|
||||
return eventStateKeyNID, err
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue