Merge branch 'main' into neilalexander/purgeroom

This commit is contained in:
Neil Alexander 2022-09-13 11:15:14 +01:00 committed by GitHub
commit 1ca58ec9d1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 76 additions and 65 deletions

View file

@ -21,12 +21,13 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
) )
// SyncAPIProducer produces events for the sync API server to consume // SyncAPIProducer produces events for the sync API server to consume
@ -61,7 +62,7 @@ func (p *SyncAPIProducer) SendReceipt(
func (p *SyncAPIProducer) SendToDevice( func (p *SyncAPIProducer) SendToDevice(
ctx context.Context, sender, userID, deviceID, eventType string, ctx context.Context, sender, userID, deviceID, eventType string,
message interface{}, message json.RawMessage,
) error { ) error {
devices := []string{} devices := []string{}
_, domain, err := gomatrixserverlib.SplitID('@', userID) _, domain, err := gomatrixserverlib.SplitID('@', userID)
@ -89,24 +90,19 @@ func (p *SyncAPIProducer) SendToDevice(
devices = append(devices, deviceID) devices = append(devices, deviceID)
} }
js, err := json.Marshal(message)
if err != nil {
return err
}
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"user_id": userID, "user_id": userID,
"num_devices": len(devices), "num_devices": len(devices),
"type": eventType, "type": eventType,
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent) }).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
for _, device := range devices { for i, device := range devices {
ote := &types.OutputSendToDeviceEvent{ ote := &types.OutputSendToDeviceEvent{
UserID: userID, UserID: userID,
DeviceID: device, DeviceID: device,
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
Sender: sender, Sender: sender,
Type: eventType, Type: eventType,
Content: js, Content: message,
}, },
} }
@ -115,15 +111,17 @@ func (p *SyncAPIProducer) SendToDevice(
log.WithError(err).Error("sendToDevice failed json.Marshal") log.WithError(err).Error("sendToDevice failed json.Marshal")
return err return err
} }
m := &nats.Msg{ m := nats.NewMsg(p.TopicSendToDeviceEvent)
Subject: p.TopicSendToDeviceEvent, m.Data = eventJSON
Data: eventJSON,
Header: nats.Header{},
}
m.Header.Set("sender", sender) m.Header.Set("sender", sender)
m.Header.Set(jetstream.UserID, userID) m.Header.Set(jetstream.UserID, userID)
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil { if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage") if i < len(devices)-1 {
log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices")
continue
}
log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices")
return err return err
} }
} }

View file

@ -64,7 +64,7 @@ func (p *SyncAPIProducer) SendReceipt(
func (p *SyncAPIProducer) SendToDevice( func (p *SyncAPIProducer) SendToDevice(
ctx context.Context, sender, userID, deviceID, eventType string, ctx context.Context, sender, userID, deviceID, eventType string,
message interface{}, message json.RawMessage,
) error { ) error {
devices := []string{} devices := []string{}
_, domain, err := gomatrixserverlib.SplitID('@', userID) _, domain, err := gomatrixserverlib.SplitID('@', userID)
@ -92,24 +92,19 @@ func (p *SyncAPIProducer) SendToDevice(
devices = append(devices, deviceID) devices = append(devices, deviceID)
} }
js, err := json.Marshal(message)
if err != nil {
return err
}
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"user_id": userID, "user_id": userID,
"num_devices": len(devices), "num_devices": len(devices),
"type": eventType, "type": eventType,
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent) }).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
for _, device := range devices { for i, device := range devices {
ote := &types.OutputSendToDeviceEvent{ ote := &types.OutputSendToDeviceEvent{
UserID: userID, UserID: userID,
DeviceID: device, DeviceID: device,
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
Sender: sender, Sender: sender,
Type: eventType, Type: eventType,
Content: js, Content: message,
}, },
} }
@ -118,16 +113,17 @@ func (p *SyncAPIProducer) SendToDevice(
log.WithError(err).Error("sendToDevice failed json.Marshal") log.WithError(err).Error("sendToDevice failed json.Marshal")
return err return err
} }
m := &nats.Msg{ m := nats.NewMsg(p.TopicSendToDeviceEvent)
Subject: p.TopicSendToDeviceEvent, m.Data = eventJSON
Data: eventJSON,
Header: nats.Header{},
}
m.Header.Set("sender", sender) m.Header.Set("sender", sender)
m.Header.Set(jetstream.UserID, userID) m.Header.Set(jetstream.UserID, userID)
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil { if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage") if i < len(devices)-1 {
log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices")
continue
}
log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices")
return err return err
} }
} }

View file

@ -155,5 +155,10 @@ func InsertMigration(ctx context.Context, db *sql.DB, migrationName string) erro
time.Now().Format(time.RFC3339), time.Now().Format(time.RFC3339),
internal.VersionString(), internal.VersionString(),
) )
// If the migration was already executed, we'll get a unique constraint error,
// return nil instead, to avoid unnecessary logging.
if IsUniqueConstraintViolationErr(err) {
return nil
}
return err return err
} }

View file

@ -12,12 +12,27 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
//go:build wasm //go:build !wasm
// +build wasm // +build !wasm
package sqlutil package sqlutil
// IsUniqueConstraintViolationErr no-ops for this architecture import (
"github.com/lib/pq"
"github.com/mattn/go-sqlite3"
)
// IsUniqueConstraintViolationErr returns true if the error is an unique_violation error
func IsUniqueConstraintViolationErr(err error) bool { func IsUniqueConstraintViolationErr(err error) bool {
switch e := err.(type) {
case *pq.Error:
return e.Code == "23505"
case pq.Error:
return e.Code == "23505"
case *sqlite3.Error:
return e.Code == sqlite3.ErrConstraint
case sqlite3.Error:
return e.Code == sqlite3.ErrConstraint
}
return false return false
} }

View file

@ -12,15 +12,20 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
//go:build !wasm //go:build wasm
// +build !wasm // +build wasm
package sqlutil package sqlutil
import "github.com/lib/pq" import "github.com/mattn/go-sqlite3"
// IsUniqueConstraintViolationErr returns true if the error is a postgresql unique_violation error // IsUniqueConstraintViolationErr returns true if the error is an unique_violation error
func IsUniqueConstraintViolationErr(err error) bool { func IsUniqueConstraintViolationErr(err error) bool {
pqErr, ok := err.(*pq.Error) switch e := err.(type) {
return ok && pqErr.Code == "23505" case *sqlite3.Error:
return e.Code == sqlite3.ErrConstraint
case sqlite3.Error:
return e.Code == sqlite3.ErrConstraint
}
return false
} }

View file

@ -18,8 +18,7 @@ import (
"context" "context"
"database/sql" "database/sql"
"errors" "errors"
"fmt"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
@ -81,8 +80,7 @@ func executeMigration(ctx context.Context, db *sql.DB) error {
if err != nil { if err != nil {
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil { if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
// not a fatal error, log and continue return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
logrus.WithError(err).Warnf("unable to manually insert migration '%s'", migrationName)
} }
return nil return nil
} }

View file

@ -18,8 +18,7 @@ import (
"context" "context"
"database/sql" "database/sql"
"errors" "errors"
"fmt"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
@ -80,8 +79,7 @@ func executeMigration(ctx context.Context, db *sql.DB) error {
if err != nil { if err != nil {
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil { if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
// not a fatal error, log and continue return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
logrus.WithError(err).Warnf("unable to manually insert migration '%s'", migrationName)
} }
return nil return nil
} }

View file

@ -106,7 +106,7 @@ func (r *Queryer) QueryStateAfterEvents(
return err return err
} }
if len(request.PrevEventIDs) > 1 && len(request.StateToFetch) == 0 { if len(request.PrevEventIDs) > 1 {
var authEventIDs []string var authEventIDs []string
for _, e := range stateEvents { for _, e := range stateEvents {
authEventIDs = append(authEventIDs, e.AuthEventIDs()...) authEventIDs = append(authEventIDs, e.AuthEventIDs()...)

View file

@ -21,8 +21,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/sirupsen/logrus"
// Import the postgres database driver. // Import the postgres database driver.
_ "github.com/lib/pq" _ "github.com/lib/pq"
@ -79,8 +77,7 @@ func executeMigration(ctx context.Context, db *sql.DB) error {
if err != nil { if err != nil {
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil { if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
// not a fatal error, log and continue return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
logrus.WithError(err).Warnf("unable to manually insert migration '%s'", migrationName)
} }
return nil return nil
} }

View file

@ -22,7 +22,6 @@ import (
"fmt" "fmt"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
@ -87,8 +86,7 @@ func executeMigration(ctx context.Context, db *sql.DB) error {
if err != nil { if err != nil {
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil { if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
// not a fatal error, log and continue return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
logrus.WithError(err).Warnf("unable to manually insert migration '%s'", migrationName)
} }
return nil return nil
} }

View file

@ -19,16 +19,17 @@ import (
"encoding/json" "encoding/json"
"github.com/getsentry/sentry-go" "github.com/getsentry/sentry-go"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
) )
// OutputSendToDeviceEventConsumer consumes events that originated in the EDU server. // OutputSendToDeviceEventConsumer consumes events that originated in the EDU server.
@ -79,16 +80,18 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs []
_, domain, err := gomatrixserverlib.SplitID('@', userID) _, domain, err := gomatrixserverlib.SplitID('@', userID)
if err != nil { if err != nil {
sentry.CaptureException(err) sentry.CaptureException(err)
log.WithError(err).Errorf("send-to-device: failed to split user id, dropping message")
return true return true
} }
if domain != s.serverName { if domain != s.serverName {
log.Tracef("ignoring send-to-device event with destination %s", domain)
return true return true
} }
var output types.OutputSendToDeviceEvent var output types.OutputSendToDeviceEvent
if err = json.Unmarshal(msg.Data, &output); err != nil { if err = json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream // If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("output log: message parse failure") log.WithError(err).Errorf("send-to-device: message parse failure")
sentry.CaptureException(err) sentry.CaptureException(err)
return true return true
} }
@ -105,7 +108,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs []
) )
if err != nil { if err != nil {
sentry.CaptureException(err) sentry.CaptureException(err)
log.WithError(err).Errorf("failed to store send-to-device message") log.WithError(err).Errorf("send-to-device: failed to store message")
return false return false
} }

View file

@ -624,9 +624,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
// Send to-device messages of type "m.dendrite.test" with content `{"dummy":"message $counter"}` // Send to-device messages of type "m.dendrite.test" with content `{"dummy":"message $counter"}`
for i := 0; i < tc.sendMessagesCount; i++ { for i := 0; i < tc.sendMessagesCount; i++ {
msgCounter++ msgCounter++
msg := map[string]string{ msg := json.RawMessage(fmt.Sprintf(`{"dummy":"message %d"}`, msgCounter))
"dummy": fmt.Sprintf("message %d", msgCounter),
}
if err := producer.SendToDevice(ctx, user.ID, user.ID, alice.ID, "m.dendrite.test", msg); err != nil { if err := producer.SendToDevice(ctx, user.ID, user.ID, alice.ID, "m.dendrite.test", msg); err != nil {
t.Fatalf("unable to send to device message: %v", err) t.Fatalf("unable to send to device message: %v", err)
} }