Try using inputer instead

This commit is contained in:
Neil Alexander 2022-04-28 13:09:11 +01:00
parent a55765c8b6
commit 36cba338cf
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 79 additions and 12 deletions

View file

@ -166,8 +166,10 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA
URSAPI: r,
}
r.Admin = &perform.Admin{
DB: r.DB,
Leaver: r.Leaver,
DB: r.DB,
Cfg: r.Cfg,
Inputer: r.Inputer,
Queryer: r.Queryer,
}
if err := r.Inputer.Start(); err != nil {

View file

@ -16,15 +16,24 @@ package perform
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/input"
"github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
)
type Admin struct {
DB storage.Database
Leaver *Leaver
DB storage.Database
Cfg *config.RoomServer
Queryer *query.Queryer
Inputer *input.Inputer
}
// PerformEvacuateRoom will remove all local users from the given room.
@ -67,22 +76,78 @@ func (r *Admin) PerformAdminEvacuateRoom(
return
}
inputEvents := make([]api.InputRoomEvent, 0, len(memberEvents))
latestReq := &api.QueryLatestEventsAndStateRequest{
RoomID: req.RoomID,
StateToFetch: []gomatrixserverlib.StateKeyTuple{
{
EventType: "m.room.create",
StateKey: "",
},
},
}
latestRes := &api.QueryLatestEventsAndStateResponse{}
if err = r.Queryer.QueryLatestEventsAndState(ctx, latestReq, latestRes); err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("r.Queryer.QueryLatestEventsAndState: %s", err),
}
return
}
for _, memberEvent := range memberEvents {
if memberEvent.StateKey() == nil {
continue
}
userID := *memberEvent.StateKey()
leaveReq := &api.PerformLeaveRequest{
RoomID: req.RoomID,
UserID: userID,
}
leaveRes := &api.PerformLeaveResponse{}
if _, err = r.Leaver.PerformLeave(ctx, leaveReq, leaveRes); err != nil {
var memberContent gomatrixserverlib.MemberContent
if err = json.Unmarshal(memberEvent.Content(), &memberContent); err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("r.Leaver.PerformLeave (%s): %s", userID, err),
Msg: fmt.Sprintf("json.Unmarshal: %s", err),
}
return
}
memberContent.Membership = gomatrixserverlib.Leave
stateKey := *memberEvent.StateKey()
fledglingEvent := &gomatrixserverlib.EventBuilder{
RoomID: req.RoomID,
Type: gomatrixserverlib.MRoomMember,
StateKey: &stateKey,
Sender: stateKey,
}
if fledglingEvent.Content, err = json.Marshal(memberContent); err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("json.Marshal: %s", err),
}
return
}
event, err := eventutil.BuildEvent(ctx, fledglingEvent, nil, time.Now(), nil, latestRes)
if err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("eventutil.BuildEvent: %s", err),
}
return
}
inputEvents = append(inputEvents, api.InputRoomEvent{
Kind: api.KindNew,
Event: event,
Origin: r.Cfg.Matrix.ServerName,
SendAsServer: string(r.Cfg.Matrix.ServerName),
})
}
inputReq := &api.InputRoomEventsRequest{
InputRoomEvents: inputEvents,
Asynchronous: true,
}
inputRes := &api.InputRoomEventsResponse{}
r.Inputer.InputRoomEvents(ctx, inputReq, inputRes)
}