mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-26 00:03:09 -06:00
Add logic for exploring threads and tracking children; missing storage functions
This commit is contained in:
parent
d222289821
commit
e66c82bc9c
|
|
@ -106,10 +106,18 @@ func Enable(
|
||||||
he := headeredEvent.(*gomatrixserverlib.HeaderedEvent)
|
he := headeredEvent.(*gomatrixserverlib.HeaderedEvent)
|
||||||
hookErr := db.StoreRelation(context.Background(), he)
|
hookErr := db.StoreRelation(context.Background(), he)
|
||||||
if hookErr != nil {
|
if hookErr != nil {
|
||||||
util.GetLogger(context.Background()).WithError(hookErr).Error(
|
util.GetLogger(context.Background()).WithError(hookErr).WithField("event_id", he.EventID()).Error(
|
||||||
"failed to StoreRelation",
|
"failed to StoreRelation",
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
// we need to update child metadata here as well as after doing remote /event_relationships requests
|
||||||
|
// so we catch child metadata originating from /send transactions
|
||||||
|
hookErr = db.UpdateChildMetadata(context.Background(), he)
|
||||||
|
if hookErr != nil {
|
||||||
|
util.GetLogger(context.Background()).WithError(err).WithField("event_id", he.EventID()).Warn(
|
||||||
|
"failed to update child metadata for event",
|
||||||
|
)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
base.PublicClientAPIMux.Handle("/unstable/event_relationships",
|
base.PublicClientAPIMux.Handle("/unstable/event_relationships",
|
||||||
|
|
@ -516,7 +524,19 @@ func (rc *reqCtx) lookForEvent(eventID string, exploreThread bool) *gomatrixserv
|
||||||
if event == nil {
|
if event == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
} else if exploreThread && rc.hasUnexploredChildren(eventID) {
|
||||||
|
// we have the local event but we may need to do a remote hit anyway if we are exploring the thread and have unknown children.
|
||||||
|
// If we don't do this then we risk never fetching the children.
|
||||||
|
queryRes := rc.remoteEventRelationships(eventID)
|
||||||
|
if queryRes != nil {
|
||||||
|
rc.injectResponseToRoomserver(queryRes)
|
||||||
|
err := rc.db.MarkChildrenExplored(context.Background(), eventID)
|
||||||
|
if err != nil {
|
||||||
|
util.GetLogger(rc.ctx).WithError(err).Warnf("failed to mark children of %s as explored", eventID)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if rc.authorisedToSeeEvent(event) {
|
if rc.authorisedToSeeEvent(event) {
|
||||||
return event
|
return event
|
||||||
}
|
}
|
||||||
|
|
@ -608,12 +628,35 @@ func (rc *reqCtx) injectResponseToRoomserver(res *gomatrixserverlib.MSC2836Event
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.GetLogger(rc.ctx).WithError(err).Error("failed to inject MSC2836EventRelationshipsResponse into the roomserver")
|
util.GetLogger(rc.ctx).WithError(err).Error("failed to inject MSC2836EventRelationshipsResponse into the roomserver")
|
||||||
}
|
}
|
||||||
|
// update the child count / hash columns for these nodes. We need to do this here because not all events will make it
|
||||||
|
// through to the KindNewEventPersisted hook because the roomserver will ignore duplicates. Duplicates have meaning though
|
||||||
|
// as the `unsigned` field may differ (if the number of children changes).
|
||||||
|
for _, ev := range ires {
|
||||||
|
err = rc.db.UpdateChildMetadata(context.Background(), ev.Event)
|
||||||
|
if err != nil {
|
||||||
|
util.GetLogger(rc.ctx).WithError(err).WithField("event_id", ev.Event.EventID()).Warn("failed to update child metadata for event")
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rc *reqCtx) addChildMetadata(ev *gomatrixserverlib.HeaderedEvent) {
|
func (rc *reqCtx) addChildMetadata(ev *gomatrixserverlib.HeaderedEvent) {
|
||||||
children, err := rc.db.ChildrenForParent(rc.ctx, ev.EventID(), constRelType, false)
|
count, hash := rc.getChildMetadata(ev.EventID())
|
||||||
|
err := ev.SetUnsignedField("children_hash", gomatrixserverlib.Base64Bytes(hash))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.GetLogger(rc.ctx).WithError(err).Warn("Failed to get ChildrenForParent for adding child metadata")
|
util.GetLogger(rc.ctx).WithError(err).Warn("Failed to set children_hash")
|
||||||
|
}
|
||||||
|
err = ev.SetUnsignedField("children", map[string]int{
|
||||||
|
constRelType: count,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
util.GetLogger(rc.ctx).WithError(err).Warn("Failed to set children count")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rc *reqCtx) getChildMetadata(eventID string) (count int, hash []byte) {
|
||||||
|
children, err := rc.db.ChildrenForParent(rc.ctx, eventID, constRelType, false)
|
||||||
|
if err != nil {
|
||||||
|
util.GetLogger(rc.ctx).WithError(err).Warn("Failed to get ChildrenForParent for getting child metadata")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if len(children) == 0 {
|
if len(children) == 0 {
|
||||||
|
|
@ -630,16 +673,45 @@ func (rc *reqCtx) addChildMetadata(ev *gomatrixserverlib.HeaderedEvent) {
|
||||||
}
|
}
|
||||||
hashValBytes := sha256.Sum256([]byte(eventIDs.String()))
|
hashValBytes := sha256.Sum256([]byte(eventIDs.String()))
|
||||||
|
|
||||||
err = ev.SetUnsignedField("children_hash", gomatrixserverlib.Base64Bytes(hashValBytes[:]))
|
count = len(children)
|
||||||
if err != nil {
|
hash = hashValBytes[:]
|
||||||
util.GetLogger(rc.ctx).WithError(err).Warn("Failed to set children_hash")
|
return
|
||||||
}
|
}
|
||||||
err = ev.SetUnsignedField("children", map[string]int{
|
|
||||||
constRelType: len(children),
|
// hasUnexploredChildren returns true if this event has unexplored children.
|
||||||
})
|
// "An event has unexplored children if the `unsigned` child count on the parent does not match
|
||||||
if err != nil {
|
// how many children the server believes the parent to have. In addition, if the counts match but
|
||||||
util.GetLogger(rc.ctx).WithError(err).Warn("Failed to set children count")
|
// the hashes do not match, then the event is unexplored."
|
||||||
|
func (rc *reqCtx) hasUnexploredChildren(eventID string) bool {
|
||||||
|
if rc.isFederatedRequest {
|
||||||
|
return false // we only explore children for clients, not servers.
|
||||||
}
|
}
|
||||||
|
// extract largest child count from event
|
||||||
|
eventCount, eventHash, explored, err := rc.db.ChildMetadata(rc.ctx, eventID)
|
||||||
|
if err != nil {
|
||||||
|
util.GetLogger(rc.ctx).WithError(err).WithField("event_id", eventID).Warn(
|
||||||
|
"failed to get ChildMetadata from db",
|
||||||
|
)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// if there are no recorded children then we know we have >= children.
|
||||||
|
// if the event has already been explored (read: we hit /event_relationships successfully)
|
||||||
|
// then don't do it again. We'll only re-do this if we get an even bigger children count,
|
||||||
|
// see Database.UpdateChildMetadata
|
||||||
|
if eventCount == 0 || explored {
|
||||||
|
return false // short-circuit
|
||||||
|
}
|
||||||
|
|
||||||
|
// calculate child count for event
|
||||||
|
calcCount, calcHash := rc.getChildMetadata(eventID)
|
||||||
|
|
||||||
|
if eventCount < calcCount {
|
||||||
|
return false // we have more children
|
||||||
|
} else if eventCount > calcCount {
|
||||||
|
return true // the event has more children than we know about
|
||||||
|
}
|
||||||
|
// we have the same count, so a mismatched hash means some children are different
|
||||||
|
return !bytes.Equal(eventHash, calcHash)
|
||||||
}
|
}
|
||||||
|
|
||||||
type walkInfo struct {
|
type walkInfo struct {
|
||||||
|
|
|
||||||
|
|
@ -8,13 +8,13 @@ import (
|
||||||
"github.com/matrix-org/dendrite/internal/config"
|
"github.com/matrix-org/dendrite/internal/config"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/matrix-org/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
type eventInfo struct {
|
type eventInfo struct {
|
||||||
EventID string
|
EventID string
|
||||||
OriginServerTS gomatrixserverlib.Timestamp
|
OriginServerTS gomatrixserverlib.Timestamp
|
||||||
RoomID string
|
RoomID string
|
||||||
Servers []string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type Database interface {
|
type Database interface {
|
||||||
|
|
@ -29,6 +29,17 @@ type Database interface {
|
||||||
// there is no parent for this child event, with no error. The parent eventInfo can be missing the
|
// there is no parent for this child event, with no error. The parent eventInfo can be missing the
|
||||||
// timestamp if the event is not known to the server.
|
// timestamp if the event is not known to the server.
|
||||||
ParentForChild(ctx context.Context, eventID, relType string) (*eventInfo, error)
|
ParentForChild(ctx context.Context, eventID, relType string) (*eventInfo, error)
|
||||||
|
// UpdateChildMetadata persists the children_count and children_hash from this event if and only if
|
||||||
|
// the count is greater than what was previously there. If the count is updated, the event will be
|
||||||
|
// updated to be unexplored.
|
||||||
|
UpdateChildMetadata(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent) error
|
||||||
|
// ChildMetadata returns the children_count and children_hash for the event ID in question.
|
||||||
|
// Also returns the `explored` flag, which is set to true when MarkChildrenExplored is called and is set
|
||||||
|
// back to `false` when a larger count is inserted via UpdateChildMetadata.
|
||||||
|
// Returns nil error if the event ID does not exist.
|
||||||
|
ChildMetadata(ctx context.Context, eventID string) (count int, hash []byte, explored bool, err error)
|
||||||
|
// MarkChildrenExplored sets the 'explored' flag on this event to `true`.
|
||||||
|
MarkChildrenExplored(ctx context.Context, eventID string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type DB struct {
|
type DB struct {
|
||||||
|
|
@ -250,3 +261,19 @@ func roomIDAndServers(ev *gomatrixserverlib.HeaderedEvent) (roomID string, serve
|
||||||
}
|
}
|
||||||
return body.RoomID, body.Servers
|
return body.RoomID, body.Servers
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func extractChildMetadata(ev *gomatrixserverlib.HeaderedEvent) (count int, hash []byte) {
|
||||||
|
unsigned := struct {
|
||||||
|
Counts map[string]int `json:"children"`
|
||||||
|
Hash gomatrixserverlib.Base64Bytes `json:"children_hash"`
|
||||||
|
}{}
|
||||||
|
if err := json.Unmarshal(ev.Unsigned(), &unsigned); err != nil {
|
||||||
|
util.GetLogger(context.Background()).WithError(err).Error("failed to read unsigned field of event")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, c := range unsigned.Counts {
|
||||||
|
count += c
|
||||||
|
}
|
||||||
|
hash = unsigned.Hash
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue