mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-04 20:53:09 -06:00
Refactor populatePresence some more
This commit is contained in:
parent
649bd2713e
commit
de55581092
|
|
@ -24,6 +24,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/syncapi/notifier"
|
"github.com/matrix-org/dendrite/syncapi/notifier"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
type PresenceStreamProvider struct {
|
type PresenceStreamProvider struct {
|
||||||
|
|
@ -47,15 +48,14 @@ func (p *PresenceStreamProvider) CompleteSync(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
req *types.SyncRequest,
|
req *types.SyncRequest,
|
||||||
) types.StreamPosition {
|
) types.StreamPosition {
|
||||||
presences, latest, err := p.DB.RecentPresence(ctx)
|
latest := p.LatestPosition(ctx)
|
||||||
|
presences, _, err := p.DB.RecentPresence(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
req.Log.WithError(err).Error("p.DB.RecentPresence failed")
|
req.Log.WithError(err).Error("p.DB.RecentPresence failed")
|
||||||
return 0
|
|
||||||
}
|
|
||||||
if len(presences) == 0 {
|
|
||||||
return latest
|
return latest
|
||||||
}
|
}
|
||||||
if err := p.populatePresence(ctx, req, presences, true); err != nil {
|
if err = p.populatePresence(ctx, req, presences, true); err != nil {
|
||||||
|
logrus.WithError(err).Errorf("Failed to populate presence")
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
return latest
|
return latest
|
||||||
|
|
@ -74,7 +74,8 @@ func (p *PresenceStreamProvider) IncrementalSync(
|
||||||
if len(presences) == 0 {
|
if len(presences) == 0 {
|
||||||
return to
|
return to
|
||||||
}
|
}
|
||||||
if err := p.populatePresence(ctx, req, presences, false); err != nil {
|
if err = p.populatePresence(ctx, req, presences, false); err != nil {
|
||||||
|
logrus.WithError(err).Errorf("Failed to populate presence")
|
||||||
return from
|
return from
|
||||||
}
|
}
|
||||||
return to
|
return to
|
||||||
|
|
@ -86,40 +87,31 @@ func (p *PresenceStreamProvider) populatePresence(
|
||||||
presences map[string]*types.PresenceInternal,
|
presences map[string]*types.PresenceInternal,
|
||||||
ignoreCache bool,
|
ignoreCache bool,
|
||||||
) error {
|
) error {
|
||||||
// add newly joined rooms user presences
|
for _, room := range req.Response.Rooms.Join {
|
||||||
if newlyJoined := joinedRooms(req.Response, req.Device.UserID); len(newlyJoined) > 0 {
|
for _, stateEvent := range append(room.State.Events, room.Timeline.Events...) {
|
||||||
for _, roomID := range newlyJoined {
|
switch {
|
||||||
room, ok := req.Response.Rooms.Join[roomID]
|
case stateEvent.Type != gomatrixserverlib.MRoomMember:
|
||||||
if !ok {
|
continue
|
||||||
|
case stateEvent.StateKey == nil:
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for _, stateEvent := range room.State.Events {
|
var memberContent gomatrixserverlib.MemberContent
|
||||||
switch {
|
err := json.Unmarshal(stateEvent.Content, &memberContent)
|
||||||
case stateEvent.Type != gomatrixserverlib.MRoomMember:
|
if err != nil {
|
||||||
fallthrough
|
continue
|
||||||
case stateEvent.StateKey == nil:
|
}
|
||||||
fallthrough
|
if memberContent.Membership != gomatrixserverlib.Join {
|
||||||
case *stateEvent.StateKey == "":
|
continue
|
||||||
continue
|
}
|
||||||
}
|
userID := *stateEvent.StateKey
|
||||||
userID := *stateEvent.StateKey
|
presences[userID], err = p.DB.GetPresence(ctx, userID)
|
||||||
if _, ok := presences[userID]; ok {
|
if err != nil && err != sql.ErrNoRows {
|
||||||
continue
|
return err
|
||||||
}
|
|
||||||
var err error
|
|
||||||
presences[userID], err = p.DB.GetPresence(ctx, userID)
|
|
||||||
if err != nil {
|
|
||||||
if err == sql.ErrNoRows {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range presences {
|
for _, presence := range presences {
|
||||||
presence := presences[i]
|
|
||||||
// Ignore users we don't share a room with
|
// Ignore users we don't share a room with
|
||||||
if req.Device.UserID != presence.UserID && !p.notifier.IsSharedUser(req.Device.UserID, presence.UserID) {
|
if req.Device.UserID != presence.UserID && !p.notifier.IsSharedUser(req.Device.UserID, presence.UserID) {
|
||||||
continue
|
continue
|
||||||
|
|
@ -162,32 +154,3 @@ func (p *PresenceStreamProvider) populatePresence(
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func joinedRooms(res *types.Response, userID string) []string {
|
|
||||||
var roomIDs []string
|
|
||||||
for roomID, join := range res.Rooms.Join {
|
|
||||||
// we would expect to see our join event somewhere if we newly joined the room.
|
|
||||||
// Normal events get put in the join section so it's not enough to know the room ID is present in 'join'.
|
|
||||||
newlyJoined := membershipEventPresent(join.State.Events, userID)
|
|
||||||
if newlyJoined {
|
|
||||||
roomIDs = append(roomIDs, roomID)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
newlyJoined = membershipEventPresent(join.Timeline.Events, userID)
|
|
||||||
if newlyJoined {
|
|
||||||
roomIDs = append(roomIDs, roomID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return roomIDs
|
|
||||||
}
|
|
||||||
|
|
||||||
func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool {
|
|
||||||
for _, ev := range events {
|
|
||||||
// it's enough to know that we have our member event here, don't need to check membership content
|
|
||||||
// as it's implied by being in the respective section of the sync response.
|
|
||||||
if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue