mirror of
https://github.com/matrix-org/dendrite.git
synced 2024-11-27 00:31:55 -06:00
18231f25b4
* WIP Event rejection * Still send back errors for rejected events Instead, discard them at the federationapi /send layer rather than re-implementing checks at the clientapi/PerformJoin layer. * Implement rejected events Critically, rejected events CAN cause state resolution to happen as it can merge forks in the DAG. This is fine, _provided_ we do not add the rejected event when performing state resolution, which is what this PR does. It also fixes the error handling when NotAllowed happens, as we were checking too early and needlessly handling NotAllowed in more than one place. * Update test to match reality * Modify InputRoomEvents to no longer return an error Errors do not serialise across HTTP boundaries in polylith mode, so instead set fields on the InputRoomEventsResponse. Add `Err()` function to make the API shape basically the same. * Remove redundant returns; linting * Update blacklist
166 lines
4.9 KiB
Go
166 lines
4.9 KiB
Go
// Copyright 2017 Vector Creations Ltd
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
// Package input contains the code processes new room events
|
|
package input
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/Shopify/sarama"
|
|
"github.com/matrix-org/dendrite/roomserver/acls"
|
|
"github.com/matrix-org/dendrite/roomserver/api"
|
|
"github.com/matrix-org/dendrite/roomserver/storage"
|
|
"github.com/matrix-org/gomatrixserverlib"
|
|
log "github.com/sirupsen/logrus"
|
|
"go.uber.org/atomic"
|
|
)
|
|
|
|
type Inputer struct {
|
|
DB storage.Database
|
|
Producer sarama.SyncProducer
|
|
ServerName gomatrixserverlib.ServerName
|
|
ACLs *acls.ServerACLs
|
|
OutputRoomEventTopic string
|
|
|
|
workers sync.Map // room ID -> *inputWorker
|
|
}
|
|
|
|
type inputTask struct {
|
|
ctx context.Context
|
|
event *api.InputRoomEvent
|
|
wg *sync.WaitGroup
|
|
err error // written back by worker, only safe to read when all tasks are done
|
|
}
|
|
|
|
type inputWorker struct {
|
|
r *Inputer
|
|
running atomic.Bool
|
|
input chan *inputTask
|
|
}
|
|
|
|
func (w *inputWorker) start() {
|
|
if !w.running.CAS(false, true) {
|
|
return
|
|
}
|
|
defer w.running.Store(false)
|
|
for {
|
|
select {
|
|
case task := <-w.input:
|
|
_, task.err = w.r.processRoomEvent(task.ctx, task.event)
|
|
task.wg.Done()
|
|
case <-time.After(time.Second * 5):
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// WriteOutputEvents implements OutputRoomEventWriter
|
|
func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) error {
|
|
messages := make([]*sarama.ProducerMessage, len(updates))
|
|
for i := range updates {
|
|
value, err := json.Marshal(updates[i])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
logger := log.WithFields(log.Fields{
|
|
"room_id": roomID,
|
|
"type": updates[i].Type,
|
|
})
|
|
if updates[i].NewRoomEvent != nil {
|
|
logger = logger.WithFields(log.Fields{
|
|
"event_type": updates[i].NewRoomEvent.Event.Type(),
|
|
"event_id": updates[i].NewRoomEvent.Event.EventID(),
|
|
"adds_state": len(updates[i].NewRoomEvent.AddsStateEventIDs),
|
|
"removes_state": len(updates[i].NewRoomEvent.RemovesStateEventIDs),
|
|
"send_as_server": updates[i].NewRoomEvent.SendAsServer,
|
|
"sender": updates[i].NewRoomEvent.Event.Sender(),
|
|
})
|
|
if updates[i].NewRoomEvent.Event.Type() == "m.room.server_acl" && updates[i].NewRoomEvent.Event.StateKeyEquals("") {
|
|
ev := updates[i].NewRoomEvent.Event.Unwrap()
|
|
defer r.ACLs.OnServerACLUpdate(&ev)
|
|
}
|
|
}
|
|
logger.Infof("Producing to topic '%s'", r.OutputRoomEventTopic)
|
|
messages[i] = &sarama.ProducerMessage{
|
|
Topic: r.OutputRoomEventTopic,
|
|
Key: sarama.StringEncoder(roomID),
|
|
Value: sarama.ByteEncoder(value),
|
|
}
|
|
}
|
|
return r.Producer.SendMessages(messages)
|
|
}
|
|
|
|
// InputRoomEvents implements api.RoomserverInternalAPI
|
|
func (r *Inputer) InputRoomEvents(
|
|
ctx context.Context,
|
|
request *api.InputRoomEventsRequest,
|
|
response *api.InputRoomEventsResponse,
|
|
) {
|
|
// Create a wait group. Each task that we dispatch will call Done on
|
|
// this wait group so that we know when all of our events have been
|
|
// processed.
|
|
wg := &sync.WaitGroup{}
|
|
wg.Add(len(request.InputRoomEvents))
|
|
tasks := make([]*inputTask, len(request.InputRoomEvents))
|
|
|
|
for i, e := range request.InputRoomEvents {
|
|
// Work out if we are running per-room workers or if we're just doing
|
|
// it on a global basis (e.g. SQLite).
|
|
roomID := "global"
|
|
if r.DB.SupportsConcurrentRoomInputs() {
|
|
roomID = e.Event.RoomID()
|
|
}
|
|
|
|
// Look up the worker, or create it if it doesn't exist. This channel
|
|
// is buffered to reduce the chance that we'll be blocked by another
|
|
// room - the channel will be quite small as it's just pointer types.
|
|
w, _ := r.workers.LoadOrStore(roomID, &inputWorker{
|
|
r: r,
|
|
input: make(chan *inputTask, 10),
|
|
})
|
|
worker := w.(*inputWorker)
|
|
|
|
// Create a task. This contains the input event and a reference to
|
|
// the wait group, so that the worker can notify us when this specific
|
|
// task has been finished.
|
|
tasks[i] = &inputTask{
|
|
ctx: ctx,
|
|
event: &request.InputRoomEvents[i],
|
|
wg: wg,
|
|
}
|
|
|
|
// Send the task to the worker.
|
|
go worker.start()
|
|
worker.input <- tasks[i]
|
|
}
|
|
|
|
// Wait for all of the workers to return results about our tasks.
|
|
wg.Wait()
|
|
|
|
// If any of the tasks returned an error, we should probably report
|
|
// that back to the caller.
|
|
for _, task := range tasks {
|
|
if task.err != nil {
|
|
response.ErrMsg = task.err.Error()
|
|
_, rejected := task.err.(*gomatrixserverlib.NotAllowed)
|
|
response.NotAllowed = rejected
|
|
return
|
|
}
|
|
}
|
|
}
|