Merge branch 'main' into neilalexander/kindoldexcludesync

This commit is contained in:
Neil Alexander 2022-09-20 10:16:07 +01:00 committed by GitHub
commit 6fe6a62843
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 219 additions and 119 deletions

View file

@ -22,6 +22,7 @@ import (
"math"
"net/http"
"net/url"
"strconv"
"time"
"github.com/matrix-org/gomatrixserverlib"
@ -151,10 +152,17 @@ func (s *OutputRoomEventConsumer) onMessage(
return true
}
txnID := ""
// Try to get the message metadata, if we're able to, use the timestamp as the txnID
metadata, err := msgs[0].Metadata()
if err == nil {
txnID = strconv.Itoa(int(metadata.Timestamp.UnixNano()))
}
// Send event to any relevant application services. If we hit
// an error here, return false, so that we negatively ack.
log.WithField("appservice", state.ID).Debugf("Appservice worker sending %d events(s) from roomserver", len(events))
return s.sendEvents(ctx, state, events) == nil
return s.sendEvents(ctx, state, events, txnID) == nil
}
// sendEvents passes events to the appservice by using the transactions
@ -162,6 +170,7 @@ func (s *OutputRoomEventConsumer) onMessage(
func (s *OutputRoomEventConsumer) sendEvents(
ctx context.Context, state *appserviceState,
events []*gomatrixserverlib.HeaderedEvent,
txnID string,
) error {
// Create the transaction body.
transaction, err := json.Marshal(
@ -173,13 +182,14 @@ func (s *OutputRoomEventConsumer) sendEvents(
return err
}
// TODO: We should probably be more intelligent and pick something not
// in the control of the event. A NATS timestamp header or something maybe.
txnID := events[0].Event.OriginServerTS()
// If txnID is not defined, generate one from the events.
if txnID == "" {
txnID = fmt.Sprintf("%d_%d", events[0].Event.OriginServerTS(), len(transaction))
}
// Send the transaction to the appservice.
// https://matrix.org/docs/spec/application_service/r0.1.2#put-matrix-app-v1-transactions-txnid
address := fmt.Sprintf("%s/transactions/%d?access_token=%s", state.URL, txnID, url.QueryEscape(state.HSToken))
address := fmt.Sprintf("%s/transactions/%s?access_token=%s", state.URL, txnID, url.QueryEscape(state.HSToken))
req, err := http.NewRequestWithContext(ctx, "PUT", address, bytes.NewBuffer(transaction))
if err != nil {
return err

View file

@ -255,7 +255,7 @@ func (m *DendriteMonolith) Start() {
m.logger.SetOutput(BindLogger{})
logrus.SetOutput(BindLogger{})
m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false)
m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk)
m.PineconeQUIC = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), m.PineconeRouter, []string{"matrix"})
m.PineconeMulticast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), m.PineconeRouter)
m.PineconeManager = pineconeConnections.NewConnectionManager(m.PineconeRouter, nil)

View file

@ -21,12 +21,13 @@ import (
"strconv"
"time"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
)
// SyncAPIProducer produces events for the sync API server to consume
@ -61,7 +62,7 @@ func (p *SyncAPIProducer) SendReceipt(
func (p *SyncAPIProducer) SendToDevice(
ctx context.Context, sender, userID, deviceID, eventType string,
message interface{},
message json.RawMessage,
) error {
devices := []string{}
_, domain, err := gomatrixserverlib.SplitID('@', userID)
@ -89,24 +90,19 @@ func (p *SyncAPIProducer) SendToDevice(
devices = append(devices, deviceID)
}
js, err := json.Marshal(message)
if err != nil {
return err
}
log.WithFields(log.Fields{
"user_id": userID,
"num_devices": len(devices),
"type": eventType,
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
for _, device := range devices {
for i, device := range devices {
ote := &types.OutputSendToDeviceEvent{
UserID: userID,
DeviceID: device,
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
Sender: sender,
Type: eventType,
Content: js,
Content: message,
},
}
@ -115,15 +111,17 @@ func (p *SyncAPIProducer) SendToDevice(
log.WithError(err).Error("sendToDevice failed json.Marshal")
return err
}
m := &nats.Msg{
Subject: p.TopicSendToDeviceEvent,
Data: eventJSON,
Header: nats.Header{},
}
m := nats.NewMsg(p.TopicSendToDeviceEvent)
m.Data = eventJSON
m.Header.Set("sender", sender)
m.Header.Set(jetstream.UserID, userID)
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
if i < len(devices)-1 {
log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices")
continue
}
log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices")
return err
}
}

View file

@ -151,7 +151,7 @@ func main() {
base := base.NewBaseDendrite(cfg, "Monolith")
defer base.Close() // nolint: errcheck
pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false)
pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk)
pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"})
pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter)
pManager := pineconeConnections.NewConnectionManager(pRouter, nil)

View file

@ -4,6 +4,7 @@ import (
"fmt"
"log"
"strings"
"time"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
@ -81,11 +82,14 @@ func runTests(baseURL, branchName string) error {
client: users[1].client, text: "4: " + branchName,
},
}
wantEventIDs := make(map[string]struct{}, 8)
for _, msg := range msgs {
_, err = msg.client.SendText(dmRoomID, msg.text)
var resp *gomatrix.RespSendEvent
resp, err = msg.client.SendText(dmRoomID, msg.text)
if err != nil {
return fmt.Errorf("failed to send text in dm room: %s", err)
}
wantEventIDs[resp.EventID] = struct{}{}
}
// attempt to create/join the shared public room
@ -113,11 +117,48 @@ func runTests(baseURL, branchName string) error {
}
// send messages
for _, msg := range msgs {
_, err = msg.client.SendText(publicRoomID, "public "+msg.text)
resp, err := msg.client.SendText(publicRoomID, "public "+msg.text)
if err != nil {
return fmt.Errorf("failed to send text in public room: %s", err)
}
wantEventIDs[resp.EventID] = struct{}{}
}
// Sync until we have all expected messages
doneCh := make(chan struct{})
go func() {
syncClient := users[0].client
since := ""
for len(wantEventIDs) > 0 {
select {
case <-doneCh:
return
default:
}
syncResp, err := syncClient.SyncRequest(1000, since, "1", false, "")
if err != nil {
continue
}
for _, room := range syncResp.Rooms.Join {
for _, ev := range room.Timeline.Events {
if ev.Type != "m.room.message" {
continue
}
delete(wantEventIDs, ev.ID)
}
}
since = syncResp.NextBatch
}
close(doneCh)
}()
select {
case <-time.After(time.Second * 10):
close(doneCh)
return fmt.Errorf("failed to receive all expected messages: %+v", wantEventIDs)
case <-doneCh:
}
log.Printf("OK! rooms(public=%s, dm=%s) users(%s, %s)\n", publicRoomID, dmRoomID, users[0].userID, users[1].userID)
return nil
}

View file

@ -11,8 +11,8 @@ possible to get started.
## Sign off
We ask that everyone who contributes to the project signs off their contributions
in accordance with the [DCO](https://github.com/matrix-org/matrix-spec/blob/main/CONTRIBUTING.rst#sign-off).
We require that everyone who contributes to the project signs off their contributions
in accordance with the [Developer Certificate of Origin](https://github.com/matrix-org/matrix-spec/blob/main/CONTRIBUTING.rst#sign-off).
In effect, this means adding a statement to your pull requests or commit messages
along the lines of:
@ -20,7 +20,18 @@ along the lines of:
Signed-off-by: Full Name <email address>
```
Unfortunately we can't accept contributions without it.
Unfortunately we can't accept contributions without a sign-off.
Please note that we can only accept contributions under a legally identifiable name,
such as your name as it appears on government-issued documentation or common-law names
(claimed by legitimate usage or repute). We cannot accept sign-offs from a pseudonym or
alias and cannot accept anonymous contributions.
If you would prefer to sign off privately instead (so as to not reveal your full
name on a public pull request), you can do so by emailing a sign-off declaration
and a link to your pull request directly to the [Matrix.org Foundation](https://matrix.org/foundation/)
at `dco@matrix.org`. Once a private sign-off has been made, you will not be required
to do so for future contributions.
## Getting up and running

View file

@ -10,24 +10,6 @@ permalink: /installation/database
Dendrite uses SQL databases to store data. Depending on the database engine being used, you
may need to perform some manual steps outlined below.
## SQLite
SQLite deployments do not require manual database creation. Simply configure the database
filenames in the Dendrite configuration file and start Dendrite. The databases will be created
and populated automatically.
Note that Dendrite **cannot share a single SQLite database across multiple components**. Each
component must be configured with its own SQLite database filename. You will have to remove
the `global.database` section from your Dendrite config and add it to each individual section
instead in order to use SQLite.
### Connection strings
Connection strings for SQLite databases take the following forms:
* Current working directory path: `file:dendrite_component.db`
* Full specified path: `file:///path/to/dendrite_component.db`
## PostgreSQL
Dendrite can automatically populate the database with the relevant tables and indexes, but
@ -106,3 +88,25 @@ for i in appservice federationapi mediaapi mscs roomserver syncapi keyserver use
sudo -u postgres createdb -O dendrite dendrite_$i
done
```
## SQLite
**WARNING:** The Dendrite SQLite backend is slower, less reliable and not recommended for
production usage. You should use PostgreSQL instead. We may not be able to provide support if
you run into issues with your deployment while using the SQLite backend.
SQLite deployments do not require manual database creation. Simply configure the database
filenames in the Dendrite configuration file and start Dendrite. The databases will be created
and populated automatically.
Note that Dendrite **cannot share a single SQLite database across multiple components**. Each
component must be configured with its own SQLite database filename. You will have to remove
the `global.database` section from your Dendrite config and add it to each individual section
instead in order to use SQLite.
### Connection strings
Connection strings for SQLite databases take the following forms:
* Current working directory path: `file:dendrite_component.db`
* Full specified path: `file:///path/to/dendrite_component.db`

View file

@ -64,7 +64,7 @@ func (p *SyncAPIProducer) SendReceipt(
func (p *SyncAPIProducer) SendToDevice(
ctx context.Context, sender, userID, deviceID, eventType string,
message interface{},
message json.RawMessage,
) error {
devices := []string{}
_, domain, err := gomatrixserverlib.SplitID('@', userID)
@ -92,24 +92,19 @@ func (p *SyncAPIProducer) SendToDevice(
devices = append(devices, deviceID)
}
js, err := json.Marshal(message)
if err != nil {
return err
}
log.WithFields(log.Fields{
"user_id": userID,
"num_devices": len(devices),
"type": eventType,
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
for _, device := range devices {
for i, device := range devices {
ote := &types.OutputSendToDeviceEvent{
UserID: userID,
DeviceID: device,
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
Sender: sender,
Type: eventType,
Content: js,
Content: message,
},
}
@ -118,16 +113,17 @@ func (p *SyncAPIProducer) SendToDevice(
log.WithError(err).Error("sendToDevice failed json.Marshal")
return err
}
m := &nats.Msg{
Subject: p.TopicSendToDeviceEvent,
Data: eventJSON,
Header: nats.Header{},
}
m := nats.NewMsg(p.TopicSendToDeviceEvent)
m.Data = eventJSON
m.Header.Set("sender", sender)
m.Header.Set(jetstream.UserID, userID)
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
if i < len(devices)-1 {
log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices")
continue
}
log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices")
return err
}
}
@ -167,10 +163,10 @@ func (p *SyncAPIProducer) SendPresence(
}
func (p *SyncAPIProducer) SendDeviceListUpdate(
ctx context.Context, deviceListUpdate gomatrixserverlib.RawJSON, origin string,
ctx context.Context, deviceListUpdate gomatrixserverlib.RawJSON, origin gomatrixserverlib.ServerName,
) (err error) {
m := nats.NewMsg(p.TopicDeviceListUpdate)
m.Header.Set("origin", origin)
m.Header.Set("origin", string(origin))
m.Data = deviceListUpdate
log.Debugf("Sending device list update: %+v", m.Header)
_, err = p.JetStream.PublishMsg(m, nats.Context(ctx))

View file

@ -359,7 +359,7 @@ func (t *txnReq) processEDUs(ctx context.Context) {
}
}
case gomatrixserverlib.MDeviceListUpdate:
if err := t.producer.SendDeviceListUpdate(ctx, e.Content, e.Origin); err != nil {
if err := t.producer.SendDeviceListUpdate(ctx, e.Content, t.Origin); err != nil {
util.GetLogger(ctx).WithError(err).Error("failed to InputDeviceListUpdate")
}
case gomatrixserverlib.MReceipt:

2
go.mod
View file

@ -23,7 +23,7 @@ require (
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a
github.com/matrix-org/pinecone v0.0.0-20220912093434-b215925d5534
github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.13
github.com/nats-io/nats-server/v2 v2.9.0

4
go.sum
View file

@ -390,8 +390,8 @@ github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5d
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a h1:XeGBDZZsUe4kgj3myl0EiuDNVWxszJecMTrON3Wn9sI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220912142654-7b96db48888a/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
github.com/matrix-org/pinecone v0.0.0-20220912093434-b215925d5534 h1:XuJYAJNkdG3zj9cO0yQSvL+Sp2xogsTOuZRx7PwdtoA=
github.com/matrix-org/pinecone v0.0.0-20220912093434-b215925d5534/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k=
github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29 h1:/AIaqhK1BBi2sMEVQdgZRV8H8sNloAGCgztLZhsPqD0=
github.com/matrix-org/pinecone v0.0.0-20220914131522-608215eb1b29/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=

View file

@ -155,5 +155,10 @@ func InsertMigration(ctx context.Context, db *sql.DB, migrationName string) erro
time.Now().Format(time.RFC3339),
internal.VersionString(),
)
// If the migration was already executed, we'll get a unique constraint error,
// return nil instead, to avoid unnecessary logging.
if IsUniqueConstraintViolationErr(err) {
return nil
}
return err
}

View file

@ -12,12 +12,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build wasm
// +build wasm
//go:build !wasm
// +build !wasm
package sqlutil
// IsUniqueConstraintViolationErr no-ops for this architecture
import (
"github.com/lib/pq"
"github.com/mattn/go-sqlite3"
)
// IsUniqueConstraintViolationErr returns true if the error is an unique_violation error
func IsUniqueConstraintViolationErr(err error) bool {
switch e := err.(type) {
case *pq.Error:
return e.Code == "23505"
case pq.Error:
return e.Code == "23505"
case *sqlite3.Error:
return e.Code == sqlite3.ErrConstraint
case sqlite3.Error:
return e.Code == sqlite3.ErrConstraint
}
return false
}

View file

@ -12,15 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !wasm
// +build !wasm
//go:build wasm
// +build wasm
package sqlutil
import "github.com/lib/pq"
import "github.com/mattn/go-sqlite3"
// IsUniqueConstraintViolationErr returns true if the error is a postgresql unique_violation error
// IsUniqueConstraintViolationErr returns true if the error is an unique_violation error
func IsUniqueConstraintViolationErr(err error) bool {
pqErr, ok := err.(*pq.Error)
return ok && pqErr.Code == "23505"
switch e := err.(type) {
case *sqlite3.Error:
return e.Code == sqlite3.ErrConstraint
case sqlite3.Error:
return e.Code == sqlite3.ErrConstraint
}
return false
}

View file

@ -18,8 +18,7 @@ import (
"context"
"database/sql"
"errors"
"github.com/sirupsen/logrus"
"fmt"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
@ -81,8 +80,7 @@ func executeMigration(ctx context.Context, db *sql.DB) error {
if err != nil {
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
// not a fatal error, log and continue
logrus.WithError(err).Warnf("unable to manually insert migration '%s'", migrationName)
return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
}
return nil
}

View file

@ -18,8 +18,7 @@ import (
"context"
"database/sql"
"errors"
"github.com/sirupsen/logrus"
"fmt"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
@ -80,8 +79,7 @@ func executeMigration(ctx context.Context, db *sql.DB) error {
if err != nil {
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
// not a fatal error, log and continue
logrus.WithError(err).Warnf("unable to manually insert migration '%s'", migrationName)
return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
}
return nil
}

View file

@ -439,6 +439,7 @@ type QueryMembershipAtEventRequest struct {
// QueryMembershipAtEventResponse is the response to QueryMembershipAtEventRequest.
type QueryMembershipAtEventResponse struct {
// Memberships is a map from eventID to a list of events (if any).
// Memberships is a map from eventID to a list of events (if any). Events that
// do not have known state will return an empty array here.
Memberships map[string][]*gomatrixserverlib.HeaderedEvent `json:"memberships"`
}

View file

@ -176,7 +176,8 @@ func (r *Inputer) Start() error {
},
nats.HeadersOnly(),
nats.DeliverAll(),
nats.AckAll(),
nats.AckExplicit(),
nats.ReplayInstant(),
nats.BindStream(r.InputRoomEventTopic),
)
return err

View file

@ -264,16 +264,27 @@ func (u *latestEventsUpdater) latestState() error {
return fmt.Errorf("roomState.CalculateAndStoreStateAfterEvents: %w", err)
}
// Now that we have a new state snapshot based on the latest events,
// we can compare that new snapshot to the previous one and see what
// has changed. This gives us one list of removed state events and
// another list of added ones. Replacing a value for a state-key tuple
// will result one removed (the old event) and one added (the new event).
u.removed, u.added, err = roomState.DifferenceBetweeenStateSnapshots(
ctx, u.oldStateNID, u.newStateNID,
)
if err != nil {
return fmt.Errorf("roomState.DifferenceBetweenStateSnapshots: %w", err)
// Include information about what changed in the state transition. If the
// event rewrites the state (i.e. is a federated join) then we will simply
// include the entire state snapshot as added events, as the "RewritesState"
// flag in the output event signals downstream components to purge their
// room state first. If it doesn't rewrite the state then we will work out
// what the difference is between the state snapshots and send that. In all
// cases where a state event is being replaced, the old state event will
// appear in "removed" and the replacement will appear in "added".
if u.rewritesState {
u.removed = []types.StateEntry{}
u.added, err = roomState.LoadStateAtSnapshot(ctx, u.newStateNID)
if err != nil {
return fmt.Errorf("roomState.LoadStateAtSnapshot: %w", err)
}
} else {
u.removed, u.added, err = roomState.DifferenceBetweeenStateSnapshots(
ctx, u.oldStateNID, u.newStateNID,
)
if err != nil {
return fmt.Errorf("roomState.DifferenceBetweenStateSnapshots: %w", err)
}
}
if removed := len(u.removed) - len(u.added); !u.rewritesState && removed > 0 {

View file

@ -106,7 +106,7 @@ func (r *Queryer) QueryStateAfterEvents(
return err
}
if len(request.PrevEventIDs) > 1 && len(request.StateToFetch) == 0 {
if len(request.PrevEventIDs) > 1 {
var authEventIDs []string
for _, e := range stateEvents {
authEventIDs = append(authEventIDs, e.AuthEventIDs()...)
@ -208,6 +208,9 @@ func (r *Queryer) QueryMembershipForUser(
return err
}
// QueryMembershipAtEvent returns the known memberships at a given event.
// If the state before an event is not known, an empty list will be returned
// for that event instead.
func (r *Queryer) QueryMembershipAtEvent(
ctx context.Context,
request *api.QueryMembershipAtEventRequest,
@ -237,7 +240,11 @@ func (r *Queryer) QueryMembershipAtEvent(
}
for _, eventID := range request.EventIDs {
stateEntry := stateEntries[eventID]
stateEntry, ok := stateEntries[eventID]
if !ok {
response.Memberships[eventID] = []*gomatrixserverlib.HeaderedEvent{}
continue
}
memberships, err := helpers.GetMembershipsAtState(ctx, r.DB, stateEntry, false)
if err != nil {
return fmt.Errorf("unable to get memberships at state: %w", err)

View file

@ -18,6 +18,7 @@ package state
import (
"context"
"database/sql"
"fmt"
"sort"
"sync"
@ -134,11 +135,14 @@ func (v *StateResolution) LoadMembershipAtEvent(
for i := range eventIDs {
eventID := eventIDs[i]
snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID)
if err != nil {
if err != nil && err != sql.ErrNoRows {
return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID failed for event %s : %w", eventID, err)
}
if snapshotNID == 0 {
return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID(%s) returned 0 NID, was this event stored?", eventID)
// If we don't know a state snapshot for this event then we can't calculate
// memberships at the time of the event, so skip over it. This means that
// it isn't guaranteed that the response map will contain every single event.
continue
}
snapshotNIDMap[snapshotNID] = append(snapshotNIDMap[snapshotNID], eventID)
}

View file

@ -21,8 +21,6 @@ import (
"errors"
"fmt"
"github.com/sirupsen/logrus"
// Import the postgres database driver.
_ "github.com/lib/pq"
@ -79,8 +77,7 @@ func executeMigration(ctx context.Context, db *sql.DB) error {
if err != nil {
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
// not a fatal error, log and continue
logrus.WithError(err).Warnf("unable to manually insert migration '%s'", migrationName)
return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
}
return nil
}

View file

@ -22,7 +22,6 @@ import (
"fmt"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil"
@ -87,8 +86,7 @@ func executeMigration(ctx context.Context, db *sql.DB) error {
if err != nil {
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
if err = sqlutil.InsertMigration(ctx, db, migrationName); err != nil {
// not a fatal error, log and continue
logrus.WithError(err).Warnf("unable to manually insert migration '%s'", migrationName)
return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
}
return nil
}

View file

@ -19,16 +19,17 @@ import (
"encoding/json"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
// OutputSendToDeviceEventConsumer consumes events that originated in the EDU server.
@ -79,16 +80,18 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs []
_, domain, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
sentry.CaptureException(err)
log.WithError(err).Errorf("send-to-device: failed to split user id, dropping message")
return true
}
if domain != s.serverName {
log.Tracef("ignoring send-to-device event with destination %s", domain)
return true
}
var output types.OutputSendToDeviceEvent
if err = json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("output log: message parse failure")
log.WithError(err).Errorf("send-to-device: message parse failure")
sentry.CaptureException(err)
return true
}
@ -105,7 +108,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs []
)
if err != nil {
sentry.CaptureException(err)
log.WithError(err).Errorf("failed to store send-to-device message")
log.WithError(err).Errorf("send-to-device: failed to store message")
return false
}

View file

@ -388,7 +388,7 @@ func applyHistoryVisibilityFilter(
if err != nil {
// Not a fatal error, we can continue without the stateEvents,
// they are only needed if there are state events in the timeline.
logrus.WithError(err).Warnf("failed to get current room state")
logrus.WithError(err).Warnf("Failed to get current room state for history visibility")
}
alwaysIncludeIDs := make(map[string]struct{}, len(stateEvents))
for _, ev := range stateEvents {
@ -397,7 +397,6 @@ func applyHistoryVisibilityFilter(
startTime := time.Now()
events, err := internal.ApplyHistoryVisibilityFilter(ctx, db, rsAPI, recentEvents, alwaysIncludeIDs, userID, "sync")
if err != nil {
return nil, err
}
logrus.WithFields(logrus.Fields{
@ -405,7 +404,7 @@ func applyHistoryVisibilityFilter(
"room_id": roomID,
"before": len(recentEvents),
"after": len(events),
}).Debug("applied history visibility (sync)")
}).Trace("Applied history visibility (sync)")
return events, nil
}

View file

@ -624,9 +624,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
// Send to-device messages of type "m.dendrite.test" with content `{"dummy":"message $counter"}`
for i := 0; i < tc.sendMessagesCount; i++ {
msgCounter++
msg := map[string]string{
"dummy": fmt.Sprintf("message %d", msgCounter),
}
msg := json.RawMessage(fmt.Sprintf(`{"dummy":"message %d"}`, msgCounter))
if err := producer.SendToDevice(ctx, user.ID, user.ID, alice.ID, "m.dendrite.test", msg); err != nil {
t.Fatalf("unable to send to device message: %v", err)
}