Workaround problem with senderID/userID

This commit is contained in:
Till Faelligen 2023-06-15 13:19:51 +02:00
parent fd999253ef
commit b655017b07
No known key found for this signature in database
GPG key ID: ACCDC9606D472758
10 changed files with 66 additions and 29 deletions

View file

@ -18,6 +18,7 @@ import (
"unsafe"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/spec"
)
// HeaderedEvent is an Event which serialises to the headered form, which includes
@ -25,6 +26,10 @@ import (
type HeaderedEvent struct {
gomatrixserverlib.PDU
Visibility gomatrixserverlib.HistoryVisibility
// TODO: Remove this. This is a temporary workaround to store the userID in the syncAPI.
// It really should be the userKey instead.
UserID spec.UserID
StateKeyResolved *string
}
func (h *HeaderedEvent) CacheCost() int {

View file

@ -256,16 +256,19 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
}
}
pduPos, err := s.db.WriteEvent(
ctx,
ev,
addsStateEvents,
msg.AddsStateEventIDs,
msg.RemovesStateEventIDs,
msg.TransactionID,
false,
msg.HistoryVisibility,
)
validRoomID, err := spec.NewRoomID(ev.RoomID())
if err != nil {
return err
}
userID, err := s.rsAPI.QueryUserIDForSender(ctx, *validRoomID, ev.SenderID())
if err != nil {
return err
}
ev.UserID = *userID
pduPos, err := s.db.WriteEvent(ctx, ev, addsStateEvents, msg.AddsStateEventIDs, msg.RemovesStateEventIDs, msg.TransactionID, false, msg.HistoryVisibility)
if err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
@ -315,16 +318,19 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
// hack but until we have some better strategy for dealing with
// old events in the sync API, this should at least prevent us
// from confusing clients into thinking they've joined/left rooms.
pduPos, err := s.db.WriteEvent(
ctx,
ev,
[]*rstypes.HeaderedEvent{},
[]string{}, // adds no state
[]string{}, // removes no state
nil, // no transaction
ev.StateKey() != nil, // exclude from sync?,
msg.HistoryVisibility,
)
validRoomID, err := spec.NewRoomID(ev.RoomID())
if err != nil {
return err
}
userID, err := s.rsAPI.QueryUserIDForSender(ctx, *validRoomID, ev.SenderID())
if err != nil {
return err
}
ev.UserID = *userID
pduPos, err := s.db.WriteEvent(ctx, ev, []*rstypes.HeaderedEvent{}, []string{}, []string{}, nil, ev.StateKey() != nil, msg.HistoryVisibility)
if err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
@ -537,6 +543,7 @@ func (s *OutputRoomEventConsumer) onPurgeRoom(
}
func (s *OutputRoomEventConsumer) updateStateEvent(event *rstypes.HeaderedEvent) (*rstypes.HeaderedEvent, error) {
event.StateKeyResolved = event.StateKey()
if event.StateKey() == nil {
return event, nil
}
@ -556,6 +563,28 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event *rstypes.HeaderedEvent)
return event, err
}
validRoomID, err := spec.NewRoomID(event.RoomID())
if err != nil {
return event, err
}
if event.StateKey() != nil {
if *event.StateKey() != "" {
sku, err := s.rsAPI.QueryUserIDForSender(s.ctx, *validRoomID, spec.SenderID(stateKey))
if err == nil && sku != nil {
sKey := sku.String()
event.StateKeyResolved = &sKey
}
}
}
userID, err := s.rsAPI.QueryUserIDForSender(s.ctx, *validRoomID, event.SenderID())
if err != nil {
return event, err
}
event.UserID = *userID
if prevEvent == nil || prevEvent.EventID() == event.EventID() {
return event, nil
}

View file

@ -230,7 +230,8 @@ func TestSearch(t *testing.T) {
stateEvents = append(stateEvents, x)
stateEventIDs = append(stateEventIDs, x.EventID())
}
sp, err = db.WriteEvent(processCtx.Context(), x, stateEvents, stateEventIDs, nil, nil, false, gomatrixserverlib.HistoryVisibilityShared)
sp, err = db.WriteEvent(processCtx.Context(), x, spec.UserID{}, stateEvents, stateEventIDs, nil, nil, false, gomatrixserverlib.HistoryVisibilityShared)
assert.NoError(t, err)
if x.Type() != "m.room.message" {
continue

View file

@ -343,9 +343,9 @@ func (s *currentRoomStateStatements) UpsertRoomState(
event.RoomID(),
event.EventID(),
event.Type(),
event.SenderID(),
event.UserID.String(),
containsURL,
*event.StateKey(),
*event.StateKeyResolved,
headeredJSON,
membership,
addedAt,

View file

@ -109,7 +109,7 @@ func (s *membershipsStatements) UpsertMembership(
_, err = sqlutil.TxStmt(txn, s.upsertMembershipStmt).ExecContext(
ctx,
event.RoomID(),
*event.StateKey(),
event.UserID.String(),
membership,
event.EventID(),
streamPos,

View file

@ -407,7 +407,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
event.EventID(),
headeredJSON,
event.Type(),
event.SenderID(),
event.UserID.String(),
containsURL,
pq.StringArray(addState),
pq.StringArray(removeState),

View file

@ -342,9 +342,9 @@ func (s *currentRoomStateStatements) UpsertRoomState(
event.RoomID(),
event.EventID(),
event.Type(),
event.SenderID(),
event.UserID.String(),
containsURL,
*event.StateKey(),
*event.StateKeyResolved,
headeredJSON,
membership,
addedAt,

View file

@ -112,7 +112,7 @@ func (s *membershipsStatements) UpsertMembership(
_, err = sqlutil.TxStmt(txn, s.upsertMembershipStmt).ExecContext(
ctx,
event.RoomID(),
*event.StateKey(),
event.UserID.String(),
membership,
event.EventID(),
streamPos,

View file

@ -348,7 +348,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
event.EventID(),
headeredJSON,
event.Type(),
event.SenderID(),
event.UserID.String(),
containsURL,
string(addStateJSON),
string(removeStateJSON),

View file

@ -503,6 +503,8 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
}
}
logrus.Infof("%#v", syncReq.Response.Rooms.Join)
return util.JSONResponse{
Code: http.StatusOK,
JSON: syncReq.Response,