mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-07 15:03:09 -06:00
Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/v11
This commit is contained in:
commit
83f6fc6578
|
|
@ -783,4 +783,8 @@ Invited user can reject invite for empty room
|
|||
Invited user can reject local invite after originator leaves
|
||||
Guest users can join guest_access rooms
|
||||
Forgotten room messages cannot be paginated
|
||||
Local device key changes get to remote servers with correct prev_id
|
||||
Local device key changes get to remote servers with correct prev_id
|
||||
HS provides query metadata
|
||||
HS can provide query metadata on a single protocol
|
||||
Invites over federation are correctly pushed
|
||||
Invites over federation are correctly pushed with name
|
||||
|
|
@ -92,18 +92,36 @@ func (s *OutputRoomEventConsumer) Start() error {
|
|||
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
|
||||
msg := msgs[0] // Guaranteed to exist if onMessage is called
|
||||
// Only handle events we care about
|
||||
if rsapi.OutputType(msg.Header.Get(jetstream.RoomEventType)) != rsapi.OutputTypeNewRoomEvent {
|
||||
return true
|
||||
}
|
||||
var output rsapi.OutputEvent
|
||||
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
||||
// If the message was invalid, log it and move on to the next message in the stream
|
||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
||||
return true
|
||||
}
|
||||
event := output.NewRoomEvent.Event
|
||||
if event == nil {
|
||||
log.Errorf("userapi consumer: expected event")
|
||||
|
||||
var event *rstypes.HeaderedEvent
|
||||
var isNewRoomEvent bool
|
||||
switch rsapi.OutputType(msg.Header.Get(jetstream.RoomEventType)) {
|
||||
case rsapi.OutputTypeNewRoomEvent:
|
||||
isNewRoomEvent = true
|
||||
fallthrough
|
||||
case rsapi.OutputTypeNewInviteEvent:
|
||||
var output rsapi.OutputEvent
|
||||
if err := json.Unmarshal(msg.Data, &output); err != nil {
|
||||
// If the message was invalid, log it and move on to the next message in the stream
|
||||
log.WithError(err).Errorf("roomserver output log: message parse failure")
|
||||
return true
|
||||
}
|
||||
if isNewRoomEvent {
|
||||
event = output.NewRoomEvent.Event
|
||||
} else {
|
||||
event = output.NewInviteEvent.Event
|
||||
}
|
||||
|
||||
if event == nil {
|
||||
log.Errorf("userapi consumer: expected event")
|
||||
return true
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"event_id": event.EventID(),
|
||||
"event_type": event.Type(),
|
||||
}).Tracef("Received message from roomserver: %#v", output)
|
||||
default:
|
||||
return true
|
||||
}
|
||||
|
||||
|
|
@ -111,11 +129,6 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
|
|||
go s.storeMessageStats(ctx, event.Type(), string(event.SenderID()), event.RoomID().String())
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"event_id": event.EventID(),
|
||||
"event_type": event.Type(),
|
||||
}).Tracef("Received message from roomserver: %#v", output)
|
||||
|
||||
metadata, err := msg.Metadata()
|
||||
if err != nil {
|
||||
return true
|
||||
|
|
@ -448,6 +461,19 @@ func (s *OutputRoomEventConsumer) roomName(ctx context.Context, event *rstypes.H
|
|||
}
|
||||
}
|
||||
|
||||
// Special case for invites, as we don't store any "current state" for these events,
|
||||
// we need to make sure that, if present, the m.room.name is sent as well.
|
||||
if event.Type() == spec.MRoomMember &&
|
||||
gjson.GetBytes(event.Content(), "membership").Str == "invite" {
|
||||
invState := gjson.GetBytes(event.JSON(), "unsigned.invite_room_state")
|
||||
for _, ev := range invState.Array() {
|
||||
if ev.Get("type").Str == spec.MRoomName {
|
||||
name := ev.Get("content.name").Str
|
||||
return name, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
req := &rsapi.QueryCurrentStateRequest{
|
||||
RoomID: event.RoomID().String(),
|
||||
StateTuples: []gomatrixserverlib.StateKeyTuple{roomNameTuple, canonicalAliasTuple},
|
||||
|
|
|
|||
Loading…
Reference in a new issue