Split senderID & userID fields on PDU

This commit is contained in:
Devon Hudson 2023-06-01 17:52:52 -06:00
parent ea6b368ad4
commit 56448c1c74
No known key found for this signature in database
GPG key ID: CD06B18E77F6A628
29 changed files with 103 additions and 60 deletions

View file

@ -233,10 +233,17 @@ func (s *appserviceState) backoffAndPause(err error) error {
//
// TODO: This should be cached, see https://github.com/matrix-org/dendrite/issues/1682
func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event *types.HeaderedEvent, appservice *config.ApplicationService) bool {
userID, err := event.UserID()
if err != nil {
log.WithFields(log.Fields{
"appservice": appservice.ID,
"room_id": event.RoomID(),
}).WithError(err).Errorf("invalid userID")
}
switch {
case appservice.URL == "":
return false
case appservice.IsInterestedInUserID(event.Sender()):
case appservice.IsInterestedInUserID(userID.String()):
return true
case appservice.IsInterestedInRoomID(event.RoomID()):
return true

View file

@ -76,7 +76,14 @@ func SendRedaction(
// "Users may redact their own events, and any user with a power level greater than or equal
// to the redact power level of the room may redact events there"
// https://matrix.org/docs/spec/client_server/r0.6.1#put-matrix-client-r0-rooms-roomid-redact-eventid-txnid
allowedToRedact := ev.Sender() == device.UserID
userID, err := ev.UserID()
if err != nil {
return util.JSONResponse{
Code: 400,
JSON: spec.BadJSON("invalid userID"),
}
}
allowedToRedact := userID.String() == device.UserID
if !allowedToRedact {
plEvent := roomserverAPI.GetStateEvent(req.Context(), rsAPI, roomID, gomatrixserverlib.StateKeyTuple{
EventType: spec.MRoomPowerLevels,
@ -119,7 +126,7 @@ func SendRedaction(
Type: spec.MRoomRedaction,
Redacts: eventID,
}
err := proto.SetContent(r)
err = proto.SetContent(r)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("proto.SetContent failed")
return util.JSONResponse{

View file

@ -509,7 +509,7 @@ func (r *FederationInternalAPI) SendInvite(
event gomatrixserverlib.PDU,
strippedState []gomatrixserverlib.InviteStrippedState,
) (gomatrixserverlib.PDU, error) {
_, origin, err := r.cfg.Matrix.SplitLocalID('@', event.Sender())
originUser, err := event.UserID()
if err != nil {
return nil, err
}
@ -542,7 +542,7 @@ func (r *FederationInternalAPI) SendInvite(
return nil, fmt.Errorf("gomatrixserverlib.NewInviteV2Request: %w", err)
}
inviteRes, err := r.federation.SendInviteV2(ctx, origin, destination, inviteReq)
inviteRes, err := r.federation.SendInviteV2(ctx, originUser.Domain(), destination, inviteReq)
if err != nil {
return nil, fmt.Errorf("r.federation.SendInviteV2: failed to send invite: %w", err)
}

View file

@ -213,7 +213,7 @@ func SendLeave(
JSON: spec.BadJSON("No state key was provided in the leave event."),
}
}
if !event.StateKeyEquals(event.Sender()) {
if !event.StateKeyEquals(event.SenderID()) {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: spec.BadJSON("Event state key must match the event sender."),
@ -224,12 +224,14 @@ func SendLeave(
// the request. By this point we've already asserted that the sender
// and the state key are equal so we don't need to check both.
var serverName spec.ServerName
if _, serverName, err = gomatrixserverlib.SplitID('@', event.Sender()); err != nil {
userID, err := event.UserID()
if err != nil {
return util.JSONResponse{
Code: http.StatusForbidden,
Code: http.StatusBadRequest,
JSON: spec.Forbidden("The sender of the join is invalid"),
}
} else if serverName != request.Origin() {
}
if userID.Domain() != request.Origin() {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: spec.Forbidden("The sender does not match the server that originated the request"),

2
go.mod
View file

@ -22,7 +22,7 @@ require (
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530
github.com/matrix-org/gomatrixserverlib v0.0.0-20230531155817-0e3adf17bee6
github.com/matrix-org/gomatrixserverlib v0.0.0-20230601235030-c4138953b254
github.com/matrix-org/pinecone v0.11.1-0.20230210171230-8c3b24f2649a
github.com/matrix-org/util v0.0.0-20221111132719-399730281e66
github.com/mattn/go-sqlite3 v1.14.16

4
go.sum
View file

@ -323,8 +323,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 h1:s7fexw
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo=
github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530 h1:kHKxCOLcHH8r4Fzarl4+Y3K5hjothkVW5z7T1dUM11U=
github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20230531155817-0e3adf17bee6 h1:Kh1TNvJDhWN5CdgtICNUC4G0wV2km51LGr46Dvl153A=
github.com/matrix-org/gomatrixserverlib v0.0.0-20230531155817-0e3adf17bee6/go.mod h1:H9V9N3Uqn1bBJqYJNGK1noqtgJTaCEhtTdcH/mp50uU=
github.com/matrix-org/gomatrixserverlib v0.0.0-20230601235030-c4138953b254 h1:javi7tINJfP86mdYeVg7IzDmeRLeaLA0kRzIbEZlK/I=
github.com/matrix-org/gomatrixserverlib v0.0.0-20230601235030-c4138953b254/go.mod h1:H9V9N3Uqn1bBJqYJNGK1noqtgJTaCEhtTdcH/mp50uU=
github.com/matrix-org/pinecone v0.11.1-0.20230210171230-8c3b24f2649a h1:awrPDf9LEFySxTLKYBMCiObelNx/cBuv/wzllvCCH3A=
github.com/matrix-org/pinecone v0.11.1-0.20230210171230-8c3b24f2649a/go.mod h1:HchJX9oKMXaT2xYFs0Ha/6Zs06mxLU8k6F1ODnrGkeQ=
github.com/matrix-org/util v0.0.0-20221111132719-399730281e66 h1:6z4KxomXSIGWqhHcfzExgkH3Z3UkIXry4ibJS4Aqz2Y=

View file

@ -113,7 +113,7 @@ func ruleMatches(rule *Rule, kind Kind, event gomatrixserverlib.PDU, ec Evaluati
return rule.RuleID == event.RoomID(), nil
case SenderKind:
return rule.RuleID == event.Sender(), nil
return rule.RuleID == event.SenderID(), nil
default:
return false, nil
@ -143,7 +143,7 @@ func conditionMatches(cond *Condition, event gomatrixserverlib.PDU, ec Evaluatio
return cmp(n), nil
case SenderNotificationPermissionCondition:
return ec.HasPowerLevel(event.Sender(), cond.Key)
return ec.HasPowerLevel(event.SenderID(), cond.Key)
default:
return false, nil

View file

@ -173,8 +173,8 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias(
}
sender := request.UserID
if request.UserID != ev.Sender() {
sender = ev.Sender()
if sender != ev.SenderID() {
sender = ev.SenderID()
}
_, senderDomain, err := r.Cfg.Global.SplitLocalID('@', sender)

View file

@ -128,9 +128,9 @@ func (r *Inputer) processRoomEvent(
if roomInfo == nil && !isCreateEvent {
return fmt.Errorf("room %s does not exist for event %s", event.RoomID(), event.EventID())
}
_, senderDomain, err := gomatrixserverlib.SplitID('@', event.Sender())
sender, err := event.UserID()
if err != nil {
return fmt.Errorf("event has invalid sender %q", input.Event.Sender())
return fmt.Errorf("event has invalid sender %q", event.SenderID())
}
// If we already know about this outlier and it hasn't been rejected
@ -193,9 +193,9 @@ func (r *Inputer) processRoomEvent(
serverRes.ServerNames = append(serverRes.ServerNames, input.Origin)
delete(servers, input.Origin)
}
if senderDomain != input.Origin && senderDomain != r.Cfg.Matrix.ServerName {
serverRes.ServerNames = append(serverRes.ServerNames, senderDomain)
delete(servers, senderDomain)
if sender.Domain() != input.Origin && sender.Domain() != r.Cfg.Matrix.ServerName {
serverRes.ServerNames = append(serverRes.ServerNames, sender.Domain())
delete(servers, sender.Domain())
}
for server := range servers {
serverRes.ServerNames = append(serverRes.ServerNames, server)
@ -451,7 +451,7 @@ func (r *Inputer) processRoomEvent(
}
// Handle remote room upgrades, e.g. remove published room
if event.Type() == "m.room.tombstone" && event.StateKeyEquals("") && !r.Cfg.Matrix.IsLocalServerName(senderDomain) {
if event.Type() == "m.room.tombstone" && event.StateKeyEquals("") && !r.Cfg.Matrix.IsLocalServerName(sender.Domain()) {
if err = r.handleRemoteRoomUpgrade(ctx, event); err != nil {
return fmt.Errorf("failed to handle remote room upgrade: %w", err)
}
@ -493,7 +493,7 @@ func (r *Inputer) processRoomEvent(
func (r *Inputer) handleRemoteRoomUpgrade(ctx context.Context, event gomatrixserverlib.PDU) error {
oldRoomID := event.RoomID()
newRoomID := gjson.GetBytes(event.Content(), "replacement_room").Str
return r.DB.UpgradeRoom(ctx, oldRoomID, newRoomID, event.Sender())
return r.DB.UpgradeRoom(ctx, oldRoomID, newRoomID, event.SenderID())
}
// processStateBefore works out what the state is before the event and

View file

@ -484,9 +484,11 @@ FindSuccessor:
// Store the server names in a temporary map to avoid duplicates.
serverSet := make(map[spec.ServerName]bool)
for _, event := range memberEvents {
if _, senderDomain, err := gomatrixserverlib.SplitID('@', event.Sender()); err == nil {
serverSet[senderDomain] = true
sender, err := event.UserID()
if err != nil {
continue
}
serverSet[sender.Domain()] = true
}
var servers []spec.ServerName
for server := range serverSet {

View file

@ -125,7 +125,7 @@ func (r *Inviter) PerformInvite(
) error {
event := req.Event
sender, err := spec.NewUserID(event.Sender(), true)
sender, err := event.UserID()
if err != nil {
return spec.InvalidParam("The user ID is invalid")
}

View file

@ -60,7 +60,7 @@ func (r *RoomEventProducer) ProduceRoomEvents(roomID string, updates []api.Outpu
"adds_state": len(update.NewRoomEvent.AddsStateEventIDs),
"removes_state": len(update.NewRoomEvent.RemovesStateEventIDs),
"send_as_server": update.NewRoomEvent.SendAsServer,
"sender": update.NewRoomEvent.Event.Sender(),
"sender": update.NewRoomEvent.Event.SenderID(),
})
if update.NewRoomEvent.Event.StateKey() != nil {
logger = logger.WithField("state_key", *update.NewRoomEvent.Event.StateKey())

View file

@ -101,7 +101,7 @@ func (u *MembershipUpdater) Update(newMembership tables.MembershipState, event *
var inserted bool // Did the query result in a membership change?
var retired []string // Did we retire any updates in the process?
return inserted, retired, u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error {
senderUserNID, err := u.d.assignStateKeyNID(u.ctx, u.txn, event.Sender())
senderUserNID, err := u.d.assignStateKeyNID(u.ctx, u.txn, event.SenderID())
if err != nil {
return fmt.Errorf("u.d.AssignStateKeyNID: %w", err)
}

View file

@ -988,8 +988,14 @@ func (d *EventDatabase) MaybeRedactEvent(
return nil
}
_, sender1, _ := gomatrixserverlib.SplitID('@', redactedEvent.Sender())
_, sender2, _ := gomatrixserverlib.SplitID('@', redactionEvent.Sender())
sender1, err := redactedEvent.UserID()
if err != nil {
return err
}
sender2, err := redactionEvent.UserID()
if err != nil {
return err
}
var powerlevels *gomatrixserverlib.PowerLevelContent
powerlevels, err = plResolver.Resolve(ctx, redactionEvent.EventID())
if err != nil {
@ -997,9 +1003,9 @@ func (d *EventDatabase) MaybeRedactEvent(
}
switch {
case powerlevels.UserLevel(redactionEvent.Sender()) >= powerlevels.Redact:
case powerlevels.UserLevel(redactionEvent.SenderID()) >= powerlevels.Redact:
// 1. The power level of the redaction events sender is greater than or equal to the redact level.
case sender1 == sender2:
case sender1.Domain() == sender2.Domain():
// 2. The domain of the redaction events sender matches that of the original events sender.
default:
ignoreRedaction = true

View file

@ -730,7 +730,7 @@ func stripped(ev gomatrixserverlib.PDU) *fclient.MSC2946StrippedEvent {
Type: ev.Type(),
StateKey: *ev.StateKey(),
Content: ev.Content(),
Sender: ev.Sender(),
Sender: ev.SenderID(),
OriginServerTS: ev.OriginServerTS(),
}
}

View file

@ -523,7 +523,7 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event *rstypes.HeaderedEvent)
prev := types.PrevEventRef{
PrevContent: prevEvent.Content(),
ReplacesState: prevEvent.EventID(),
PrevSender: prevEvent.Sender(),
PrevSender: prevEvent.SenderID(),
}
event.PDU, err = event.SetUnsigned(prev)

View file

@ -144,7 +144,7 @@ func GetMemberships(
JSON: spec.InternalServerError{},
}
}
res.Joined[ev.Sender()] = joinedMember(content)
res.Joined[ev.SenderID()] = joinedMember(content)
}
return util.JSONResponse{
Code: http.StatusOK,

View file

@ -204,11 +204,11 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts
profileInfos := make(map[string]ProfileInfoResponse)
for _, ev := range append(eventsBefore, eventsAfter...) {
profile, ok := knownUsersProfiles[event.Sender()]
profile, ok := knownUsersProfiles[event.SenderID()]
if !ok {
stateEvent, err := snapshot.GetStateEvent(ctx, ev.RoomID(), spec.MRoomMember, ev.Sender())
stateEvent, err := snapshot.GetStateEvent(ctx, ev.RoomID(), spec.MRoomMember, ev.SenderID())
if err != nil {
logrus.WithError(err).WithField("user_id", event.Sender()).Warn("failed to query userprofile")
logrus.WithError(err).WithField("user_id", event.SenderID()).Warn("failed to query userprofile")
continue
}
if stateEvent == nil {
@ -218,9 +218,9 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts
AvatarURL: gjson.GetBytes(stateEvent.Content(), "avatar_url").Str,
DisplayName: gjson.GetBytes(stateEvent.Content(), "displayname").Str,
}
knownUsersProfiles[event.Sender()] = profile
knownUsersProfiles[event.SenderID()] = profile
}
profileInfos[ev.Sender()] = profile
profileInfos[ev.SenderID()] = profile
}
results = append(results, Result{

View file

@ -343,7 +343,7 @@ func (s *currentRoomStateStatements) UpsertRoomState(
event.RoomID(),
event.EventID(),
event.Type(),
event.Sender(),
event.SenderID(),
containsURL,
*event.StateKey(),
headeredJSON,

View file

@ -407,7 +407,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
event.EventID(),
headeredJSON,
event.Type(),
event.Sender(),
event.SenderID(),
containsURL,
pq.StringArray(addState),
pq.StringArray(removeState),

View file

@ -195,7 +195,14 @@ func (d *Database) StreamEventsToEvents(device *userapi.Device, in []types.Strea
for i := 0; i < len(in); i++ {
out[i] = in[i].HeaderedEvent
if device != nil && in[i].TransactionID != nil {
if device.UserID == in[i].Sender() && device.SessionID == in[i].TransactionID.SessionID {
userID, err := in[i].UserID()
if err != nil {
logrus.WithFields(logrus.Fields{
"event_id": out[i].EventID(),
}).WithError(err).Warnf("Event has invalid userID")
continue
}
if device.UserID == userID.String() && device.SessionID == in[i].TransactionID.SessionID {
err := out[i].SetUnsignedField(
"transaction_id", in[i].TransactionID.TransactionID,
)

View file

@ -342,7 +342,7 @@ func (s *currentRoomStateStatements) UpsertRoomState(
event.RoomID(),
event.EventID(),
event.Type(),
event.Sender(),
event.SenderID(),
containsURL,
*event.StateKey(),
headeredJSON,

View file

@ -348,7 +348,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
event.EventID(),
headeredJSON,
event.Type(),
event.Sender(),
event.SenderID(),
containsURL,
string(addStateJSON),
string(removeStateJSON),

View file

@ -63,7 +63,11 @@ func (p *InviteStreamProvider) IncrementalSync(
for roomID, inviteEvent := range invites {
// skip ignored user events
if _, ok := req.IgnoredUsers.List[inviteEvent.Sender()]; ok {
userID, err := inviteEvent.UserID()
if err != nil {
continue
}
if _, ok := req.IgnoredUsers.List[userID.String()]; ok {
continue
}
ir := types.NewInviteResponse(inviteEvent)

View file

@ -425,7 +425,7 @@ func applyHistoryVisibilityFilter(
for _, ev := range recentEvents {
if ev.StateKey() != nil {
stateTypes = append(stateTypes, ev.Type())
senders = append(senders, ev.Sender())
senders = append(senders, ev.SenderID())
}
}
@ -577,8 +577,8 @@ func (p *PDUStreamProvider) lazyLoadMembers(
// Add all users the client doesn't know about yet to a list
for _, event := range timelineEvents {
// Membership is not yet cached, add it to the list
if _, ok := p.lazyLoadCache.IsLazyLoadedUserCached(device, roomID, event.Sender()); !ok {
timelineUsers[event.Sender()] = struct{}{}
if _, ok := p.lazyLoadCache.IsLazyLoadedUserCached(device, roomID, event.SenderID()); !ok {
timelineUsers[event.SenderID()] = struct{}{}
}
}
// Preallocate with the same amount, even if it will end up with fewer values

View file

@ -57,9 +57,13 @@ func ToClientEvents(serverEvs []gomatrixserverlib.PDU, format ClientEventFormat)
// ToClientEvent converts a single server event to a client event.
func ToClientEvent(se gomatrixserverlib.PDU, format ClientEventFormat) ClientEvent {
sender, err := se.UserID()
if err != nil {
panic("uh oh! userID does not exist...")
}
ce := ClientEvent{
Content: spec.RawJSON(se.Content()),
Sender: se.Sender(),
Sender: sender.String(),
Type: se.Type(),
StateKey: se.StateKey(),
Unsigned: spec.RawJSON(se.Unsigned()),

View file

@ -62,8 +62,8 @@ func TestToClientEvent(t *testing.T) { // nolint: gocyclo
if !bytes.Equal(ce.Unsigned, ev.Unsigned()) {
t.Errorf("ClientEvent.Unsigned: wanted %s, got %s", string(ev.Unsigned()), string(ce.Unsigned))
}
if ce.Sender != ev.Sender() {
t.Errorf("ClientEvent.Sender: wanted %s, got %s", ev.Sender(), ce.Sender)
if ce.Sender != ev.SenderID() {
t.Errorf("ClientEvent.Sender: wanted %s, got %s", ev.SenderID(), ce.Sender)
}
j, err := json.Marshal(ce)
if err != nil {

View file

@ -108,7 +108,7 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
}
if s.cfg.Matrix.ReportStats.Enabled {
go s.storeMessageStats(ctx, event.Type(), event.Sender(), event.RoomID())
go s.storeMessageStats(ctx, event.Type(), event.SenderID(), event.RoomID())
}
log.WithFields(log.Fields{
@ -615,7 +615,11 @@ func (s *OutputRoomEventConsumer) notifyLocal(ctx context.Context, event *rstype
// evaluatePushRules fetches and evaluates the push rules of a local
// user. Returns actions (including dont_notify).
func (s *OutputRoomEventConsumer) evaluatePushRules(ctx context.Context, event *rstypes.HeaderedEvent, mem *localMembership, roomSize int) ([]*pushrules.Action, error) {
if event.Sender() == mem.UserID {
userID, err := event.UserID()
if err != nil {
return nil, err
}
if userID.String() == mem.UserID {
// SPEC: Homeservers MUST NOT notify the Push Gateway for
// events that the user has sent themselves.
return nil, nil
@ -632,7 +636,7 @@ func (s *OutputRoomEventConsumer) evaluatePushRules(ctx context.Context, event *
if err != nil {
return nil, err
}
sender := event.Sender()
sender := event.SenderID()
if _, ok := ignored.List[sender]; ok {
return nil, fmt.Errorf("user %s is ignored", sender)
}
@ -767,7 +771,7 @@ func (s *OutputRoomEventConsumer) notifyHTTP(ctx context.Context, event *rstypes
ID: event.EventID(),
RoomID: event.RoomID(),
RoomName: roomName,
Sender: event.Sender(),
Sender: event.SenderID(),
Type: event.Type(),
},
}

View file

@ -61,13 +61,13 @@ func Test_evaluatePushRules(t *testing.T) {
}{
{
name: "m.receipt doesn't notify",
eventContent: `{"type":"m.receipt"}`,
eventContent: `{"type":"m.receipt","sender":"@test:remotehost"}`,
wantAction: pushrules.UnknownAction,
wantActions: nil,
},
{
name: "m.reaction doesn't notify",
eventContent: `{"type":"m.reaction"}`,
eventContent: `{"type":"m.reaction","sender":"@test:remotehost"}`,
wantAction: pushrules.DontNotifyAction,
wantActions: []*pushrules.Action{
{
@ -77,7 +77,7 @@ func Test_evaluatePushRules(t *testing.T) {
},
{
name: "m.room.message notifies",
eventContent: `{"type":"m.room.message"}`,
eventContent: `{"type":"m.room.message","sender":"@test:remotehost"}`,
wantNotify: true,
wantAction: pushrules.NotifyAction,
wantActions: []*pushrules.Action{
@ -86,7 +86,7 @@ func Test_evaluatePushRules(t *testing.T) {
},
{
name: "m.room.message highlights",
eventContent: `{"type":"m.room.message", "content": {"body": "test"} }`,
eventContent: `{"type":"m.room.message", "content": {"body": "test"},"sender":"@test:remotehost" }`,
wantNotify: true,
wantAction: pushrules.NotifyAction,
wantActions: []*pushrules.Action{