Merge branch 'main' into neilalexander/purgeroom

This commit is contained in:
Neil Alexander 2022-08-30 16:38:42 +01:00 committed by GitHub
commit e1eccbbfb7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 38 additions and 20 deletions

View file

@ -18,14 +18,15 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/matrix-org/dendrite/appservice/storage" "github.com/matrix-org/dendrite/appservice/storage"
"github.com/matrix-org/dendrite/appservice/types" "github.com/matrix-org/dendrite/appservice/types"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -103,6 +104,7 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg)
} }
if len(eventsReq.EventIDs) > 0 { if len(eventsReq.EventIDs) > 0 {
if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil { if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil {
log.WithError(err).Errorf("s.rsAPI.QueryEventsByID failed")
return false return false
} }
events = append(events, eventsRes.Events...) events = append(events, eventsRes.Events...)

2
go.mod
View file

@ -21,7 +21,7 @@ require (
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
github.com/matrix-org/gomatrixserverlib v0.0.0-20220828085018-5f66df08dc66 github.com/matrix-org/gomatrixserverlib v0.0.0-20220830104348-2020bdc55859
github.com/matrix-org/pinecone v0.0.0-20220803093810-b7a830c08fb9 github.com/matrix-org/pinecone v0.0.0-20220803093810-b7a830c08fb9
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.13 github.com/mattn/go-sqlite3 v1.14.13

4
go.sum
View file

@ -343,8 +343,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220828085018-5f66df08dc66 h1:4+ycQUZ4Do5LeEuiesz6gD7LkjRNUtk8oETFqjalZxQ= github.com/matrix-org/gomatrixserverlib v0.0.0-20220830104348-2020bdc55859 h1:14mOIE2sGS6YTG1+3GFbVWmAisW5pzyfn3SxJTjkR2I=
github.com/matrix-org/gomatrixserverlib v0.0.0-20220828085018-5f66df08dc66/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk= github.com/matrix-org/gomatrixserverlib v0.0.0-20220830104348-2020bdc55859/go.mod h1:jX38yp3SSLJNftBg3PXU1ayd0PCLIiDHQ4xAc9DIixk=
github.com/matrix-org/pinecone v0.0.0-20220803093810-b7a830c08fb9 h1:ed8yvWhTLk7+sNeK/eOZRTvESFTOHDRevoRoyeqPtvY= github.com/matrix-org/pinecone v0.0.0-20220803093810-b7a830c08fb9 h1:ed8yvWhTLk7+sNeK/eOZRTvESFTOHDRevoRoyeqPtvY=
github.com/matrix-org/pinecone v0.0.0-20220803093810-b7a830c08fb9/go.mod h1:P4MqPf+u83OPulPJ+XTbSDbbWrdFYNY4LZ/B1PIduFE= github.com/matrix-org/pinecone v0.0.0-20220803093810-b7a830c08fb9/go.mod h1:P4MqPf+u83OPulPJ+XTbSDbbWrdFYNY4LZ/B1PIduFE=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=

View file

@ -60,7 +60,7 @@ func NewInternalAPI(
updater := internal.NewDeviceListUpdater(db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable updater := internal.NewDeviceListUpdater(db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable
ap.Updater = updater ap.Updater = updater
go func() { go func() {
if err = updater.Start(); err != nil { if err := updater.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start device list updater") logrus.WithError(err).Panicf("failed to start device list updater")
} }
}() }()
@ -68,7 +68,7 @@ func NewInternalAPI(
dlConsumer := consumers.NewDeviceListUpdateConsumer( dlConsumer := consumers.NewDeviceListUpdateConsumer(
base.ProcessContext, cfg, js, updater, base.ProcessContext, cfg, js, updater,
) )
if err = dlConsumer.Start(); err != nil { if err := dlConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start device list consumer") logrus.WithError(err).Panic("failed to start device list consumer")
} }

View file

@ -25,21 +25,23 @@ import (
_ "net/http/pprof" _ "net/http/pprof"
"os" "os"
"os/signal" "os/signal"
"sync"
"syscall" "syscall"
"time" "time"
"github.com/getsentry/sentry-go" "github.com/getsentry/sentry-go"
sentryhttp "github.com/getsentry/sentry-go/http" sentryhttp "github.com/getsentry/sentry-go/http"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/atomic" "go.uber.org/atomic"
"golang.org/x/net/http2" "golang.org/x/net/http2"
"golang.org/x/net/http2/h2c" "golang.org/x/net/http2/h2c"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
@ -47,6 +49,8 @@ import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/kardianos/minwinsvc" "github.com/kardianos/minwinsvc"
"github.com/sirupsen/logrus"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api" appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
asinthttp "github.com/matrix-org/dendrite/appservice/inthttp" asinthttp "github.com/matrix-org/dendrite/appservice/inthttp"
federationAPI "github.com/matrix-org/dendrite/federationapi/api" federationAPI "github.com/matrix-org/dendrite/federationapi/api"
@ -58,7 +62,6 @@ import (
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
userapiinthttp "github.com/matrix-org/dendrite/userapi/inthttp" userapiinthttp "github.com/matrix-org/dendrite/userapi/inthttp"
"github.com/sirupsen/logrus"
) )
// BaseDendrite is a base for creating new instances of dendrite. It parses // BaseDendrite is a base for creating new instances of dendrite. It parses
@ -87,6 +90,7 @@ type BaseDendrite struct {
Database *sql.DB Database *sql.DB
DatabaseWriter sqlutil.Writer DatabaseWriter sqlutil.Writer
EnableMetrics bool EnableMetrics bool
startupLock sync.Mutex
} }
const NoListener = "" const NoListener = ""
@ -394,6 +398,9 @@ func (b *BaseDendrite) SetupAndServeHTTP(
internalHTTPAddr, externalHTTPAddr config.HTTPAddress, internalHTTPAddr, externalHTTPAddr config.HTTPAddress,
certFile, keyFile *string, certFile, keyFile *string,
) { ) {
// Manually unlocked right before actually serving requests,
// as we don't return from this method (defer doesn't work).
b.startupLock.Lock()
internalAddr, _ := internalHTTPAddr.Address() internalAddr, _ := internalHTTPAddr.Address()
externalAddr, _ := externalHTTPAddr.Address() externalAddr, _ := externalHTTPAddr.Address()
@ -472,6 +479,7 @@ func (b *BaseDendrite) SetupAndServeHTTP(
externalRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(b.PublicMediaAPIMux) externalRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(b.PublicMediaAPIMux)
externalRouter.PathPrefix(httputil.PublicWellKnownPrefix).Handler(b.PublicWellKnownAPIMux) externalRouter.PathPrefix(httputil.PublicWellKnownPrefix).Handler(b.PublicWellKnownAPIMux)
b.startupLock.Unlock()
if internalAddr != NoListener && internalAddr != externalAddr { if internalAddr != NoListener && internalAddr != externalAddr {
go func() { go func() {
var internalShutdown atomic.Bool // RegisterOnShutdown can be called more than once var internalShutdown atomic.Bool // RegisterOnShutdown can be called more than once

View file

@ -41,6 +41,8 @@ CREATE TABLE IF NOT EXISTS syncapi_send_to_device (
-- The event content JSON. -- The event content JSON.
content TEXT NOT NULL content TEXT NOT NULL
); );
CREATE INDEX IF NOT EXISTS syncapi_send_to_device_user_id_device_id_idx ON syncapi_send_to_device(user_id, device_id);
` `
const insertSendToDeviceMessageSQL = ` const insertSendToDeviceMessageSQL = `

View file

@ -39,6 +39,8 @@ CREATE TABLE IF NOT EXISTS syncapi_send_to_device (
-- The event content JSON. -- The event content JSON.
content TEXT NOT NULL content TEXT NOT NULL
); );
CREATE INDEX IF NOT EXISTS syncapi_send_to_device_user_id_device_id_idx ON syncapi_send_to_device(user_id, device_id);
` `
const insertSendToDeviceMessageSQL = ` const insertSendToDeviceMessageSQL = `

View file

@ -303,7 +303,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
if stateFilter.LazyLoadMembers { if stateFilter.LazyLoadMembers {
delta.StateEvents, err = p.lazyLoadMembers( delta.StateEvents, err = p.lazyLoadMembers(
ctx, delta.RoomID, true, limited, stateFilter.IncludeRedundantMembers, ctx, delta.RoomID, true, limited, stateFilter,
device, recentEvents, delta.StateEvents, device, recentEvents, delta.StateEvents,
) )
if err != nil && err != sql.ErrNoRows { if err != nil && err != sql.ErrNoRows {
@ -532,7 +532,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
return nil, err return nil, err
} }
stateEvents, err = p.lazyLoadMembers(ctx, roomID, stateEvents, err = p.lazyLoadMembers(ctx, roomID,
false, limited, stateFilter.IncludeRedundantMembers, false, limited, stateFilter,
device, recentEvents, stateEvents, device, recentEvents, stateEvents,
) )
if err != nil && err != sql.ErrNoRows { if err != nil && err != sql.ErrNoRows {
@ -551,7 +551,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
func (p *PDUStreamProvider) lazyLoadMembers( func (p *PDUStreamProvider) lazyLoadMembers(
ctx context.Context, roomID string, ctx context.Context, roomID string,
incremental, limited, includeRedundant bool, incremental, limited bool, stateFilter *gomatrixserverlib.StateFilter,
device *userapi.Device, device *userapi.Device,
timelineEvents, stateEvents []*gomatrixserverlib.HeaderedEvent, timelineEvents, stateEvents []*gomatrixserverlib.HeaderedEvent,
) ([]*gomatrixserverlib.HeaderedEvent, error) { ) ([]*gomatrixserverlib.HeaderedEvent, error) {
@ -581,7 +581,7 @@ func (p *PDUStreamProvider) lazyLoadMembers(
stateKey := *event.StateKey() stateKey := *event.StateKey()
if _, ok := timelineUsers[stateKey]; ok || isGappedIncremental { if _, ok := timelineUsers[stateKey]; ok || isGappedIncremental {
newStateEvents = append(newStateEvents, event) newStateEvents = append(newStateEvents, event)
if !includeRedundant { if !stateFilter.IncludeRedundantMembers {
p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, stateKey, event.EventID()) p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, stateKey, event.EventID())
} }
delete(timelineUsers, stateKey) delete(timelineUsers, stateKey)
@ -596,6 +596,7 @@ func (p *PDUStreamProvider) lazyLoadMembers(
} }
// Query missing membership events // Query missing membership events
filter := gomatrixserverlib.DefaultStateFilter() filter := gomatrixserverlib.DefaultStateFilter()
filter.Limit = stateFilter.Limit
filter.Senders = &wantUsers filter.Senders = &wantUsers
filter.Types = &[]string{gomatrixserverlib.MRoomMember} filter.Types = &[]string{gomatrixserverlib.MRoomMember}
memberships, err := p.DB.GetStateEventsForRoom(ctx, roomID, &filter) memberships, err := p.DB.GetStateEventsForRoom(ctx, roomID, &filter)

View file

@ -7,6 +7,10 @@ import (
"strings" "strings"
"time" "time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/pushgateway" "github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/internal/pushrules" "github.com/matrix-org/dendrite/internal/pushrules"
@ -20,9 +24,6 @@ import (
"github.com/matrix-org/dendrite/userapi/storage" "github.com/matrix-org/dendrite/userapi/storage"
"github.com/matrix-org/dendrite/userapi/storage/tables" "github.com/matrix-org/dendrite/userapi/storage/tables"
"github.com/matrix-org/dendrite/userapi/util" "github.com/matrix-org/dendrite/userapi/util"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
) )
type OutputStreamEventConsumer struct { type OutputStreamEventConsumer struct {
@ -529,7 +530,9 @@ func (s *OutputStreamEventConsumer) notifyHTTP(ctx context.Context, event *gomat
case "event_id_only": case "event_id_only":
req = pushgateway.NotifyRequest{ req = pushgateway.NotifyRequest{
Notification: pushgateway.Notification{ Notification: pushgateway.Notification{
Counts: &pushgateway.Counts{}, Counts: &pushgateway.Counts{
Unread: userNumUnreadNotifs,
},
Devices: devices, Devices: devices,
EventID: event.EventID(), EventID: event.EventID(),
RoomID: event.RoomID(), RoomID: event.RoomID(),