mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-21 13:03:09 -06:00
Refactor Events and create a new RoomDatabase interface
This commit is contained in:
parent
c8ca23acdb
commit
e564018343
|
|
@ -87,7 +87,7 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
var eventEntries []types.Event
|
var eventEntries []types.Event
|
||||||
eventEntries, err = roomserverDB.Events(ctx, eventNIDs)
|
eventEntries, err = roomserverDB.Events(ctx, 0, eventNIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
@ -145,7 +145,7 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("Fetching", len(eventNIDMap), "state events")
|
fmt.Println("Fetching", len(eventNIDMap), "state events")
|
||||||
eventEntries, err := roomserverDB.Events(ctx, eventNIDs)
|
eventEntries, err := roomserverDB.Events(ctx, 0, eventNIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
@ -165,7 +165,7 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("Fetching", len(authEventIDs), "auth events")
|
fmt.Println("Fetching", len(authEventIDs), "auth events")
|
||||||
authEventEntries, err := roomserverDB.EventsFromIDs(ctx, authEventIDs)
|
authEventEntries, err := roomserverDB.EventsFromIDs(ctx, 0, authEventIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -205,3 +205,44 @@ type FederationRoomserverAPI interface {
|
||||||
type KeyserverRoomserverAPI interface {
|
type KeyserverRoomserverAPI interface {
|
||||||
QueryLeftUsers(ctx context.Context, req *QueryLeftUsersRequest, res *QueryLeftUsersResponse) error
|
QueryLeftUsers(ctx context.Context, req *QueryLeftUsersRequest, res *QueryLeftUsersResponse) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type RoomAPI interface {
|
||||||
|
QueryLatestEventsAndStateAPI
|
||||||
|
QueryEventsAPI
|
||||||
|
QueryStateAfterEvents(ctx context.Context, req *QueryStateAfterEventsRequest, res *QueryStateAfterEventsResponse) error
|
||||||
|
|
||||||
|
// Query a given amount (or less) of events prior to a given set of events.
|
||||||
|
PerformBackfill(ctx context.Context, req *PerformBackfillRequest, res *PerformBackfillResponse) error
|
||||||
|
|
||||||
|
// QueryMembershipAtEvent queries the memberships at the given events.
|
||||||
|
// Returns a map from eventID to a slice of gomatrixserverlib.HeaderedEvent.
|
||||||
|
QueryMembershipAtEvent(ctx context.Context, request *QueryMembershipAtEventRequest, response *QueryMembershipAtEventResponse) error
|
||||||
|
|
||||||
|
// Query a list of membership events for a room
|
||||||
|
QueryMembershipsForRoom(ctx context.Context, req *QueryMembershipsForRoomRequest, res *QueryMembershipsForRoomResponse) error
|
||||||
|
|
||||||
|
// Get all known aliases for a room ID
|
||||||
|
GetAliasesForRoomID(ctx context.Context, req *GetAliasesForRoomIDRequest, res *GetAliasesForRoomIDResponse) error
|
||||||
|
|
||||||
|
QueryMembershipForUser(ctx context.Context, req *QueryMembershipForUserRequest, res *QueryMembershipForUserResponse) error
|
||||||
|
QueryRoomVersionForRoom(ctx context.Context, req *QueryRoomVersionForRoomRequest, res *QueryRoomVersionForRoomResponse) error
|
||||||
|
QueryPublishedRooms(ctx context.Context, req *QueryPublishedRoomsRequest, res *QueryPublishedRoomsResponse) error
|
||||||
|
|
||||||
|
GetRoomIDForAlias(ctx context.Context, req *GetRoomIDForAliasRequest, res *GetRoomIDForAliasResponse) error
|
||||||
|
|
||||||
|
// PerformRoomUpgrade upgrades a room to a newer version
|
||||||
|
PerformRoomUpgrade(ctx context.Context, req *PerformRoomUpgradeRequest, resp *PerformRoomUpgradeResponse) error
|
||||||
|
PerformAdminEvacuateRoom(ctx context.Context, req *PerformAdminEvacuateRoomRequest, res *PerformAdminEvacuateRoomResponse) error
|
||||||
|
PerformAdminPurgeRoom(ctx context.Context, req *PerformAdminPurgeRoomRequest, res *PerformAdminPurgeRoomResponse) error
|
||||||
|
PerformAdminDownloadState(ctx context.Context, req *PerformAdminDownloadStateRequest, res *PerformAdminDownloadStateResponse) error
|
||||||
|
PerformPeek(ctx context.Context, req *PerformPeekRequest, res *PerformPeekResponse) error
|
||||||
|
PerformUnpeek(ctx context.Context, req *PerformUnpeekRequest, res *PerformUnpeekResponse) error
|
||||||
|
PerformInvite(ctx context.Context, req *PerformInviteRequest, res *PerformInviteResponse) error
|
||||||
|
PerformJoin(ctx context.Context, req *PerformJoinRequest, res *PerformJoinResponse) error
|
||||||
|
PerformLeave(ctx context.Context, req *PerformLeaveRequest, res *PerformLeaveResponse) error
|
||||||
|
PerformPublish(ctx context.Context, req *PerformPublishRequest, res *PerformPublishResponse) error
|
||||||
|
SetRoomAlias(ctx context.Context, req *SetRoomAliasRequest, res *SetRoomAliasResponse) error
|
||||||
|
RemoveRoomAlias(ctx context.Context, req *RemoveRoomAliasRequest, res *RemoveRoomAliasResponse) error
|
||||||
|
// QueryServerBannedFromRoom returns whether a server is banned from a room by server ACLs.
|
||||||
|
QueryServerBannedFromRoom(ctx context.Context, req *QueryServerBannedFromRoomRequest, res *QueryServerBannedFromRoomResponse) error
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -30,26 +30,6 @@ import (
|
||||||
"github.com/tidwall/sjson"
|
"github.com/tidwall/sjson"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RoomserverInternalAPIDatabase has the storage APIs needed to implement the alias API.
|
|
||||||
type RoomserverInternalAPIDatabase interface {
|
|
||||||
// Save a given room alias with the room ID it refers to.
|
|
||||||
// Returns an error if there was a problem talking to the database.
|
|
||||||
SetRoomAlias(ctx context.Context, alias string, roomID string, creatorUserID string) error
|
|
||||||
// Look up the room ID a given alias refers to.
|
|
||||||
// Returns an error if there was a problem talking to the database.
|
|
||||||
GetRoomIDForAlias(ctx context.Context, alias string) (string, error)
|
|
||||||
// Look up all aliases referring to a given room ID.
|
|
||||||
// Returns an error if there was a problem talking to the database.
|
|
||||||
GetAliasesForRoomID(ctx context.Context, roomID string) ([]string, error)
|
|
||||||
// Remove a given room alias.
|
|
||||||
// Returns an error if there was a problem talking to the database.
|
|
||||||
RemoveRoomAlias(ctx context.Context, alias string) error
|
|
||||||
// Look up the room version for a given room.
|
|
||||||
GetRoomVersionForRoom(
|
|
||||||
ctx context.Context, roomID string,
|
|
||||||
) (gomatrixserverlib.RoomVersion, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetRoomAlias implements alias.RoomserverInternalAPI
|
// SetRoomAlias implements alias.RoomserverInternalAPI
|
||||||
func (r *RoomserverInternalAPI) SetRoomAlias(
|
func (r *RoomserverInternalAPI) SetRoomAlias(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
|
|
|
||||||
|
|
@ -155,7 +155,6 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio
|
||||||
r.Unpeeker = &perform.Unpeeker{
|
r.Unpeeker = &perform.Unpeeker{
|
||||||
ServerName: r.ServerName,
|
ServerName: r.ServerName,
|
||||||
Cfg: r.Cfg,
|
Cfg: r.Cfg,
|
||||||
DB: r.DB,
|
|
||||||
FSAPI: r.fsAPI,
|
FSAPI: r.fsAPI,
|
||||||
Inputer: r.Inputer,
|
Inputer: r.Inputer,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -31,7 +31,8 @@ import (
|
||||||
// the soft-fail bool.
|
// the soft-fail bool.
|
||||||
func CheckForSoftFail(
|
func CheckForSoftFail(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
db storage.Database,
|
db storage.RoomDatabase,
|
||||||
|
roomInfo *types.RoomInfo,
|
||||||
event *gomatrixserverlib.HeaderedEvent,
|
event *gomatrixserverlib.HeaderedEvent,
|
||||||
stateEventIDs []string,
|
stateEventIDs []string,
|
||||||
) (bool, error) {
|
) (bool, error) {
|
||||||
|
|
@ -45,16 +46,6 @@ func CheckForSoftFail(
|
||||||
return true, fmt.Errorf("StateEntriesForEventIDs failed: %w", err)
|
return true, fmt.Errorf("StateEntriesForEventIDs failed: %w", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Work out if the room exists.
|
|
||||||
var roomInfo *types.RoomInfo
|
|
||||||
roomInfo, err = db.RoomInfo(ctx, event.RoomID())
|
|
||||||
if err != nil {
|
|
||||||
return false, fmt.Errorf("db.RoomNID: %w", err)
|
|
||||||
}
|
|
||||||
if roomInfo == nil || roomInfo.IsStub() {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Then get the state entries for the current state snapshot.
|
// Then get the state entries for the current state snapshot.
|
||||||
// We'll use this to check if the event is allowed right now.
|
// We'll use this to check if the event is allowed right now.
|
||||||
roomState := state.NewStateResolution(db, roomInfo)
|
roomState := state.NewStateResolution(db, roomInfo)
|
||||||
|
|
@ -76,7 +67,7 @@ func CheckForSoftFail(
|
||||||
stateNeeded := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{event.Unwrap()})
|
stateNeeded := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{event.Unwrap()})
|
||||||
|
|
||||||
// Load the actual auth events from the database.
|
// Load the actual auth events from the database.
|
||||||
authEvents, err := loadAuthEvents(ctx, db, stateNeeded, authStateEntries)
|
authEvents, err := loadAuthEvents(ctx, db, roomInfo.RoomNID, stateNeeded, authStateEntries)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return true, fmt.Errorf("loadAuthEvents: %w", err)
|
return true, fmt.Errorf("loadAuthEvents: %w", err)
|
||||||
}
|
}
|
||||||
|
|
@ -93,7 +84,8 @@ func CheckForSoftFail(
|
||||||
// Returns the numeric IDs for the auth events.
|
// Returns the numeric IDs for the auth events.
|
||||||
func CheckAuthEvents(
|
func CheckAuthEvents(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
db storage.Database,
|
db storage.RoomDatabase,
|
||||||
|
roomNID types.RoomNID,
|
||||||
event *gomatrixserverlib.HeaderedEvent,
|
event *gomatrixserverlib.HeaderedEvent,
|
||||||
authEventIDs []string,
|
authEventIDs []string,
|
||||||
) ([]types.EventNID, error) {
|
) ([]types.EventNID, error) {
|
||||||
|
|
@ -108,7 +100,7 @@ func CheckAuthEvents(
|
||||||
stateNeeded := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{event.Unwrap()})
|
stateNeeded := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{event.Unwrap()})
|
||||||
|
|
||||||
// Load the actual auth events from the database.
|
// Load the actual auth events from the database.
|
||||||
authEvents, err := loadAuthEvents(ctx, db, stateNeeded, authStateEntries)
|
authEvents, err := loadAuthEvents(ctx, db, roomNID, stateNeeded, authStateEntries)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("loadAuthEvents: %w", err)
|
return nil, fmt.Errorf("loadAuthEvents: %w", err)
|
||||||
}
|
}
|
||||||
|
|
@ -201,6 +193,7 @@ func (ae *authEvents) lookupEvent(typeNID types.EventTypeNID, stateKey string) *
|
||||||
func loadAuthEvents(
|
func loadAuthEvents(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
db state.StateResolutionStorage,
|
db state.StateResolutionStorage,
|
||||||
|
roomNID types.RoomNID,
|
||||||
needed gomatrixserverlib.StateNeeded,
|
needed gomatrixserverlib.StateNeeded,
|
||||||
state []types.StateEntry,
|
state []types.StateEntry,
|
||||||
) (result authEvents, err error) {
|
) (result authEvents, err error) {
|
||||||
|
|
@ -223,7 +216,7 @@ func loadAuthEvents(
|
||||||
eventNIDs = append(eventNIDs, eventNID)
|
eventNIDs = append(eventNIDs, eventNID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if result.events, err = db.Events(ctx, eventNIDs); err != nil {
|
if result.events, err = db.Events(ctx, roomNID, eventNIDs); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
roomID := ""
|
roomID := ""
|
||||||
|
|
|
||||||
|
|
@ -85,7 +85,7 @@ func IsServerCurrentlyInRoom(ctx context.Context, db storage.Database, serverNam
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
events, err := db.Events(ctx, eventNIDs)
|
events, err := db.Events(ctx, info.RoomNID, eventNIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
@ -157,7 +157,7 @@ func IsInvitePending(
|
||||||
// only keep the "m.room.member" events with a "join" membership. These events are returned.
|
// only keep the "m.room.member" events with a "join" membership. These events are returned.
|
||||||
// Returns an error if there was an issue fetching the events.
|
// Returns an error if there was an issue fetching the events.
|
||||||
func GetMembershipsAtState(
|
func GetMembershipsAtState(
|
||||||
ctx context.Context, db storage.Database, stateEntries []types.StateEntry, joinedOnly bool,
|
ctx context.Context, db storage.RoomDatabase, roomNID types.RoomNID, stateEntries []types.StateEntry, joinedOnly bool,
|
||||||
) ([]types.Event, error) {
|
) ([]types.Event, error) {
|
||||||
|
|
||||||
var eventNIDs types.EventNIDs
|
var eventNIDs types.EventNIDs
|
||||||
|
|
@ -177,7 +177,7 @@ func GetMembershipsAtState(
|
||||||
util.Unique(eventNIDs)
|
util.Unique(eventNIDs)
|
||||||
|
|
||||||
// Get all of the events in this state
|
// Get all of the events in this state
|
||||||
stateEvents, err := db.Events(ctx, eventNIDs)
|
stateEvents, err := db.Events(ctx, roomNID, eventNIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -220,16 +220,16 @@ func StateBeforeEvent(ctx context.Context, db storage.Database, info *types.Room
|
||||||
return roomState.LoadCombinedStateAfterEvents(ctx, prevState)
|
return roomState.LoadCombinedStateAfterEvents(ctx, prevState)
|
||||||
}
|
}
|
||||||
|
|
||||||
func MembershipAtEvent(ctx context.Context, db storage.Database, info *types.RoomInfo, eventIDs []string, stateKeyNID types.EventStateKeyNID) (map[string][]types.StateEntry, error) {
|
func MembershipAtEvent(ctx context.Context, db storage.RoomDatabase, info *types.RoomInfo, eventIDs []string, stateKeyNID types.EventStateKeyNID) (map[string][]types.StateEntry, error) {
|
||||||
roomState := state.NewStateResolution(db, info)
|
roomState := state.NewStateResolution(db, info)
|
||||||
// Fetch the state as it was when this event was fired
|
// Fetch the state as it was when this event was fired
|
||||||
return roomState.LoadMembershipAtEvent(ctx, eventIDs, stateKeyNID)
|
return roomState.LoadMembershipAtEvent(ctx, eventIDs, stateKeyNID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func LoadEvents(
|
func LoadEvents(
|
||||||
ctx context.Context, db storage.Database, eventNIDs []types.EventNID,
|
ctx context.Context, db storage.RoomDatabase, roomNID types.RoomNID, eventNIDs []types.EventNID,
|
||||||
) ([]*gomatrixserverlib.Event, error) {
|
) ([]*gomatrixserverlib.Event, error) {
|
||||||
stateEvents, err := db.Events(ctx, eventNIDs)
|
stateEvents, err := db.Events(ctx, roomNID, eventNIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -242,13 +242,13 @@ func LoadEvents(
|
||||||
}
|
}
|
||||||
|
|
||||||
func LoadStateEvents(
|
func LoadStateEvents(
|
||||||
ctx context.Context, db storage.Database, stateEntries []types.StateEntry,
|
ctx context.Context, db storage.RoomDatabase, roomNID types.RoomNID, stateEntries []types.StateEntry,
|
||||||
) ([]*gomatrixserverlib.Event, error) {
|
) ([]*gomatrixserverlib.Event, error) {
|
||||||
eventNIDs := make([]types.EventNID, len(stateEntries))
|
eventNIDs := make([]types.EventNID, len(stateEntries))
|
||||||
for i := range stateEntries {
|
for i := range stateEntries {
|
||||||
eventNIDs[i] = stateEntries[i].EventNID
|
eventNIDs[i] = stateEntries[i].EventNID
|
||||||
}
|
}
|
||||||
return LoadEvents(ctx, db, eventNIDs)
|
return LoadEvents(ctx, db, roomNID, eventNIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func CheckServerAllowedToSeeEvent(
|
func CheckServerAllowedToSeeEvent(
|
||||||
|
|
@ -326,7 +326,7 @@ func slowGetHistoryVisibilityState(
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return LoadStateEvents(ctx, db, filteredEntries)
|
return LoadStateEvents(ctx, db, info.RoomNID, filteredEntries)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Remove this when we have tests to assert correctness of this function
|
// TODO: Remove this when we have tests to assert correctness of this function
|
||||||
|
|
@ -366,7 +366,7 @@ BFSLoop:
|
||||||
next = make([]string, 0)
|
next = make([]string, 0)
|
||||||
}
|
}
|
||||||
// Retrieve the events to process from the database.
|
// Retrieve the events to process from the database.
|
||||||
events, err = db.EventsFromIDs(ctx, front)
|
events, err = db.EventsFromIDs(ctx, info.RoomNID, front)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return resultNIDs, redactEventIDs, err
|
return resultNIDs, redactEventIDs, err
|
||||||
}
|
}
|
||||||
|
|
@ -467,7 +467,7 @@ func QueryLatestEventsAndState(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
stateEvents, err := LoadStateEvents(ctx, db, stateEntries)
|
stateEvents, err := LoadStateEvents(ctx, db, roomInfo.RoomNID, stateEntries)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -76,7 +76,7 @@ type Inputer struct {
|
||||||
Cfg *config.RoomServer
|
Cfg *config.RoomServer
|
||||||
Base *base.BaseDendrite
|
Base *base.BaseDendrite
|
||||||
ProcessContext *process.ProcessContext
|
ProcessContext *process.ProcessContext
|
||||||
DB storage.Database
|
DB storage.RoomDatabase
|
||||||
NATSClient *nats.Conn
|
NATSClient *nats.Conn
|
||||||
JetStream nats.JetStreamContext
|
JetStream nats.JetStreamContext
|
||||||
Durable nats.SubOpt
|
Durable nats.SubOpt
|
||||||
|
|
|
||||||
|
|
@ -308,10 +308,10 @@ func (r *Inputer) processRoomEvent(
|
||||||
}
|
}
|
||||||
|
|
||||||
var softfail bool
|
var softfail bool
|
||||||
if input.Kind == api.KindNew {
|
if input.Kind == api.KindNew && !isCreateEvent {
|
||||||
// Check that the event passes authentication checks based on the
|
// Check that the event passes authentication checks based on the
|
||||||
// current room state.
|
// current room state.
|
||||||
softfail, err = helpers.CheckForSoftFail(ctx, r.DB, headered, input.StateEventIDs)
|
softfail, err = helpers.CheckForSoftFail(ctx, r.DB, roomInfo, headered, input.StateEventIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.WithError(err).Warn("Error authing soft-failed event")
|
logger.WithError(err).Warn("Error authing soft-failed event")
|
||||||
}
|
}
|
||||||
|
|
@ -322,8 +322,8 @@ func (r *Inputer) processRoomEvent(
|
||||||
// bother doing this if the event was already rejected as it just ends up
|
// bother doing this if the event was already rejected as it just ends up
|
||||||
// burning CPU time.
|
// burning CPU time.
|
||||||
historyVisibility := gomatrixserverlib.HistoryVisibilityShared // Default to shared.
|
historyVisibility := gomatrixserverlib.HistoryVisibilityShared // Default to shared.
|
||||||
if input.Kind != api.KindOutlier && rejectionErr == nil && !isRejected {
|
if input.Kind != api.KindOutlier && rejectionErr == nil && !isRejected && !isCreateEvent {
|
||||||
historyVisibility, rejectionErr, err = r.processStateBefore(ctx, input, missingPrev)
|
historyVisibility, rejectionErr, err = r.processStateBefore(ctx, roomInfo.RoomNID, input, missingPrev)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("r.processStateBefore: %w", err)
|
return fmt.Errorf("r.processStateBefore: %w", err)
|
||||||
}
|
}
|
||||||
|
|
@ -474,6 +474,7 @@ func (r *Inputer) handleRemoteRoomUpgrade(ctx context.Context, event *gomatrixse
|
||||||
// nolint:nakedret
|
// nolint:nakedret
|
||||||
func (r *Inputer) processStateBefore(
|
func (r *Inputer) processStateBefore(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
|
roomNID types.RoomNID,
|
||||||
input *api.InputRoomEvent,
|
input *api.InputRoomEvent,
|
||||||
missingPrev bool,
|
missingPrev bool,
|
||||||
) (historyVisibility gomatrixserverlib.HistoryVisibility, rejectionErr error, err error) {
|
) (historyVisibility gomatrixserverlib.HistoryVisibility, rejectionErr error, err error) {
|
||||||
|
|
@ -489,7 +490,7 @@ func (r *Inputer) processStateBefore(
|
||||||
case input.HasState:
|
case input.HasState:
|
||||||
// If we're overriding the state then we need to go and retrieve
|
// If we're overriding the state then we need to go and retrieve
|
||||||
// them from the database. It's a hard error if they are missing.
|
// them from the database. It's a hard error if they are missing.
|
||||||
stateEvents, err := r.DB.EventsFromIDs(ctx, input.StateEventIDs)
|
stateEvents, err := r.DB.EventsFromIDs(ctx, roomNID, input.StateEventIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", nil, fmt.Errorf("r.DB.EventsFromIDs: %w", err)
|
return "", nil, fmt.Errorf("r.DB.EventsFromIDs: %w", err)
|
||||||
}
|
}
|
||||||
|
|
@ -587,7 +588,7 @@ func (r *Inputer) fetchAuthEvents(
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, authEventID := range authEventIDs {
|
for _, authEventID := range authEventIDs {
|
||||||
authEvents, err := r.DB.EventsFromIDs(ctx, []string{authEventID})
|
authEvents, err := r.DB.EventsFromIDs(ctx, roomInfo.RoomNID, []string{authEventID})
|
||||||
if err != nil || len(authEvents) == 0 || authEvents[0].Event == nil {
|
if err != nil || len(authEvents) == 0 || authEvents[0].Event == nil {
|
||||||
unknown[authEventID] = struct{}{}
|
unknown[authEventID] = struct{}{}
|
||||||
continue
|
continue
|
||||||
|
|
@ -750,7 +751,7 @@ func (r *Inputer) kickGuests(ctx context.Context, event *gomatrixserverlib.Event
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
memberEvents, err := r.DB.Events(ctx, membershipNIDs)
|
memberEvents, err := r.DB.Events(ctx, roomInfo.RoomNID, membershipNIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -53,7 +53,7 @@ func (r *Inputer) updateMemberships(
|
||||||
// Load the event JSON so we can look up the "membership" key.
|
// Load the event JSON so we can look up the "membership" key.
|
||||||
// TODO: Maybe add a membership key to the events table so we can load that
|
// TODO: Maybe add a membership key to the events table so we can load that
|
||||||
// key without having to load the entire event JSON?
|
// key without having to load the entire event JSON?
|
||||||
events, err := updater.Events(ctx, eventNIDs)
|
events, err := updater.Events(ctx, 0, eventNIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -43,7 +43,7 @@ type missingStateReq struct {
|
||||||
log *logrus.Entry
|
log *logrus.Entry
|
||||||
virtualHost gomatrixserverlib.ServerName
|
virtualHost gomatrixserverlib.ServerName
|
||||||
origin gomatrixserverlib.ServerName
|
origin gomatrixserverlib.ServerName
|
||||||
db storage.Database
|
db storage.RoomDatabase
|
||||||
roomInfo *types.RoomInfo
|
roomInfo *types.RoomInfo
|
||||||
inputer *Inputer
|
inputer *Inputer
|
||||||
keys gomatrixserverlib.JSONVerifier
|
keys gomatrixserverlib.JSONVerifier
|
||||||
|
|
@ -395,7 +395,7 @@ func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, even
|
||||||
for _, entry := range stateEntries {
|
for _, entry := range stateEntries {
|
||||||
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
||||||
}
|
}
|
||||||
stateEvents, err := t.db.Events(ctx, stateEventNIDs)
|
stateEvents, err := t.db.Events(ctx, t.roomInfo.RoomNID, stateEventNIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.log.WithError(err).Warnf("failed to load state events locally")
|
t.log.WithError(err).Warnf("failed to load state events locally")
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -432,7 +432,7 @@ func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, even
|
||||||
missingEventList = append(missingEventList, evID)
|
missingEventList = append(missingEventList, evID)
|
||||||
}
|
}
|
||||||
t.log.WithField("count", len(missingEventList)).Debugf("Fetching missing auth events")
|
t.log.WithField("count", len(missingEventList)).Debugf("Fetching missing auth events")
|
||||||
events, err := t.db.EventsFromIDs(ctx, missingEventList)
|
events, err := t.db.EventsFromIDs(ctx, t.roomInfo.RoomNID, missingEventList)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -702,7 +702,7 @@ func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roo
|
||||||
}
|
}
|
||||||
t.haveEventsMutex.Unlock()
|
t.haveEventsMutex.Unlock()
|
||||||
|
|
||||||
events, err := t.db.EventsFromIDs(ctx, missingEventList)
|
events, err := t.db.EventsFromIDs(ctx, t.roomInfo.RoomNID, missingEventList)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("t.db.EventsFromIDs: %w", err)
|
return nil, fmt.Errorf("t.db.EventsFromIDs: %w", err)
|
||||||
}
|
}
|
||||||
|
|
@ -844,7 +844,7 @@ func (t *missingStateReq) lookupEvent(ctx context.Context, roomVersion gomatrixs
|
||||||
|
|
||||||
if localFirst {
|
if localFirst {
|
||||||
// fetch from the roomserver
|
// fetch from the roomserver
|
||||||
events, err := t.db.EventsFromIDs(ctx, []string{missingEventID})
|
events, err := t.db.EventsFromIDs(ctx, t.roomInfo.RoomNID, []string{missingEventID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.log.Warnf("Failed to query roomserver for missing event %s: %s - falling back to remote", missingEventID, err)
|
t.log.Warnf("Failed to query roomserver for missing event %s: %s - falling back to remote", missingEventID, err)
|
||||||
} else if len(events) == 1 {
|
} else if len(events) == 1 {
|
||||||
|
|
|
||||||
|
|
@ -70,7 +70,7 @@ func (r *Admin) PerformAdminEvacuateRoom(
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
memberEvents, err := r.DB.Events(ctx, memberNIDs)
|
memberEvents, err := r.DB.Events(ctx, roomInfo.RoomNID, memberNIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
res.Error = &api.PerformError{
|
res.Error = &api.PerformError{
|
||||||
Code: api.PerformErrorBadRequest,
|
Code: api.PerformErrorBadRequest,
|
||||||
|
|
|
||||||
|
|
@ -86,7 +86,7 @@ func (r *Backfiller) PerformBackfill(
|
||||||
// Retrieve events from the list that was filled previously. If we fail to get
|
// Retrieve events from the list that was filled previously. If we fail to get
|
||||||
// events from the database then attempt once to get them from federation instead.
|
// events from the database then attempt once to get them from federation instead.
|
||||||
var loadedEvents []*gomatrixserverlib.Event
|
var loadedEvents []*gomatrixserverlib.Event
|
||||||
loadedEvents, err = helpers.LoadEvents(ctx, r.DB, resultNIDs)
|
loadedEvents, err = helpers.LoadEvents(ctx, r.DB, info.RoomNID, resultNIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if _, ok := err.(types.MissingEventError); ok {
|
if _, ok := err.(types.MissingEventError); ok {
|
||||||
return r.backfillViaFederation(ctx, request, response)
|
return r.backfillViaFederation(ctx, request, response)
|
||||||
|
|
@ -258,6 +258,7 @@ type backfillRequester struct {
|
||||||
eventIDToBeforeStateIDs map[string][]string
|
eventIDToBeforeStateIDs map[string][]string
|
||||||
eventIDMap map[string]*gomatrixserverlib.Event
|
eventIDMap map[string]*gomatrixserverlib.Event
|
||||||
historyVisiblity gomatrixserverlib.HistoryVisibility
|
historyVisiblity gomatrixserverlib.HistoryVisibility
|
||||||
|
roomInfo types.RoomInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
func newBackfillRequester(
|
func newBackfillRequester(
|
||||||
|
|
@ -454,14 +455,14 @@ FindSuccessor:
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
stateEntries, err := helpers.StateBeforeEvent(ctx, b.db, info, NIDs[eventID])
|
stateEntries, err := helpers.StateBeforeEvent(ctx, b.db, info, NIDs[eventID].EventNID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to load state before event")
|
logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to load state before event")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// possibly return all joined servers depending on history visiblity
|
// possibly return all joined servers depending on history visiblity
|
||||||
memberEventsFromVis, visibility, err := joinEventsFromHistoryVisibility(ctx, b.db, roomID, stateEntries, b.virtualHost)
|
memberEventsFromVis, visibility, err := joinEventsFromHistoryVisibility(ctx, b.db, info, stateEntries, b.virtualHost)
|
||||||
b.historyVisiblity = visibility
|
b.historyVisiblity = visibility
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Error("ServersAtEvent: failed calculate servers from history visibility rules")
|
logrus.WithError(err).Error("ServersAtEvent: failed calculate servers from history visibility rules")
|
||||||
|
|
@ -472,7 +473,7 @@ FindSuccessor:
|
||||||
// Retrieve all "m.room.member" state events of "join" membership, which
|
// Retrieve all "m.room.member" state events of "join" membership, which
|
||||||
// contains the list of users in the room before the event, therefore all
|
// contains the list of users in the room before the event, therefore all
|
||||||
// the servers in it at that moment.
|
// the servers in it at that moment.
|
||||||
memberEvents, err := helpers.GetMembershipsAtState(ctx, b.db, stateEntries, true)
|
memberEvents, err := helpers.GetMembershipsAtState(ctx, b.db, info.RoomNID, stateEntries, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get memberships before event")
|
logrus.WithField("event_id", eventID).WithError(err).Error("ServersAtEvent: failed to get memberships before event")
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -523,11 +524,15 @@ func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion,
|
||||||
}
|
}
|
||||||
eventNIDs := make([]types.EventNID, len(nidMap))
|
eventNIDs := make([]types.EventNID, len(nidMap))
|
||||||
i := 0
|
i := 0
|
||||||
|
roomNID := b.roomInfo.RoomNID
|
||||||
for _, nid := range nidMap {
|
for _, nid := range nidMap {
|
||||||
eventNIDs[i] = nid
|
eventNIDs[i] = nid.EventNID
|
||||||
i++
|
i++
|
||||||
|
if roomNID == 0 {
|
||||||
|
roomNID = nid.RoomNID
|
||||||
|
}
|
||||||
}
|
}
|
||||||
eventsWithNids, err := b.db.Events(ctx, eventNIDs)
|
eventsWithNids, err := b.db.Events(ctx, roomNID, eventNIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).WithField("event_nids", eventNIDs).Error("Failed to load events")
|
logrus.WithError(err).WithField("event_nids", eventNIDs).Error("Failed to load events")
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -544,7 +549,7 @@ func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion,
|
||||||
// TODO: Long term we probably want a history_visibility table which stores eventNID | visibility_enum so we can just
|
// TODO: Long term we probably want a history_visibility table which stores eventNID | visibility_enum so we can just
|
||||||
// pull all events and then filter by that table.
|
// pull all events and then filter by that table.
|
||||||
func joinEventsFromHistoryVisibility(
|
func joinEventsFromHistoryVisibility(
|
||||||
ctx context.Context, db storage.Database, roomID string, stateEntries []types.StateEntry,
|
ctx context.Context, db storage.RoomDatabase, roomInfo *types.RoomInfo, stateEntries []types.StateEntry,
|
||||||
thisServer gomatrixserverlib.ServerName) ([]types.Event, gomatrixserverlib.HistoryVisibility, error) {
|
thisServer gomatrixserverlib.ServerName) ([]types.Event, gomatrixserverlib.HistoryVisibility, error) {
|
||||||
|
|
||||||
var eventNIDs []types.EventNID
|
var eventNIDs []types.EventNID
|
||||||
|
|
@ -557,7 +562,7 @@ func joinEventsFromHistoryVisibility(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get all of the events in this state
|
// Get all of the events in this state
|
||||||
stateEvents, err := db.Events(ctx, eventNIDs)
|
stateEvents, err := db.Events(ctx, roomInfo.RoomNID, eventNIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// even though the default should be shared, restricting the visibility to joined
|
// even though the default should be shared, restricting the visibility to joined
|
||||||
// feels more secure here.
|
// feels more secure here.
|
||||||
|
|
@ -570,21 +575,17 @@ func joinEventsFromHistoryVisibility(
|
||||||
|
|
||||||
// Can we see events in the room?
|
// Can we see events in the room?
|
||||||
canSeeEvents := auth.IsServerAllowed(thisServer, true, events)
|
canSeeEvents := auth.IsServerAllowed(thisServer, true, events)
|
||||||
visibility := gomatrixserverlib.HistoryVisibility(auth.HistoryVisibilityForRoom(events))
|
visibility := auth.HistoryVisibilityForRoom(events)
|
||||||
if !canSeeEvents {
|
if !canSeeEvents {
|
||||||
logrus.Infof("ServersAtEvent history not visible to us: %s", visibility)
|
logrus.Infof("ServersAtEvent history not visible to us: %s", visibility)
|
||||||
return nil, visibility, nil
|
return nil, visibility, nil
|
||||||
}
|
}
|
||||||
// get joined members
|
// get joined members
|
||||||
info, err := db.RoomInfo(ctx, roomID)
|
joinEventNIDs, err := db.GetMembershipEventNIDsForRoom(ctx, roomInfo.RoomNID, true, false)
|
||||||
if err != nil {
|
|
||||||
return nil, visibility, nil
|
|
||||||
}
|
|
||||||
joinEventNIDs, err := db.GetMembershipEventNIDsForRoom(ctx, info.RoomNID, true, false)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, visibility, err
|
return nil, visibility, err
|
||||||
}
|
}
|
||||||
evs, err := db.Events(ctx, joinEventNIDs)
|
evs, err := db.Events(ctx, roomInfo.RoomNID, joinEventNIDs)
|
||||||
return evs, visibility, err
|
return evs, visibility, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -601,7 +602,7 @@ func persistEvents(ctx context.Context, db storage.Database, events []*gomatrixs
|
||||||
authNids := make([]types.EventNID, len(nidMap))
|
authNids := make([]types.EventNID, len(nidMap))
|
||||||
i := 0
|
i := 0
|
||||||
for _, nid := range nidMap {
|
for _, nid := range nidMap {
|
||||||
authNids[i] = nid
|
authNids[i] = nid.EventNID
|
||||||
i++
|
i++
|
||||||
}
|
}
|
||||||
var redactedEventID string
|
var redactedEventID string
|
||||||
|
|
|
||||||
|
|
@ -29,7 +29,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type InboundPeeker struct {
|
type InboundPeeker struct {
|
||||||
DB storage.Database
|
DB storage.RoomDatabase
|
||||||
Inputer *input.Inputer
|
Inputer *input.Inputer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -64,7 +64,7 @@ func (r *InboundPeeker) PerformInboundPeek(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
latestEvents, err := r.DB.EventsFromIDs(ctx, []string{latestEventRefs[0].EventID})
|
latestEvents, err := r.DB.EventsFromIDs(ctx, info.RoomNID, []string{latestEventRefs[0].EventID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -88,7 +88,7 @@ func (r *InboundPeeker) PerformInboundPeek(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
stateEvents, err = helpers.LoadStateEvents(ctx, r.DB, stateEntries)
|
stateEvents, err = helpers.LoadStateEvents(ctx, r.DB, info.RoomNID, stateEntries)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -194,7 +194,7 @@ func (r *Inviter) PerformInvite(
|
||||||
// try and see if the user is allowed to make this invite. We can't do
|
// try and see if the user is allowed to make this invite. We can't do
|
||||||
// this for invites coming in over federation - we have to take those on
|
// this for invites coming in over federation - we have to take those on
|
||||||
// trust.
|
// trust.
|
||||||
_, err = helpers.CheckAuthEvents(ctx, r.DB, event, event.AuthEventIDs())
|
_, err = helpers.CheckAuthEvents(ctx, r.DB, info.RoomNID, event, event.AuthEventIDs())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.WithError(err).WithField("event_id", event.EventID()).WithField("auth_event_ids", event.AuthEventIDs()).Error(
|
logger.WithError(err).WithField("event_id", event.EventID()).WithField("auth_event_ids", event.AuthEventIDs()).Error(
|
||||||
"processInviteEvent.checkAuthEvents failed for event",
|
"processInviteEvent.checkAuthEvents failed for event",
|
||||||
|
|
@ -291,7 +291,7 @@ func buildInviteStrippedState(
|
||||||
for _, stateNID := range stateEntries {
|
for _, stateNID := range stateEntries {
|
||||||
stateNIDs = append(stateNIDs, stateNID.EventNID)
|
stateNIDs = append(stateNIDs, stateNID.EventNID)
|
||||||
}
|
}
|
||||||
stateEvents, err := db.Events(ctx, stateNIDs)
|
stateEvents, err := db.Events(ctx, info.RoomNID, stateNIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,6 @@ import (
|
||||||
fsAPI "github.com/matrix-org/dendrite/federationapi/api"
|
fsAPI "github.com/matrix-org/dendrite/federationapi/api"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/roomserver/internal/input"
|
"github.com/matrix-org/dendrite/roomserver/internal/input"
|
||||||
"github.com/matrix-org/dendrite/roomserver/storage"
|
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
@ -31,9 +30,7 @@ type Unpeeker struct {
|
||||||
ServerName gomatrixserverlib.ServerName
|
ServerName gomatrixserverlib.ServerName
|
||||||
Cfg *config.RoomServer
|
Cfg *config.RoomServer
|
||||||
FSAPI fsAPI.RoomserverFederationAPI
|
FSAPI fsAPI.RoomserverFederationAPI
|
||||||
DB storage.Database
|
Inputer *input.Inputer
|
||||||
|
|
||||||
Inputer *input.Inputer
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// PerformPeek handles peeking into matrix rooms, including over federation by talking to the federationapi.
|
// PerformPeek handles peeking into matrix rooms, including over federation by talking to the federationapi.
|
||||||
|
|
|
||||||
|
|
@ -102,7 +102,7 @@ func (r *Queryer) QueryStateAfterEvents(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
stateEvents, err := helpers.LoadStateEvents(ctx, r.DB, stateEntries)
|
stateEvents, err := helpers.LoadStateEvents(ctx, r.DB, info.RoomNID, stateEntries)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -138,17 +138,7 @@ func (r *Queryer) QueryEventsByID(
|
||||||
request *api.QueryEventsByIDRequest,
|
request *api.QueryEventsByIDRequest,
|
||||||
response *api.QueryEventsByIDResponse,
|
response *api.QueryEventsByIDResponse,
|
||||||
) error {
|
) error {
|
||||||
eventNIDMap, err := r.DB.EventNIDs(ctx, request.EventIDs)
|
events, err := r.DB.EventsFromIDs(ctx, 0, request.EventIDs)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var eventNIDs []types.EventNID
|
|
||||||
for _, nid := range eventNIDMap {
|
|
||||||
eventNIDs = append(eventNIDs, nid)
|
|
||||||
}
|
|
||||||
|
|
||||||
events, err := helpers.LoadEvents(ctx, r.DB, eventNIDs)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -196,7 +186,7 @@ func (r *Queryer) QueryMembershipForUser(
|
||||||
response.IsInRoom = stillInRoom
|
response.IsInRoom = stillInRoom
|
||||||
response.HasBeenInRoom = true
|
response.HasBeenInRoom = true
|
||||||
|
|
||||||
evs, err := r.DB.Events(ctx, []types.EventNID{membershipEventNID})
|
evs, err := r.DB.Events(ctx, info.RoomNID, []types.EventNID{membershipEventNID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -278,10 +268,10 @@ func (r *Queryer) QueryMembershipAtEvent(
|
||||||
// once. If we have more than one membership event, we need to get the state for each state entry.
|
// once. If we have more than one membership event, we need to get the state for each state entry.
|
||||||
if canShortCircuit {
|
if canShortCircuit {
|
||||||
if len(memberships) == 0 {
|
if len(memberships) == 0 {
|
||||||
memberships, err = helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false)
|
memberships, err = helpers.GetMembershipsAtState(ctx, r.DB, info.RoomNID, stateEntry, false)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
memberships, err = helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false)
|
memberships, err = helpers.GetMembershipsAtState(ctx, r.DB, info.RoomNID, stateEntry, false)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to get memberships at state: %w", err)
|
return fmt.Errorf("unable to get memberships at state: %w", err)
|
||||||
|
|
@ -328,7 +318,7 @@ func (r *Queryer) QueryMembershipsForRoom(
|
||||||
}
|
}
|
||||||
return fmt.Errorf("r.DB.GetMembershipEventNIDsForRoom: %w", err)
|
return fmt.Errorf("r.DB.GetMembershipEventNIDsForRoom: %w", err)
|
||||||
}
|
}
|
||||||
events, err = r.DB.Events(ctx, eventNIDs)
|
events, err = r.DB.Events(ctx, info.RoomNID, eventNIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("r.DB.Events: %w", err)
|
return fmt.Errorf("r.DB.Events: %w", err)
|
||||||
}
|
}
|
||||||
|
|
@ -367,14 +357,14 @@ func (r *Queryer) QueryMembershipsForRoom(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
events, err = r.DB.Events(ctx, eventNIDs)
|
events, err = r.DB.Events(ctx, info.RoomNID, eventNIDs)
|
||||||
} else {
|
} else {
|
||||||
stateEntries, err = helpers.StateBeforeEvent(ctx, r.DB, info, membershipEventNID)
|
stateEntries, err = helpers.StateBeforeEvent(ctx, r.DB, info, membershipEventNID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithField("membership_event_nid", membershipEventNID).WithError(err).Error("failed to load state before event")
|
logrus.WithField("membership_event_nid", membershipEventNID).WithError(err).Error("failed to load state before event")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
events, err = helpers.GetMembershipsAtState(ctx, r.DB, stateEntries, request.JoinedOnly)
|
events, err = helpers.GetMembershipsAtState(ctx, r.DB, info.RoomNID, stateEntries, request.JoinedOnly)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -425,7 +415,7 @@ func (r *Queryer) QueryServerAllowedToSeeEvent(
|
||||||
request *api.QueryServerAllowedToSeeEventRequest,
|
request *api.QueryServerAllowedToSeeEventRequest,
|
||||||
response *api.QueryServerAllowedToSeeEventResponse,
|
response *api.QueryServerAllowedToSeeEventResponse,
|
||||||
) (err error) {
|
) (err error) {
|
||||||
events, err := r.DB.EventsFromIDs(ctx, []string{request.EventID})
|
events, err := r.DB.EventsFromIDs(ctx, 0, []string{request.EventID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -476,7 +466,7 @@ func (r *Queryer) QueryMissingEvents(
|
||||||
eventsToFilter[id] = true
|
eventsToFilter[id] = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
events, err := r.DB.EventsFromIDs(ctx, front)
|
events, err := r.DB.EventsFromIDs(ctx, 0, front)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -496,7 +486,7 @@ func (r *Queryer) QueryMissingEvents(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
loadedEvents, err := helpers.LoadEvents(ctx, r.DB, resultNIDs)
|
loadedEvents, err := helpers.LoadEvents(ctx, r.DB, info.RoomNID, resultNIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -621,11 +611,11 @@ func (r *Queryer) loadStateAtEventIDs(ctx context.Context, roomInfo *types.RoomI
|
||||||
return nil, rejected, false, err
|
return nil, rejected, false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
events, err := helpers.LoadStateEvents(ctx, r.DB, stateEntries)
|
events, err := helpers.LoadStateEvents(ctx, r.DB, roomInfo.RoomNID, stateEntries)
|
||||||
return events, rejected, false, err
|
return events, rejected, false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
type eventsFromIDs func(context.Context, []string) ([]types.Event, error)
|
type eventsFromIDs func(context.Context, types.RoomNID, []string) ([]types.Event, error)
|
||||||
|
|
||||||
// GetAuthChain fetches the auth chain for the given auth events. An auth chain
|
// GetAuthChain fetches the auth chain for the given auth events. An auth chain
|
||||||
// is the list of all events that are referenced in the auth_events section, and
|
// is the list of all events that are referenced in the auth_events section, and
|
||||||
|
|
@ -643,7 +633,7 @@ func GetAuthChain(
|
||||||
|
|
||||||
for len(eventsToFetch) > 0 {
|
for len(eventsToFetch) > 0 {
|
||||||
// Try to retrieve the events from the database.
|
// Try to retrieve the events from the database.
|
||||||
events, err := fn(ctx, eventsToFetch)
|
events, err := fn(ctx, 0, eventsToFetch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -981,7 +971,7 @@ func (r *Queryer) QueryRestrictedJoinAllowed(ctx context.Context, req *api.Query
|
||||||
// For each of the joined users, let's see if we can get a valid
|
// For each of the joined users, let's see if we can get a valid
|
||||||
// membership event.
|
// membership event.
|
||||||
for _, joinNID := range joinNIDs {
|
for _, joinNID := range joinNIDs {
|
||||||
events, err := r.DB.Events(ctx, []types.EventNID{joinNID})
|
events, err := r.DB.Events(ctx, roomInfo.RoomNID, []types.EventNID{joinNID})
|
||||||
if err != nil || len(events) != 1 {
|
if err != nil || len(events) != 1 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -80,7 +80,7 @@ func (db *getEventDB) addFakeEvents(graph map[string][]string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// EventsFromIDs implements RoomserverInternalAPIEventDB
|
// EventsFromIDs implements RoomserverInternalAPIEventDB
|
||||||
func (db *getEventDB) EventsFromIDs(ctx context.Context, eventIDs []string) (res []types.Event, err error) {
|
func (db *getEventDB) EventsFromIDs(ctx context.Context, roomNID types.RoomNID, eventIDs []string) (res []types.Event, err error) {
|
||||||
for _, evID := range eventIDs {
|
for _, evID := range eventIDs {
|
||||||
res = append(res, types.Event{
|
res = append(res, types.Event{
|
||||||
EventNID: 0,
|
EventNID: 0,
|
||||||
|
|
|
||||||
|
|
@ -23,8 +23,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/setup/base"
|
"github.com/matrix-org/dendrite/setup/base"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewInternalAPI returns a concerete implementation of the internal API. Callers
|
// NewInternalAPI returns a concrete implementation of the internal API.
|
||||||
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
|
|
||||||
func NewInternalAPI(
|
func NewInternalAPI(
|
||||||
base *base.BaseDendrite,
|
base *base.BaseDendrite,
|
||||||
) api.RoomserverInternalAPI {
|
) api.RoomserverInternalAPI {
|
||||||
|
|
|
||||||
|
|
@ -41,8 +41,8 @@ type StateResolutionStorage interface {
|
||||||
StateEntriesForTuples(ctx context.Context, stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntryList, error)
|
StateEntriesForTuples(ctx context.Context, stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntryList, error)
|
||||||
StateAtEventIDs(ctx context.Context, eventIDs []string) ([]types.StateAtEvent, error)
|
StateAtEventIDs(ctx context.Context, eventIDs []string) ([]types.StateAtEvent, error)
|
||||||
AddState(ctx context.Context, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, state []types.StateEntry) (types.StateSnapshotNID, error)
|
AddState(ctx context.Context, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, state []types.StateEntry) (types.StateSnapshotNID, error)
|
||||||
Events(ctx context.Context, eventNIDs []types.EventNID) ([]types.Event, error)
|
Events(ctx context.Context, roomNID types.RoomNID, eventNIDs []types.EventNID) ([]types.Event, error)
|
||||||
EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error)
|
EventsFromIDs(ctx context.Context, roomNID types.RoomNID, eventIDs []string) ([]types.Event, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type StateResolution struct {
|
type StateResolution struct {
|
||||||
|
|
@ -975,7 +975,7 @@ func (v *StateResolution) resolveConflictsV2(
|
||||||
|
|
||||||
// Store the newly found auth events in the auth set for this event.
|
// Store the newly found auth events in the auth set for this event.
|
||||||
var authEventMap map[string]types.StateEntry
|
var authEventMap map[string]types.StateEntry
|
||||||
authSets[key], authEventMap, err = loader.loadAuthEvents(sctx, conflictedEvent, knownAuthEvents)
|
authSets[key], authEventMap, err = loader.loadAuthEvents(sctx, v.roomInfo.RoomNID, conflictedEvent, knownAuthEvents)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -1091,7 +1091,7 @@ func (v *StateResolution) loadStateEvents(
|
||||||
eventNIDs = append(eventNIDs, entry.EventNID)
|
eventNIDs = append(eventNIDs, entry.EventNID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
events, err := v.db.Events(ctx, eventNIDs)
|
events, err := v.db.Events(ctx, v.roomInfo.RoomNID, eventNIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -1120,7 +1120,7 @@ type authEventLoader struct {
|
||||||
// loadAuthEvents loads all of the auth events for a given event recursively,
|
// loadAuthEvents loads all of the auth events for a given event recursively,
|
||||||
// along with a map that contains state entries for all of the auth events.
|
// along with a map that contains state entries for all of the auth events.
|
||||||
func (l *authEventLoader) loadAuthEvents(
|
func (l *authEventLoader) loadAuthEvents(
|
||||||
ctx context.Context, event *gomatrixserverlib.Event, eventMap map[string]types.Event,
|
ctx context.Context, roomNID types.RoomNID, event *gomatrixserverlib.Event, eventMap map[string]types.Event,
|
||||||
) ([]*gomatrixserverlib.Event, map[string]types.StateEntry, error) {
|
) ([]*gomatrixserverlib.Event, map[string]types.StateEntry, error) {
|
||||||
l.Lock()
|
l.Lock()
|
||||||
defer l.Unlock()
|
defer l.Unlock()
|
||||||
|
|
@ -1155,7 +1155,7 @@ func (l *authEventLoader) loadAuthEvents(
|
||||||
// If we need to get events from the database, go and fetch
|
// If we need to get events from the database, go and fetch
|
||||||
// those now.
|
// those now.
|
||||||
if len(l.lookupFromDB) > 0 {
|
if len(l.lookupFromDB) > 0 {
|
||||||
eventsFromDB, err := l.v.db.EventsFromIDs(ctx, l.lookupFromDB)
|
eventsFromDB, err := l.v.db.EventsFromIDs(ctx, roomNID, l.lookupFromDB)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("v.db.EventsFromIDs: %w", err)
|
return nil, nil, fmt.Errorf("v.db.EventsFromIDs: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -69,7 +69,7 @@ type Database interface {
|
||||||
) ([]types.StateEntryList, error)
|
) ([]types.StateEntryList, error)
|
||||||
// Look up the Events for a list of numeric event IDs.
|
// Look up the Events for a list of numeric event IDs.
|
||||||
// Returns a sorted list of events.
|
// Returns a sorted list of events.
|
||||||
Events(ctx context.Context, eventNIDs []types.EventNID) ([]types.Event, error)
|
Events(ctx context.Context, roomNID types.RoomNID, eventNIDs []types.EventNID) ([]types.Event, error)
|
||||||
// Look up snapshot NID for an event ID string
|
// Look up snapshot NID for an event ID string
|
||||||
SnapshotNIDFromEventID(ctx context.Context, eventID string) (types.StateSnapshotNID, error)
|
SnapshotNIDFromEventID(ctx context.Context, eventID string) (types.StateSnapshotNID, error)
|
||||||
BulkSelectSnapshotsFromEventIDs(ctx context.Context, eventIDs []string) (map[types.StateSnapshotNID][]string, error)
|
BulkSelectSnapshotsFromEventIDs(ctx context.Context, eventIDs []string) (map[types.StateSnapshotNID][]string, error)
|
||||||
|
|
@ -87,7 +87,7 @@ type Database interface {
|
||||||
EventStateKeys(ctx context.Context, eventStateKeyNIDs []types.EventStateKeyNID) (map[types.EventStateKeyNID]string, error)
|
EventStateKeys(ctx context.Context, eventStateKeyNIDs []types.EventStateKeyNID) (map[types.EventStateKeyNID]string, error)
|
||||||
// Look up the numeric IDs for a list of events.
|
// Look up the numeric IDs for a list of events.
|
||||||
// Returns an error if there was a problem talking to the database.
|
// Returns an error if there was a problem talking to the database.
|
||||||
EventNIDs(ctx context.Context, eventIDs []string) (map[string]types.EventNID, error)
|
EventNIDs(ctx context.Context, eventIDs []string) (map[string]types.EventMetadata, error)
|
||||||
// Set the state at an event. FIXME TODO: "at"
|
// Set the state at an event. FIXME TODO: "at"
|
||||||
SetState(ctx context.Context, eventNID types.EventNID, stateNID types.StateSnapshotNID) error
|
SetState(ctx context.Context, eventNID types.EventNID, stateNID types.StateSnapshotNID) error
|
||||||
// Lookup the event IDs for a batch of event numeric IDs.
|
// Lookup the event IDs for a batch of event numeric IDs.
|
||||||
|
|
@ -138,7 +138,7 @@ type Database interface {
|
||||||
// EventsFromIDs looks up the Events for a list of event IDs. Does not error if event was
|
// EventsFromIDs looks up the Events for a list of event IDs. Does not error if event was
|
||||||
// not found.
|
// not found.
|
||||||
// Returns an error if the retrieval went wrong.
|
// Returns an error if the retrieval went wrong.
|
||||||
EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error)
|
EventsFromIDs(ctx context.Context, roomNID types.RoomNID, eventIDs []string) ([]types.Event, error)
|
||||||
// Publish or unpublish a room from the room directory.
|
// Publish or unpublish a room from the room directory.
|
||||||
PublishRoom(ctx context.Context, roomID, appserviceID, networkID string, publish bool) error
|
PublishRoom(ctx context.Context, roomID, appserviceID, networkID string, publish bool) error
|
||||||
// Returns a list of room IDs for rooms which are published.
|
// Returns a list of room IDs for rooms which are published.
|
||||||
|
|
@ -183,3 +183,45 @@ type Database interface {
|
||||||
ctx context.Context, userNID types.EventStateKeyNID, info *types.RoomInfo, eventIDs ...string,
|
ctx context.Context, userNID types.EventStateKeyNID, info *types.RoomInfo, eventIDs ...string,
|
||||||
) (map[string]*gomatrixserverlib.HeaderedEvent, error)
|
) (map[string]*gomatrixserverlib.HeaderedEvent, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
RoomsTable
|
||||||
|
EventsTable
|
||||||
|
EventJSONTable
|
||||||
|
MembershipTable
|
||||||
|
StateSnapshotTable
|
||||||
|
StateBlockTable
|
||||||
|
EventTypesTable
|
||||||
|
EventStateKeysTable
|
||||||
|
|
||||||
|
|
||||||
|
PublishedTable?
|
||||||
|
RoomAliases?
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
|
type RoomDatabase interface {
|
||||||
|
// RoomInfo returns room information for the given room ID, or nil if there is no room.
|
||||||
|
RoomInfo(ctx context.Context, roomID string) (*types.RoomInfo, error)
|
||||||
|
// IsEventRejected returns true if the event is known and rejected.
|
||||||
|
IsEventRejected(ctx context.Context, roomNID types.RoomNID, eventID string) (rejected bool, err error)
|
||||||
|
MissingAuthPrevEvents(ctx context.Context, e *gomatrixserverlib.Event) (missingAuth, missingPrev []string, err error)
|
||||||
|
// Stores a matrix room event in the database. Returns the room NID, the state snapshot and the redacted event ID if any, or an error.
|
||||||
|
StoreEvent(ctx context.Context, event *gomatrixserverlib.Event, authEventNIDs []types.EventNID, isRejected bool) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error)
|
||||||
|
UpgradeRoom(ctx context.Context, oldRoomID, newRoomID, eventSender string) error
|
||||||
|
GetRoomUpdater(ctx context.Context, roomInfo *types.RoomInfo) (*shared.RoomUpdater, error)
|
||||||
|
StateEntriesForEventIDs(ctx context.Context, eventIDs []string, excludeRejected bool) ([]types.StateEntry, error)
|
||||||
|
GetMembershipEventNIDsForRoom(ctx context.Context, roomNID types.RoomNID, joinOnly bool, localOnly bool) ([]types.EventNID, error)
|
||||||
|
StateBlockNIDs(ctx context.Context, stateNIDs []types.StateSnapshotNID) ([]types.StateBlockNIDList, error)
|
||||||
|
StateEntries(ctx context.Context, stateBlockNIDs []types.StateBlockNID) ([]types.StateEntryList, error)
|
||||||
|
SnapshotNIDFromEventID(ctx context.Context, eventID string) (types.StateSnapshotNID, error)
|
||||||
|
BulkSelectSnapshotsFromEventIDs(ctx context.Context, eventIDs []string) (map[types.StateSnapshotNID][]string, error)
|
||||||
|
StateEntriesForTuples(ctx context.Context, stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntryList, error)
|
||||||
|
StateAtEventIDs(ctx context.Context, eventIDs []string) ([]types.StateAtEvent, error)
|
||||||
|
AddState(ctx context.Context, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, state []types.StateEntry) (types.StateSnapshotNID, error)
|
||||||
|
Events(ctx context.Context, roomNID types.RoomNID, eventNIDs []types.EventNID) ([]types.Event, error)
|
||||||
|
EventsFromIDs(ctx context.Context, roomNID types.RoomNID, eventIDs []string) ([]types.Event, error)
|
||||||
|
LatestEventIDs(ctx context.Context, roomNID types.RoomNID) ([]gomatrixserverlib.EventReference, types.StateSnapshotNID, int64, error)
|
||||||
|
EventTypeNIDs(ctx context.Context, eventTypes []string) (map[string]types.EventTypeNID, error)
|
||||||
|
EventStateKeyNIDs(ctx context.Context, eventStateKeys []string) (map[string]types.EventStateKeyNID, error)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -140,10 +140,10 @@ const bulkSelectEventIDSQL = "" +
|
||||||
"SELECT event_nid, event_id FROM roomserver_events WHERE event_nid = ANY($1)"
|
"SELECT event_nid, event_id FROM roomserver_events WHERE event_nid = ANY($1)"
|
||||||
|
|
||||||
const bulkSelectEventNIDSQL = "" +
|
const bulkSelectEventNIDSQL = "" +
|
||||||
"SELECT event_id, event_nid FROM roomserver_events WHERE event_id = ANY($1)"
|
"SELECT event_id, event_nid, room_nid FROM roomserver_events WHERE event_id = ANY($1)"
|
||||||
|
|
||||||
const bulkSelectUnsentEventNIDSQL = "" +
|
const bulkSelectUnsentEventNIDSQL = "" +
|
||||||
"SELECT event_id, event_nid FROM roomserver_events WHERE event_id = ANY($1) AND sent_to_output = FALSE"
|
"SELECT event_id, event_nid, room_nid FROM roomserver_events WHERE event_id = ANY($1) AND sent_to_output = FALSE"
|
||||||
|
|
||||||
const selectMaxEventDepthSQL = "" +
|
const selectMaxEventDepthSQL = "" +
|
||||||
"SELECT COALESCE(MAX(depth) + 1, 0) FROM roomserver_events WHERE event_nid = ANY($1)"
|
"SELECT COALESCE(MAX(depth) + 1, 0) FROM roomserver_events WHERE event_nid = ANY($1)"
|
||||||
|
|
@ -520,20 +520,20 @@ func (s *eventStatements) BulkSelectEventID(ctx context.Context, txn *sql.Tx, ev
|
||||||
|
|
||||||
// BulkSelectEventNIDs returns a map from string event ID to numeric event ID.
|
// BulkSelectEventNIDs returns a map from string event ID to numeric event ID.
|
||||||
// If an event ID is not in the database then it is omitted from the map.
|
// If an event ID is not in the database then it is omitted from the map.
|
||||||
func (s *eventStatements) BulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error) {
|
func (s *eventStatements) BulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventMetadata, error) {
|
||||||
return s.bulkSelectEventNID(ctx, txn, eventIDs, false)
|
return s.bulkSelectEventNID(ctx, txn, eventIDs, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
// BulkSelectEventNIDs returns a map from string event ID to numeric event ID
|
// BulkSelectEventNIDs returns a map from string event ID to numeric event ID
|
||||||
// only for events that haven't already been sent to the roomserver output.
|
// only for events that haven't already been sent to the roomserver output.
|
||||||
// If an event ID is not in the database then it is omitted from the map.
|
// If an event ID is not in the database then it is omitted from the map.
|
||||||
func (s *eventStatements) BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error) {
|
func (s *eventStatements) BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventMetadata, error) {
|
||||||
return s.bulkSelectEventNID(ctx, txn, eventIDs, true)
|
return s.bulkSelectEventNID(ctx, txn, eventIDs, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// bulkSelectEventNIDs returns a map from string event ID to numeric event ID.
|
// bulkSelectEventNIDs returns a map from string event ID to numeric event ID.
|
||||||
// If an event ID is not in the database then it is omitted from the map.
|
// If an event ID is not in the database then it is omitted from the map.
|
||||||
func (s *eventStatements) bulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string, onlyUnsent bool) (map[string]types.EventNID, error) {
|
func (s *eventStatements) bulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string, onlyUnsent bool) (map[string]types.EventMetadata, error) {
|
||||||
var stmt *sql.Stmt
|
var stmt *sql.Stmt
|
||||||
if onlyUnsent {
|
if onlyUnsent {
|
||||||
stmt = sqlutil.TxStmt(txn, s.bulkSelectUnsentEventNIDStmt)
|
stmt = sqlutil.TxStmt(txn, s.bulkSelectUnsentEventNIDStmt)
|
||||||
|
|
@ -545,14 +545,18 @@ func (s *eventStatements) bulkSelectEventNID(ctx context.Context, txn *sql.Tx, e
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectEventNID: rows.close() failed")
|
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectEventNID: rows.close() failed")
|
||||||
results := make(map[string]types.EventNID, len(eventIDs))
|
results := make(map[string]types.EventMetadata, len(eventIDs))
|
||||||
var eventID string
|
var eventID string
|
||||||
var eventNID int64
|
var eventNID int64
|
||||||
|
var roomNID int64
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
if err = rows.Scan(&eventID, &eventNID); err != nil {
|
if err = rows.Scan(&eventID, &eventNID, &roomNID); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
results[eventID] = types.EventNID(eventNID)
|
results[eventID] = types.EventMetadata{
|
||||||
|
EventNID: types.EventNID(eventNID),
|
||||||
|
RoomNID: types.RoomNID(roomNID),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return results, rows.Err()
|
return results, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -116,10 +116,8 @@ func (u *RoomUpdater) StorePreviousEvents(eventNID types.EventNID, previousEvent
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *RoomUpdater) Events(
|
func (u *RoomUpdater) Events(ctx context.Context, _ types.RoomNID, eventNIDs []types.EventNID) ([]types.Event, error) {
|
||||||
ctx context.Context, eventNIDs []types.EventNID,
|
return u.d.events(ctx, u.txn, u.roomInfo.RoomNID, eventNIDs)
|
||||||
) ([]types.Event, error) {
|
|
||||||
return u.d.events(ctx, u.txn, eventNIDs)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *RoomUpdater) SnapshotNIDFromEventID(
|
func (u *RoomUpdater) SnapshotNIDFromEventID(
|
||||||
|
|
@ -197,12 +195,8 @@ func (u *RoomUpdater) StateAtEventIDs(
|
||||||
return u.d.EventsTable.BulkSelectStateAtEventByID(ctx, u.txn, eventIDs)
|
return u.d.EventsTable.BulkSelectStateAtEventByID(ctx, u.txn, eventIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *RoomUpdater) EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) {
|
func (u *RoomUpdater) EventsFromIDs(ctx context.Context, roomNID types.RoomNID, eventIDs []string) ([]types.Event, error) {
|
||||||
return u.d.eventsFromIDs(ctx, u.txn, eventIDs, false)
|
return u.d.eventsFromIDs(ctx, u.txn, roomNID, eventIDs, NoFilter)
|
||||||
}
|
|
||||||
|
|
||||||
func (u *RoomUpdater) UnsentEventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) {
|
|
||||||
return u.d.eventsFromIDs(ctx, u.txn, eventIDs, true)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsReferenced implements types.RoomRecentEventsUpdater
|
// IsReferenced implements types.RoomRecentEventsUpdater
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@ import (
|
||||||
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/tidwall/gjson"
|
"github.com/tidwall/gjson"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/caching"
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
|
|
@ -262,7 +263,7 @@ func (d *Database) addState(
|
||||||
|
|
||||||
func (d *Database) EventNIDs(
|
func (d *Database) EventNIDs(
|
||||||
ctx context.Context, eventIDs []string,
|
ctx context.Context, eventIDs []string,
|
||||||
) (map[string]types.EventNID, error) {
|
) (map[string]types.EventMetadata, error) {
|
||||||
return d.eventNIDs(ctx, nil, eventIDs, NoFilter)
|
return d.eventNIDs(ctx, nil, eventIDs, NoFilter)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -275,7 +276,7 @@ const (
|
||||||
|
|
||||||
func (d *Database) eventNIDs(
|
func (d *Database) eventNIDs(
|
||||||
ctx context.Context, txn *sql.Tx, eventIDs []string, filter UnsentFilter,
|
ctx context.Context, txn *sql.Tx, eventIDs []string, filter UnsentFilter,
|
||||||
) (map[string]types.EventNID, error) {
|
) (map[string]types.EventMetadata, error) {
|
||||||
switch filter {
|
switch filter {
|
||||||
case FilterUnsentOnly:
|
case FilterUnsentOnly:
|
||||||
return d.EventsTable.BulkSelectUnsentEventNID(ctx, txn, eventIDs)
|
return d.EventsTable.BulkSelectUnsentEventNID(ctx, txn, eventIDs)
|
||||||
|
|
@ -325,11 +326,11 @@ func (d *Database) EventIDs(
|
||||||
return d.EventsTable.BulkSelectEventID(ctx, nil, eventNIDs)
|
return d.EventsTable.BulkSelectEventID(ctx, nil, eventNIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error) {
|
func (d *Database) EventsFromIDs(ctx context.Context, roomNID types.RoomNID, eventIDs []string) ([]types.Event, error) {
|
||||||
return d.eventsFromIDs(ctx, nil, eventIDs, NoFilter)
|
return d.eventsFromIDs(ctx, nil, roomNID, eventIDs, NoFilter)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) eventsFromIDs(ctx context.Context, txn *sql.Tx, eventIDs []string, filter UnsentFilter) ([]types.Event, error) {
|
func (d *Database) eventsFromIDs(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventIDs []string, filter UnsentFilter) ([]types.Event, error) {
|
||||||
nidMap, err := d.eventNIDs(ctx, txn, eventIDs, filter)
|
nidMap, err := d.eventNIDs(ctx, txn, eventIDs, filter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -337,10 +338,16 @@ func (d *Database) eventsFromIDs(ctx context.Context, txn *sql.Tx, eventIDs []st
|
||||||
|
|
||||||
var nids []types.EventNID
|
var nids []types.EventNID
|
||||||
for _, nid := range nidMap {
|
for _, nid := range nidMap {
|
||||||
nids = append(nids, nid)
|
nids = append(nids, nid.EventNID)
|
||||||
|
if roomNID != 0 && roomNID != nid.RoomNID {
|
||||||
|
logrus.Errorf("expected events from room %d, but also found %d", roomNID, nid.RoomNID)
|
||||||
|
}
|
||||||
|
if roomNID == 0 {
|
||||||
|
roomNID = nid.RoomNID
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return d.events(ctx, txn, nids)
|
return d.events(ctx, txn, roomNID, nids)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) LatestEventIDs(
|
func (d *Database) LatestEventIDs(
|
||||||
|
|
@ -480,14 +487,18 @@ func (d *Database) GetInvitesForUser(
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) Events(
|
func (d *Database) Events(
|
||||||
ctx context.Context, eventNIDs []types.EventNID,
|
ctx context.Context, roomNID types.RoomNID, eventNIDs []types.EventNID,
|
||||||
) ([]types.Event, error) {
|
) ([]types.Event, error) {
|
||||||
return d.events(ctx, nil, eventNIDs)
|
return d.events(ctx, nil, roomNID, eventNIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) events(
|
func (d *Database) events(
|
||||||
ctx context.Context, txn *sql.Tx, inputEventNIDs types.EventNIDs,
|
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, inputEventNIDs types.EventNIDs,
|
||||||
) ([]types.Event, error) {
|
) ([]types.Event, error) {
|
||||||
|
if roomNID == 0 {
|
||||||
|
// No need to go further, as we won't find any events for this room.
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
sort.Sort(inputEventNIDs)
|
sort.Sort(inputEventNIDs)
|
||||||
events := make(map[types.EventNID]*gomatrixserverlib.Event, len(inputEventNIDs))
|
events := make(map[types.EventNID]*gomatrixserverlib.Event, len(inputEventNIDs))
|
||||||
eventNIDs := make([]types.EventNID, 0, len(inputEventNIDs))
|
eventNIDs := make([]types.EventNID, 0, len(inputEventNIDs))
|
||||||
|
|
@ -519,40 +530,34 @@ func (d *Database) events(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
eventIDs, _ := d.EventsTable.BulkSelectEventID(ctx, txn, eventNIDs)
|
eventIDs, err := d.EventsTable.BulkSelectEventID(ctx, txn, eventNIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
eventIDs = map[types.EventNID]string{}
|
eventIDs = map[types.EventNID]string{}
|
||||||
}
|
}
|
||||||
var roomNIDs map[types.EventNID]types.RoomNID
|
|
||||||
roomNIDs, err = d.EventsTable.SelectRoomNIDsForEventNIDs(ctx, txn, eventNIDs)
|
var roomVersion gomatrixserverlib.RoomVersion
|
||||||
if err != nil {
|
var fetchRoomVersion bool
|
||||||
return nil, err
|
var ok bool
|
||||||
}
|
var roomID string
|
||||||
uniqueRoomNIDs := make(map[types.RoomNID]struct{})
|
if roomID, ok = d.Cache.GetRoomServerRoomID(roomNID); ok {
|
||||||
for _, n := range roomNIDs {
|
roomVersion, ok = d.Cache.GetRoomVersion(roomID)
|
||||||
uniqueRoomNIDs[n] = struct{}{}
|
if !ok {
|
||||||
}
|
fetchRoomVersion = true
|
||||||
roomVersions := make(map[types.RoomNID]gomatrixserverlib.RoomVersion)
|
|
||||||
fetchNIDList := make([]types.RoomNID, 0, len(uniqueRoomNIDs))
|
|
||||||
for n := range uniqueRoomNIDs {
|
|
||||||
if roomID, ok := d.Cache.GetRoomServerRoomID(n); ok {
|
|
||||||
if roomVersion, ok := d.Cache.GetRoomVersion(roomID); ok {
|
|
||||||
roomVersions[n] = roomVersion
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
fetchNIDList = append(fetchNIDList, n)
|
|
||||||
}
|
}
|
||||||
dbRoomVersions, err := d.RoomsTable.SelectRoomVersionsForRoomNIDs(ctx, txn, fetchNIDList)
|
|
||||||
if err != nil {
|
if roomVersion == "" || fetchRoomVersion {
|
||||||
return nil, err
|
var dbRoomVersions map[types.RoomNID]gomatrixserverlib.RoomVersion
|
||||||
}
|
dbRoomVersions, err = d.RoomsTable.SelectRoomVersionsForRoomNIDs(ctx, txn, []types.RoomNID{roomNID})
|
||||||
for n, v := range dbRoomVersions {
|
if err != nil {
|
||||||
roomVersions[n] = v
|
return nil, err
|
||||||
|
}
|
||||||
|
if roomVersion, ok = dbRoomVersions[roomNID]; !ok {
|
||||||
|
return nil, fmt.Errorf("unable to find roomversion for room %d", roomNID)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, eventJSON := range eventJSONs {
|
for _, eventJSON := range eventJSONs {
|
||||||
roomNID := roomNIDs[eventJSON.EventNID]
|
|
||||||
roomVersion := roomVersions[roomNID]
|
|
||||||
events[eventJSON.EventNID], err = gomatrixserverlib.NewEventFromTrustedJSONWithEventID(
|
events[eventJSON.EventNID], err = gomatrixserverlib.NewEventFromTrustedJSONWithEventID(
|
||||||
eventIDs[eventJSON.EventNID], eventJSON.EventJSON, false, roomVersion,
|
eventIDs[eventJSON.EventNID], eventJSON.EventJSON, false, roomVersion,
|
||||||
)
|
)
|
||||||
|
|
@ -718,7 +723,7 @@ func (d *Database) storeEvent(
|
||||||
return fmt.Errorf("d.EventJSONTable.InsertEventJSON: %w", err)
|
return fmt.Errorf("d.EventJSONTable.InsertEventJSON: %w", err)
|
||||||
}
|
}
|
||||||
if !isRejected { // ignore rejected redaction events
|
if !isRejected { // ignore rejected redaction events
|
||||||
redactionEvent, redactedEventID, err = d.handleRedactions(ctx, txn, eventNID, event)
|
redactionEvent, redactedEventID, err = d.handleRedactions(ctx, txn, roomNID, eventNID, event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("d.handleRedactions: %w", err)
|
return fmt.Errorf("d.handleRedactions: %w", err)
|
||||||
}
|
}
|
||||||
|
|
@ -899,7 +904,7 @@ func extractRoomVersionFromCreateEvent(event *gomatrixserverlib.Event) (
|
||||||
//
|
//
|
||||||
// Returns the redaction event and the event ID of the redacted event if this call resulted in a redaction.
|
// Returns the redaction event and the event ID of the redacted event if this call resulted in a redaction.
|
||||||
func (d *Database) handleRedactions(
|
func (d *Database) handleRedactions(
|
||||||
ctx context.Context, txn *sql.Tx, eventNID types.EventNID, event *gomatrixserverlib.Event,
|
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventNID types.EventNID, event *gomatrixserverlib.Event,
|
||||||
) (*gomatrixserverlib.Event, string, error) {
|
) (*gomatrixserverlib.Event, string, error) {
|
||||||
var err error
|
var err error
|
||||||
isRedactionEvent := event.Type() == gomatrixserverlib.MRoomRedaction && event.StateKey() == nil
|
isRedactionEvent := event.Type() == gomatrixserverlib.MRoomRedaction && event.StateKey() == nil
|
||||||
|
|
@ -919,7 +924,7 @@ func (d *Database) handleRedactions(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
redactionEvent, redactedEvent, validated, err := d.loadRedactionPair(ctx, txn, eventNID, event)
|
redactionEvent, redactedEvent, validated, err := d.loadRedactionPair(ctx, txn, roomNID, eventNID, event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "", fmt.Errorf("d.loadRedactionPair: %w", err)
|
return nil, "", fmt.Errorf("d.loadRedactionPair: %w", err)
|
||||||
}
|
}
|
||||||
|
|
@ -985,7 +990,7 @@ func (d *Database) handleRedactions(
|
||||||
|
|
||||||
// loadRedactionPair returns both the redaction event and the redacted event, else nil.
|
// loadRedactionPair returns both the redaction event and the redacted event, else nil.
|
||||||
func (d *Database) loadRedactionPair(
|
func (d *Database) loadRedactionPair(
|
||||||
ctx context.Context, txn *sql.Tx, eventNID types.EventNID, event *gomatrixserverlib.Event,
|
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventNID types.EventNID, event *gomatrixserverlib.Event,
|
||||||
) (*types.Event, *types.Event, bool, error) {
|
) (*types.Event, *types.Event, bool, error) {
|
||||||
var redactionEvent, redactedEvent *types.Event
|
var redactionEvent, redactedEvent *types.Event
|
||||||
var info *tables.RedactionInfo
|
var info *tables.RedactionInfo
|
||||||
|
|
@ -1017,9 +1022,9 @@ func (d *Database) loadRedactionPair(
|
||||||
}
|
}
|
||||||
|
|
||||||
if isRedactionEvent {
|
if isRedactionEvent {
|
||||||
redactedEvent = d.loadEvent(ctx, info.RedactsEventID)
|
redactedEvent = d.loadEvent(ctx, roomNID, info.RedactsEventID)
|
||||||
} else {
|
} else {
|
||||||
redactionEvent = d.loadEvent(ctx, info.RedactionEventID)
|
redactionEvent = d.loadEvent(ctx, roomNID, info.RedactionEventID)
|
||||||
}
|
}
|
||||||
|
|
||||||
return redactionEvent, redactedEvent, info.Validated, nil
|
return redactionEvent, redactedEvent, info.Validated, nil
|
||||||
|
|
@ -1035,7 +1040,7 @@ func (d *Database) applyRedactions(events []types.Event) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// loadEvent loads a single event or returns nil on any problems/missing event
|
// loadEvent loads a single event or returns nil on any problems/missing event
|
||||||
func (d *Database) loadEvent(ctx context.Context, eventID string) *types.Event {
|
func (d *Database) loadEvent(ctx context.Context, roomNID types.RoomNID, eventID string) *types.Event {
|
||||||
nids, err := d.EventNIDs(ctx, []string{eventID})
|
nids, err := d.EventNIDs(ctx, []string{eventID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -1043,7 +1048,7 @@ func (d *Database) loadEvent(ctx context.Context, eventID string) *types.Event {
|
||||||
if len(nids) == 0 {
|
if len(nids) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
evs, err := d.Events(ctx, []types.EventNID{nids[eventID]})
|
evs, err := d.Events(ctx, roomNID, []types.EventNID{nids[eventID].EventNID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -1470,13 +1475,19 @@ func (d *Database) PurgeRoom(ctx context.Context, roomID string) error {
|
||||||
func (d *Database) UpgradeRoom(ctx context.Context, oldRoomID, newRoomID, eventSender string) error {
|
func (d *Database) UpgradeRoom(ctx context.Context, oldRoomID, newRoomID, eventSender string) error {
|
||||||
|
|
||||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
// un-publish old room
|
published, err := d.PublishedTable.SelectPublishedFromRoomID(ctx, txn, oldRoomID)
|
||||||
if err := d.PublishedTable.UpsertRoomPublished(ctx, txn, oldRoomID, "", "", false); err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to unpublish room: %w", err)
|
return fmt.Errorf("failed to get published room: %w", err)
|
||||||
}
|
}
|
||||||
// publish new room
|
if published {
|
||||||
if err := d.PublishedTable.UpsertRoomPublished(ctx, txn, newRoomID, "", "", true); err != nil {
|
// un-publish old room
|
||||||
return fmt.Errorf("failed to publish room: %w", err)
|
if err = d.PublishedTable.UpsertRoomPublished(ctx, txn, oldRoomID, "", "", false); err != nil {
|
||||||
|
return fmt.Errorf("failed to unpublish room: %w", err)
|
||||||
|
}
|
||||||
|
// publish new room
|
||||||
|
if err = d.PublishedTable.UpsertRoomPublished(ctx, txn, newRoomID, "", "", true); err != nil {
|
||||||
|
return fmt.Errorf("failed to publish room: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Migrate any existing room aliases
|
// Migrate any existing room aliases
|
||||||
|
|
|
||||||
|
|
@ -110,10 +110,10 @@ const bulkSelectEventIDSQL = "" +
|
||||||
"SELECT event_nid, event_id FROM roomserver_events WHERE event_nid IN ($1)"
|
"SELECT event_nid, event_id FROM roomserver_events WHERE event_nid IN ($1)"
|
||||||
|
|
||||||
const bulkSelectEventNIDSQL = "" +
|
const bulkSelectEventNIDSQL = "" +
|
||||||
"SELECT event_id, event_nid FROM roomserver_events WHERE event_id IN ($1)"
|
"SELECT event_id, event_nid, room_nid FROM roomserver_events WHERE event_id IN ($1)"
|
||||||
|
|
||||||
const bulkSelectUnsentEventNIDSQL = "" +
|
const bulkSelectUnsentEventNIDSQL = "" +
|
||||||
"SELECT event_id, event_nid FROM roomserver_events WHERE sent_to_output = 0 AND event_id IN ($1)"
|
"SELECT event_id, event_nid, room_nid FROM roomserver_events WHERE sent_to_output = 0 AND event_id IN ($1)"
|
||||||
|
|
||||||
const selectMaxEventDepthSQL = "" +
|
const selectMaxEventDepthSQL = "" +
|
||||||
"SELECT COALESCE(MAX(depth) + 1, 0) FROM roomserver_events WHERE event_nid IN ($1)"
|
"SELECT COALESCE(MAX(depth) + 1, 0) FROM roomserver_events WHERE event_nid IN ($1)"
|
||||||
|
|
@ -572,20 +572,20 @@ func (s *eventStatements) BulkSelectEventID(ctx context.Context, txn *sql.Tx, ev
|
||||||
|
|
||||||
// BulkSelectEventNIDs returns a map from string event ID to numeric event ID.
|
// BulkSelectEventNIDs returns a map from string event ID to numeric event ID.
|
||||||
// If an event ID is not in the database then it is omitted from the map.
|
// If an event ID is not in the database then it is omitted from the map.
|
||||||
func (s *eventStatements) BulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error) {
|
func (s *eventStatements) BulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventMetadata, error) {
|
||||||
return s.bulkSelectEventNID(ctx, txn, eventIDs, false)
|
return s.bulkSelectEventNID(ctx, txn, eventIDs, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
// BulkSelectEventNIDs returns a map from string event ID to numeric event ID
|
// BulkSelectEventNIDs returns a map from string event ID to numeric event ID
|
||||||
// only for events that haven't already been sent to the roomserver output.
|
// only for events that haven't already been sent to the roomserver output.
|
||||||
// If an event ID is not in the database then it is omitted from the map.
|
// If an event ID is not in the database then it is omitted from the map.
|
||||||
func (s *eventStatements) BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error) {
|
func (s *eventStatements) BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventMetadata, error) {
|
||||||
return s.bulkSelectEventNID(ctx, txn, eventIDs, true)
|
return s.bulkSelectEventNID(ctx, txn, eventIDs, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// bulkSelectEventNIDs returns a map from string event ID to numeric event ID.
|
// bulkSelectEventNIDs returns a map from string event ID to numeric event ID.
|
||||||
// If an event ID is not in the database then it is omitted from the map.
|
// If an event ID is not in the database then it is omitted from the map.
|
||||||
func (s *eventStatements) bulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string, onlyUnsent bool) (map[string]types.EventNID, error) {
|
func (s *eventStatements) bulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string, onlyUnsent bool) (map[string]types.EventMetadata, error) {
|
||||||
///////////////
|
///////////////
|
||||||
iEventIDs := make([]interface{}, len(eventIDs))
|
iEventIDs := make([]interface{}, len(eventIDs))
|
||||||
for k, v := range eventIDs {
|
for k, v := range eventIDs {
|
||||||
|
|
@ -609,14 +609,18 @@ func (s *eventStatements) bulkSelectEventNID(ctx context.Context, txn *sql.Tx, e
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectEventNID: rows.close() failed")
|
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectEventNID: rows.close() failed")
|
||||||
results := make(map[string]types.EventNID, len(eventIDs))
|
results := make(map[string]types.EventMetadata, len(eventIDs))
|
||||||
var eventID string
|
var eventID string
|
||||||
var eventNID int64
|
var eventNID int64
|
||||||
|
var roomNID int64
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
if err = rows.Scan(&eventID, &eventNID); err != nil {
|
if err = rows.Scan(&eventID, &eventNID, &roomNID); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
results[eventID] = types.EventNID(eventNID)
|
results[eventID] = types.EventMetadata{
|
||||||
|
EventNID: types.EventNID(eventNID),
|
||||||
|
RoomNID: types.RoomNID(roomNID),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return results, nil
|
return results, nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -63,8 +63,8 @@ type Events interface {
|
||||||
BulkSelectEventID(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (map[types.EventNID]string, error)
|
BulkSelectEventID(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (map[types.EventNID]string, error)
|
||||||
// BulkSelectEventNIDs returns a map from string event ID to numeric event ID.
|
// BulkSelectEventNIDs returns a map from string event ID to numeric event ID.
|
||||||
// If an event ID is not in the database then it is omitted from the map.
|
// If an event ID is not in the database then it is omitted from the map.
|
||||||
BulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error)
|
BulkSelectEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventMetadata, error)
|
||||||
BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventNID, error)
|
BulkSelectUnsentEventNID(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[string]types.EventMetadata, error)
|
||||||
SelectMaxEventDepth(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (int64, error)
|
SelectMaxEventDepth(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (int64, error)
|
||||||
SelectRoomNIDsForEventNIDs(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (roomNIDs map[types.EventNID]types.RoomNID, err error)
|
SelectRoomNIDsForEventNIDs(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (roomNIDs map[types.EventNID]types.RoomNID, err error)
|
||||||
SelectEventRejected(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventID string) (rejected bool, err error)
|
SelectEventRejected(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventID string) (rejected bool, err error)
|
||||||
|
|
|
||||||
|
|
@ -38,6 +38,11 @@ type EventNID int64
|
||||||
// RoomNID is a numeric ID for a room.
|
// RoomNID is a numeric ID for a room.
|
||||||
type RoomNID int64
|
type RoomNID int64
|
||||||
|
|
||||||
|
type EventMetadata struct {
|
||||||
|
EventNID EventNID
|
||||||
|
RoomNID RoomNID
|
||||||
|
}
|
||||||
|
|
||||||
// StateSnapshotNID is a numeric ID for the state at an event.
|
// StateSnapshotNID is a numeric ID for the state at an event.
|
||||||
type StateSnapshotNID int64
|
type StateSnapshotNID int64
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue