Merge branch 'main' into neilalexander/singletxn

This commit is contained in:
Neil Alexander 2022-02-11 17:23:36 +00:00 committed by GitHub
commit 1f0d11f0e7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 25 additions and 37 deletions

View file

@ -63,7 +63,7 @@ jobs:
# Run Complement
- run: |
set -o pipefail &&
go test -v -json -tags dendrite_blacklist ./tests/... 2>&1 | gotestfmt
go test -v -p 1 -json -tags dendrite_blacklist ./tests/... 2>&1 | gotestfmt
shell: bash
name: Run Complement Tests
env:

View file

@ -19,6 +19,10 @@ import (
"encoding/json"
"fmt"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/federationapi/queue"
"github.com/matrix-org/dendrite/federationapi/storage"
"github.com/matrix-org/dendrite/federationapi/types"
@ -26,9 +30,6 @@ import (
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
// OutputRoomEventConsumer consumes events that originated in the room server.
@ -97,21 +98,14 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg)
}
if err := s.processMessage(*output.NewRoomEvent); err != nil {
switch err.(type) {
case *queue.ErrorFederationDisabled:
log.WithField("error", output.Type).Info(
err.Error(),
)
default:
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"event": string(ev.JSON()),
"add": output.NewRoomEvent.AddsStateEventIDs,
"del": output.NewRoomEvent.RemovesStateEventIDs,
log.ErrorKey: err,
}).Panicf("roomserver output log: write room event failure")
}
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"event": string(ev.JSON()),
"add": output.NewRoomEvent.AddsStateEventIDs,
"del": output.NewRoomEvent.RemovesStateEventIDs,
log.ErrorKey: err,
}).Panicf("roomserver output log: write room event failure")
}
case api.OutputTypeNewInboundPeek:

View file

@ -22,15 +22,16 @@ import (
"sync"
"time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/matrix-org/dendrite/federationapi/storage"
"github.com/matrix-org/dendrite/federationapi/storage/shared"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
)
// OutgoingQueues is a collection of queues for sending transactions to other
@ -182,23 +183,14 @@ func (oqs *OutgoingQueues) clearQueue(oq *destinationQueue) {
destinationQueueTotal.Dec()
}
type ErrorFederationDisabled struct {
Message string
}
func (e *ErrorFederationDisabled) Error() string {
return e.Message
}
// SendEvent sends an event to the destinations
func (oqs *OutgoingQueues) SendEvent(
ev *gomatrixserverlib.HeaderedEvent, origin gomatrixserverlib.ServerName,
destinations []gomatrixserverlib.ServerName,
) error {
if oqs.disabled {
return &ErrorFederationDisabled{
Message: "Federation disabled",
}
log.Trace("Federation is disabled, not sending event")
return nil
}
if origin != oqs.origin {
// TODO: Support virtual hosting; gh issue #577.
@ -262,9 +254,8 @@ func (oqs *OutgoingQueues) SendEDU(
destinations []gomatrixserverlib.ServerName,
) error {
if oqs.disabled {
return &ErrorFederationDisabled{
Message: "Federation disabled",
}
log.Trace("Federation is disabled, not sending EDU")
return nil
}
if origin != oqs.origin {
// TODO: Support virtual hosting; gh issue #577.

View file

@ -258,6 +258,9 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
util.GetLogger(ctx).WithError(err).Debugf("Transaction: Failed to parse event JSON of event %s", string(pdu))
continue
}
if event.Type() == gomatrixserverlib.MRoomCreate && event.StateKeyEquals("") {
continue
}
if api.IsServerBannedFromRoom(ctx, t.rsAPI, event.RoomID(), t.Origin) {
results[event.EventID()] = gomatrixserverlib.PDUResult{
Error: "Forbidden by server ACLs",