mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-01 11:13:12 -06:00
Merge branch 'main' into neilalexander/v083
This commit is contained in:
commit
85f335f3d7
|
|
@ -19,6 +19,7 @@
|
|||
* A number of component interfaces have been refactored for cleanliness and developer ease
|
||||
* Event auth errors in the log should now be much more useful, including the reason for the event failures
|
||||
* The forward extremity calculation in the roomserver has been simplified
|
||||
* A new index has been added to the one-time keys table in the keyserver which should speed up key count lookups
|
||||
|
||||
### Fixes
|
||||
|
||||
|
|
@ -32,6 +33,8 @@
|
|||
* The media `/config` endpoint will no longer return a maximum upload size field if it is configured to be unlimited in the Dendrite config
|
||||
* The server notices room will no longer produce "User is already joined to the room" errors
|
||||
* Consumer errors will no longer flood the logs during a graceful shutdown
|
||||
* Sync API and federation API consumers will no longer unnecessarily query added state events matching the one in the output event
|
||||
* The Sync API will no longer unnecessarily track invites for remote users
|
||||
|
||||
## Dendrite 0.8.2 (2022-04-27)
|
||||
|
||||
|
|
|
|||
|
|
@ -32,7 +32,6 @@ import (
|
|||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/setup/base"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
|
@ -55,7 +54,7 @@ func NewInternalAPI(
|
|||
gomatrixserverlib.WithSkipVerify(base.Cfg.AppServiceAPI.DisableTLSValidation),
|
||||
)
|
||||
|
||||
js, _ := jetstream.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
|
||||
js, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
|
||||
|
||||
// Create a connection to the appservice postgres DB
|
||||
appserviceDB, err := storage.NewDatabase(base, &base.Cfg.AppServiceAPI.Database)
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@ func AddPublicRoutes(
|
|||
) {
|
||||
cfg := &base.Cfg.ClientAPI
|
||||
mscCfg := &base.Cfg.MSCs
|
||||
js, natsClient := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
|
||||
js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
|
||||
|
||||
syncProducer := &producers.SyncAPIProducer{
|
||||
JetStream: js,
|
||||
|
|
|
|||
|
|
@ -146,28 +146,25 @@ func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPee
|
|||
// processMessage updates the list of currently joined hosts in the room
|
||||
// and then sends the event to the hosts that were joined before the event.
|
||||
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
|
||||
eventsRes := &api.QueryEventsByIDResponse{}
|
||||
if len(ore.AddsStateEventIDs) > 0 {
|
||||
addsStateEvents, missingEventIDs := ore.NeededStateEventIDs()
|
||||
|
||||
// Ask the roomserver and add in the rest of the results into the set.
|
||||
// Finally, work out if there are any more events missing.
|
||||
if len(missingEventIDs) > 0 {
|
||||
eventsReq := &api.QueryEventsByIDRequest{
|
||||
EventIDs: ore.AddsStateEventIDs,
|
||||
EventIDs: missingEventIDs,
|
||||
}
|
||||
eventsRes := &api.QueryEventsByIDResponse{}
|
||||
if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil {
|
||||
return fmt.Errorf("s.rsAPI.QueryEventsByID: %w", err)
|
||||
}
|
||||
|
||||
found := false
|
||||
for _, event := range eventsRes.Events {
|
||||
if event.EventID() == ore.Event.EventID() {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
eventsRes.Events = append(eventsRes.Events, ore.Event)
|
||||
if len(eventsRes.Events) != len(missingEventIDs) {
|
||||
return fmt.Errorf("missing state events")
|
||||
}
|
||||
addsStateEvents = append(addsStateEvents, eventsRes.Events...)
|
||||
}
|
||||
|
||||
addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(eventsRes.Events))
|
||||
addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(addsStateEvents))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -56,7 +56,7 @@ func AddPublicRoutes(
|
|||
) {
|
||||
cfg := &base.Cfg.FederationAPI
|
||||
mscCfg := &base.Cfg.MSCs
|
||||
js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
|
||||
js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
|
||||
producer := &producers.SyncAPIProducer{
|
||||
JetStream: js,
|
||||
TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
|
||||
|
|
@ -115,7 +115,7 @@ func NewInternalAPI(
|
|||
FailuresUntilBlacklist: cfg.FederationMaxRetries,
|
||||
}
|
||||
|
||||
js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
|
||||
js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
|
||||
|
||||
queues := queue.NewOutgoingQueues(
|
||||
federationDB, base.ProcessContext,
|
||||
|
|
|
|||
8
go.mod
8
go.mod
|
|
@ -30,7 +30,7 @@ require (
|
|||
github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e
|
||||
github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91
|
||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220506144035-8958f9dfdffd
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220509120958-8d818048c34c
|
||||
github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48
|
||||
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
|
||||
github.com/mattn/go-sqlite3 v1.14.10
|
||||
|
|
@ -48,17 +48,17 @@ require (
|
|||
github.com/prometheus/client_golang v1.12.1
|
||||
github.com/sirupsen/logrus v1.8.1
|
||||
github.com/stretchr/testify v1.7.0
|
||||
github.com/tidwall/gjson v1.14.0
|
||||
github.com/tidwall/gjson v1.14.1
|
||||
github.com/tidwall/sjson v1.2.4
|
||||
github.com/uber/jaeger-client-go v2.30.0+incompatible
|
||||
github.com/uber/jaeger-lib v2.4.1+incompatible
|
||||
github.com/yggdrasil-network/yggdrasil-go v0.4.3
|
||||
go.uber.org/atomic v1.9.0
|
||||
golang.org/x/crypto v0.0.0-20220331220935-ae2d96664a29
|
||||
golang.org/x/crypto v0.0.0-20220507011949-2cf3adece122
|
||||
golang.org/x/image v0.0.0-20220321031419-a8550c1d254a
|
||||
golang.org/x/mobile v0.0.0-20220407111146-e579adbbc4a2
|
||||
golang.org/x/net v0.0.0-20220407224826-aac1ed45d8e3
|
||||
golang.org/x/sys v0.0.0-20220406163625-3f8b81556e12 // indirect
|
||||
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 // indirect
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211
|
||||
gopkg.in/h2non/bimg.v1 v1.1.9
|
||||
gopkg.in/yaml.v2 v2.4.0
|
||||
|
|
|
|||
16
go.sum
16
go.sum
|
|
@ -795,8 +795,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1
|
|||
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
|
||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
|
||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220506144035-8958f9dfdffd h1:11Wh+NMPDE5UDEx50RJnxeYj7zH5HEClzGfVMAjUy9U=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220506144035-8958f9dfdffd/go.mod h1:V5eO8rn/C3rcxig37A/BCeKerLFS+9Avg/77FIeTZ48=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220509120958-8d818048c34c h1:KqzqFWxvs90pcDaW9QEveW+Q5JcEYuNnKyaqXc+ohno=
|
||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20220509120958-8d818048c34c/go.mod h1:V5eO8rn/C3rcxig37A/BCeKerLFS+9Avg/77FIeTZ48=
|
||||
github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48 h1:W0sjjC6yjskHX4mb0nk3p0fXAlbU5bAFUFeEtlrPASE=
|
||||
github.com/matrix-org/pinecone v0.0.0-20220408153826-2999ea29ed48/go.mod h1:ulJzsVOTssIVp1j/m5eI//4VpAGDkMt5NrRuAVX7wpc=
|
||||
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
|
||||
|
|
@ -1130,8 +1130,8 @@ github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG
|
|||
github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA=
|
||||
github.com/tchap/go-patricia v2.2.6+incompatible/go.mod h1:bmLyhP68RS6kStMGxByiQ23RP/odRBOTVjwp2cDyi6I=
|
||||
github.com/tidwall/gjson v1.12.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
|
||||
github.com/tidwall/gjson v1.14.0 h1:6aeJ0bzojgWLa82gDQHcx3S0Lr/O51I9bJ5nv6JFx5w=
|
||||
github.com/tidwall/gjson v1.14.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
|
||||
github.com/tidwall/gjson v1.14.1 h1:iymTbGkQBhveq21bEvAQ81I0LEBork8BFe1CUZXdyuo=
|
||||
github.com/tidwall/gjson v1.14.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
|
||||
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
|
||||
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
|
||||
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
|
||||
|
|
@ -1281,8 +1281,8 @@ golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm
|
|||
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/crypto v0.0.0-20220331220935-ae2d96664a29 h1:tkVvjkPTB7pnW3jnid7kNyAMPVWllTNOf/qKDze4p9o=
|
||||
golang.org/x/crypto v0.0.0-20220331220935-ae2d96664a29/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/crypto v0.0.0-20220507011949-2cf3adece122 h1:NvGWuYG8dkDHFSKksI1P9faiVJ9rayE6l0+ouWVIDs8=
|
||||
golang.org/x/crypto v0.0.0-20220507011949-2cf3adece122/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
|
|
@ -1543,8 +1543,8 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc
|
|||
golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220405052023-b1e9470b6e64/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220406163625-3f8b81556e12 h1:QyVthZKMsyaQwBTJE04jdNN0Pp5Fn9Qga0mrgxyERQM=
|
||||
golang.org/x/sys v0.0.0-20220406163625-3f8b81556e12/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 h1:nonptSpoQ4vQjyraW20DXPAglgQfVnM9ZC6MmNLMR60=
|
||||
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) {
|
|||
func NewInternalAPI(
|
||||
base *base.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient,
|
||||
) api.KeyInternalAPI {
|
||||
js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
|
||||
js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
|
||||
|
||||
db, err := storage.NewDatabase(base, &cfg.Database)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -39,6 +39,8 @@ CREATE TABLE IF NOT EXISTS keyserver_one_time_keys (
|
|||
-- Clobber based on 4-uple of user/device/key/algorithm.
|
||||
CONSTRAINT keyserver_one_time_keys_unique UNIQUE (user_id, device_id, key_id, algorithm)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS keyserver_one_time_keys_idx ON keyserver_one_time_keys (user_id, device_id);
|
||||
`
|
||||
|
||||
const upsertKeysSQL = "" +
|
||||
|
|
|
|||
|
|
@ -38,6 +38,8 @@ CREATE TABLE IF NOT EXISTS keyserver_one_time_keys (
|
|||
-- Clobber based on 4-uple of user/device/key/algorithm.
|
||||
UNIQUE (user_id, device_id, key_id, algorithm)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS keyserver_one_time_keys_idx ON keyserver_one_time_keys (user_id, device_id);
|
||||
`
|
||||
|
||||
const upsertKeysSQL = "" +
|
||||
|
|
|
|||
|
|
@ -163,6 +163,19 @@ type OutputNewRoomEvent struct {
|
|||
TransactionID *TransactionID `json:"transaction_id,omitempty"`
|
||||
}
|
||||
|
||||
func (o *OutputNewRoomEvent) NeededStateEventIDs() ([]*gomatrixserverlib.HeaderedEvent, []string) {
|
||||
addsStateEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, 1)
|
||||
missingEventIDs := make([]string, 0, len(o.AddsStateEventIDs))
|
||||
for _, eventID := range o.AddsStateEventIDs {
|
||||
if eventID != o.Event.EventID() {
|
||||
missingEventIDs = append(missingEventIDs, eventID)
|
||||
} else {
|
||||
addsStateEvents = append(addsStateEvents, o.Event)
|
||||
}
|
||||
}
|
||||
return addsStateEvents, missingEventIDs
|
||||
}
|
||||
|
||||
// An OutputOldRoomEvent is written when the roomserver receives an old event.
|
||||
// This will typically happen as a result of getting either missing events
|
||||
// or backfilling. Downstream components may wish to send these events to
|
||||
|
|
|
|||
|
|
@ -10,9 +10,9 @@ import (
|
|||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/internal/input"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||
"github.com/matrix-org/dendrite/setup/base"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
"github.com/matrix-org/dendrite/test"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
|
@ -21,11 +21,11 @@ var js nats.JetStreamContext
|
|||
var jc *nats.Conn
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
var pc *process.ProcessContext
|
||||
pc, js, jc = jetstream.PrepareForTests()
|
||||
var b *base.BaseDendrite
|
||||
b, js, jc = test.Base(nil)
|
||||
code := m.Run()
|
||||
pc.ShutdownDendrite()
|
||||
pc.WaitForComponentsToFinish()
|
||||
b.ShutdownDendrite()
|
||||
b.WaitForComponentsToFinish()
|
||||
os.Exit(code)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ func NewInternalAPI(
|
|||
logrus.WithError(err).Panicf("failed to connect to room server db")
|
||||
}
|
||||
|
||||
js, nc := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
|
||||
js, nc := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
|
||||
|
||||
return internal.NewRoomserverAPI(
|
||||
base.ProcessContext, cfg, roomserverDB, js, nc,
|
||||
|
|
|
|||
|
|
@ -59,12 +59,12 @@ type eventJSONStatements struct {
|
|||
bulkSelectEventJSONStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func createEventJSONTable(db *sql.DB) error {
|
||||
func CreateEventJSONTable(db *sql.DB) error {
|
||||
_, err := db.Exec(eventJSONSchema)
|
||||
return err
|
||||
}
|
||||
|
||||
func prepareEventJSONTable(db *sql.DB) (tables.EventJSON, error) {
|
||||
func PrepareEventJSONTable(db *sql.DB) (tables.EventJSON, error) {
|
||||
s := &eventJSONStatements{}
|
||||
|
||||
return s, sqlutil.StatementList{
|
||||
|
|
@ -97,9 +97,9 @@ func (s *eventJSONStatements) BulkSelectEventJSON(
|
|||
// We might get fewer results than NIDs so we adjust the length of the slice before returning it.
|
||||
results := make([]tables.EventJSONPair, len(eventNIDs))
|
||||
i := 0
|
||||
var eventNID int64
|
||||
for ; rows.Next(); i++ {
|
||||
result := &results[i]
|
||||
var eventNID int64
|
||||
if err := rows.Scan(&eventNID, &result.EventJSON); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -76,12 +76,12 @@ type eventStateKeyStatements struct {
|
|||
bulkSelectEventStateKeyStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func createEventStateKeysTable(db *sql.DB) error {
|
||||
func CreateEventStateKeysTable(db *sql.DB) error {
|
||||
_, err := db.Exec(eventStateKeysSchema)
|
||||
return err
|
||||
}
|
||||
|
||||
func prepareEventStateKeysTable(db *sql.DB) (tables.EventStateKeys, error) {
|
||||
func PrepareEventStateKeysTable(db *sql.DB) (tables.EventStateKeys, error) {
|
||||
s := &eventStateKeyStatements{}
|
||||
|
||||
return s, sqlutil.StatementList{
|
||||
|
|
@ -123,9 +123,9 @@ func (s *eventStateKeyStatements) BulkSelectEventStateKeyNID(
|
|||
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectEventStateKeyNID: rows.close() failed")
|
||||
|
||||
result := make(map[string]types.EventStateKeyNID, len(eventStateKeys))
|
||||
var stateKey string
|
||||
var stateKeyNID int64
|
||||
for rows.Next() {
|
||||
var stateKey string
|
||||
var stateKeyNID int64
|
||||
if err := rows.Scan(&stateKey, &stateKeyNID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -149,9 +149,9 @@ func (s *eventStateKeyStatements) BulkSelectEventStateKey(
|
|||
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectEventStateKey: rows.close() failed")
|
||||
|
||||
result := make(map[types.EventStateKeyNID]string, len(eventStateKeyNIDs))
|
||||
var stateKey string
|
||||
var stateKeyNID int64
|
||||
for rows.Next() {
|
||||
var stateKey string
|
||||
var stateKeyNID int64
|
||||
if err := rows.Scan(&stateKey, &stateKeyNID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -99,12 +99,12 @@ type eventTypeStatements struct {
|
|||
bulkSelectEventTypeNIDStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func createEventTypesTable(db *sql.DB) error {
|
||||
func CreateEventTypesTable(db *sql.DB) error {
|
||||
_, err := db.Exec(eventTypesSchema)
|
||||
return err
|
||||
}
|
||||
|
||||
func prepareEventTypesTable(db *sql.DB) (tables.EventTypes, error) {
|
||||
func PrepareEventTypesTable(db *sql.DB) (tables.EventTypes, error) {
|
||||
s := &eventTypeStatements{}
|
||||
|
||||
return s, sqlutil.StatementList{
|
||||
|
|
@ -143,9 +143,9 @@ func (s *eventTypeStatements) BulkSelectEventTypeNID(
|
|||
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectEventTypeNID: rows.close() failed")
|
||||
|
||||
result := make(map[string]types.EventTypeNID, len(eventTypes))
|
||||
var eventType string
|
||||
var eventTypeNID int64
|
||||
for rows.Next() {
|
||||
var eventType string
|
||||
var eventTypeNID int64
|
||||
if err := rows.Scan(&eventType, &eventTypeNID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -155,12 +155,12 @@ type eventStatements struct {
|
|||
selectRoomNIDsForEventNIDsStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func createEventsTable(db *sql.DB) error {
|
||||
func CreateEventsTable(db *sql.DB) error {
|
||||
_, err := db.Exec(eventsSchema)
|
||||
return err
|
||||
}
|
||||
|
||||
func prepareEventsTable(db *sql.DB) (tables.Events, error) {
|
||||
func PrepareEventsTable(db *sql.DB) (tables.Events, error) {
|
||||
s := &eventStatements{}
|
||||
|
||||
return s, sqlutil.StatementList{
|
||||
|
|
@ -380,15 +380,15 @@ func (s *eventStatements) BulkSelectStateAtEventAndReference(
|
|||
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectStateAtEventAndReference: rows.close() failed")
|
||||
results := make([]types.StateAtEventAndReference, len(eventNIDs))
|
||||
i := 0
|
||||
var (
|
||||
eventTypeNID int64
|
||||
eventStateKeyNID int64
|
||||
eventNID int64
|
||||
stateSnapshotNID int64
|
||||
eventID string
|
||||
eventSHA256 []byte
|
||||
)
|
||||
for ; rows.Next(); i++ {
|
||||
var (
|
||||
eventTypeNID int64
|
||||
eventStateKeyNID int64
|
||||
eventNID int64
|
||||
stateSnapshotNID int64
|
||||
eventID string
|
||||
eventSHA256 []byte
|
||||
)
|
||||
if err = rows.Scan(
|
||||
&eventTypeNID, &eventStateKeyNID, &eventNID, &stateSnapshotNID, &eventID, &eventSHA256,
|
||||
); err != nil {
|
||||
|
|
@ -446,9 +446,9 @@ func (s *eventStatements) BulkSelectEventID(ctx context.Context, txn *sql.Tx, ev
|
|||
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectEventID: rows.close() failed")
|
||||
results := make(map[types.EventNID]string, len(eventNIDs))
|
||||
i := 0
|
||||
var eventNID int64
|
||||
var eventID string
|
||||
for ; rows.Next(); i++ {
|
||||
var eventNID int64
|
||||
var eventID string
|
||||
if err = rows.Scan(&eventNID, &eventID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -491,9 +491,9 @@ func (s *eventStatements) bulkSelectEventNID(ctx context.Context, txn *sql.Tx, e
|
|||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectEventNID: rows.close() failed")
|
||||
results := make(map[string]types.EventNID, len(eventIDs))
|
||||
var eventID string
|
||||
var eventNID int64
|
||||
for rows.Next() {
|
||||
var eventID string
|
||||
var eventNID int64
|
||||
if err = rows.Scan(&eventID, &eventNID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -522,9 +522,9 @@ func (s *eventStatements) SelectRoomNIDsForEventNIDs(
|
|||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "selectRoomNIDsForEventNIDsStmt: rows.close() failed")
|
||||
result := make(map[types.EventNID]types.RoomNID)
|
||||
var eventNID types.EventNID
|
||||
var roomNID types.RoomNID
|
||||
for rows.Next() {
|
||||
var eventNID types.EventNID
|
||||
var roomNID types.RoomNID
|
||||
if err = rows.Scan(&eventNID, &roomNID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -68,16 +68,16 @@ func Open(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache c
|
|||
}
|
||||
|
||||
func (d *Database) create(db *sql.DB) error {
|
||||
if err := createEventStateKeysTable(db); err != nil {
|
||||
if err := CreateEventStateKeysTable(db); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := createEventTypesTable(db); err != nil {
|
||||
if err := CreateEventTypesTable(db); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := createEventJSONTable(db); err != nil {
|
||||
if err := CreateEventJSONTable(db); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := createEventsTable(db); err != nil {
|
||||
if err := CreateEventsTable(db); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := createRoomsTable(db); err != nil {
|
||||
|
|
@ -112,19 +112,19 @@ func (d *Database) create(db *sql.DB) error {
|
|||
}
|
||||
|
||||
func (d *Database) prepare(db *sql.DB, writer sqlutil.Writer, cache caching.RoomServerCaches) error {
|
||||
eventStateKeys, err := prepareEventStateKeysTable(db)
|
||||
eventStateKeys, err := PrepareEventStateKeysTable(db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
eventTypes, err := prepareEventTypesTable(db)
|
||||
eventTypes, err := PrepareEventTypesTable(db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
eventJSON, err := prepareEventJSONTable(db)
|
||||
eventJSON, err := PrepareEventJSONTable(db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
events, err := prepareEventsTable(db)
|
||||
events, err := PrepareEventsTable(db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -52,12 +52,12 @@ type eventJSONStatements struct {
|
|||
bulkSelectEventJSONStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func createEventJSONTable(db *sql.DB) error {
|
||||
func CreateEventJSONTable(db *sql.DB) error {
|
||||
_, err := db.Exec(eventJSONSchema)
|
||||
return err
|
||||
}
|
||||
|
||||
func prepareEventJSONTable(db *sql.DB) (tables.EventJSON, error) {
|
||||
func PrepareEventJSONTable(db *sql.DB) (tables.EventJSON, error) {
|
||||
s := &eventJSONStatements{
|
||||
db: db,
|
||||
}
|
||||
|
|
@ -101,9 +101,9 @@ func (s *eventJSONStatements) BulkSelectEventJSON(
|
|||
// We might get fewer results than NIDs so we adjust the length of the slice before returning it.
|
||||
results := make([]tables.EventJSONPair, len(eventNIDs))
|
||||
i := 0
|
||||
var eventNID int64
|
||||
for ; rows.Next(); i++ {
|
||||
result := &results[i]
|
||||
var eventNID int64
|
||||
if err := rows.Scan(&eventNID, &result.EventJSON); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -71,12 +71,12 @@ type eventStateKeyStatements struct {
|
|||
bulkSelectEventStateKeyStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func createEventStateKeysTable(db *sql.DB) error {
|
||||
func CreateEventStateKeysTable(db *sql.DB) error {
|
||||
_, err := db.Exec(eventStateKeysSchema)
|
||||
return err
|
||||
}
|
||||
|
||||
func prepareEventStateKeysTable(db *sql.DB) (tables.EventStateKeys, error) {
|
||||
func PrepareEventStateKeysTable(db *sql.DB) (tables.EventStateKeys, error) {
|
||||
s := &eventStateKeyStatements{
|
||||
db: db,
|
||||
}
|
||||
|
|
@ -128,9 +128,9 @@ func (s *eventStateKeyStatements) BulkSelectEventStateKeyNID(
|
|||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectEventStateKeyNID: rows.close() failed")
|
||||
result := make(map[string]types.EventStateKeyNID, len(eventStateKeys))
|
||||
var stateKey string
|
||||
var stateKeyNID int64
|
||||
for rows.Next() {
|
||||
var stateKey string
|
||||
var stateKeyNID int64
|
||||
if err := rows.Scan(&stateKey, &stateKeyNID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -159,9 +159,9 @@ func (s *eventStateKeyStatements) BulkSelectEventStateKey(
|
|||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectEventStateKey: rows.close() failed")
|
||||
result := make(map[types.EventStateKeyNID]string, len(eventStateKeyNIDs))
|
||||
var stateKey string
|
||||
var stateKeyNID int64
|
||||
for rows.Next() {
|
||||
var stateKey string
|
||||
var stateKeyNID int64
|
||||
if err := rows.Scan(&stateKey, &stateKeyNID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -79,12 +79,12 @@ type eventTypeStatements struct {
|
|||
bulkSelectEventTypeNIDStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func createEventTypesTable(db *sql.DB) error {
|
||||
func CreateEventTypesTable(db *sql.DB) error {
|
||||
_, err := db.Exec(eventTypesSchema)
|
||||
return err
|
||||
}
|
||||
|
||||
func prepareEventTypesTable(db *sql.DB) (tables.EventTypes, error) {
|
||||
func PrepareEventTypesTable(db *sql.DB) (tables.EventTypes, error) {
|
||||
s := &eventTypeStatements{
|
||||
db: db,
|
||||
}
|
||||
|
|
@ -139,9 +139,9 @@ func (s *eventTypeStatements) BulkSelectEventTypeNID(
|
|||
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectEventTypeNID: rows.close() failed")
|
||||
|
||||
result := make(map[string]types.EventTypeNID, len(eventTypes))
|
||||
var eventType string
|
||||
var eventTypeNID int64
|
||||
for rows.Next() {
|
||||
var eventType string
|
||||
var eventTypeNID int64
|
||||
if err := rows.Scan(&eventType, &eventTypeNID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -68,7 +68,8 @@ const bulkSelectStateEventByIDSQL = "" +
|
|||
const bulkSelectStateEventByNIDSQL = "" +
|
||||
"SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" +
|
||||
" WHERE event_nid IN ($1)"
|
||||
// Rest of query is built by BulkSelectStateEventByNID
|
||||
|
||||
// Rest of query is built by BulkSelectStateEventByNID
|
||||
|
||||
const bulkSelectStateAtEventByIDSQL = "" +
|
||||
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, is_rejected FROM roomserver_events" +
|
||||
|
|
@ -126,12 +127,12 @@ type eventStatements struct {
|
|||
//selectRoomNIDsForEventNIDsStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func createEventsTable(db *sql.DB) error {
|
||||
func CreateEventsTable(db *sql.DB) error {
|
||||
_, err := db.Exec(eventsSchema)
|
||||
return err
|
||||
}
|
||||
|
||||
func prepareEventsTable(db *sql.DB) (tables.Events, error) {
|
||||
func PrepareEventsTable(db *sql.DB) (tables.Events, error) {
|
||||
s := &eventStatements{
|
||||
db: db,
|
||||
}
|
||||
|
|
@ -404,15 +405,15 @@ func (s *eventStatements) BulkSelectStateAtEventAndReference(
|
|||
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectStateAtEventAndReference: rows.close() failed")
|
||||
results := make([]types.StateAtEventAndReference, len(eventNIDs))
|
||||
i := 0
|
||||
var (
|
||||
eventTypeNID int64
|
||||
eventStateKeyNID int64
|
||||
eventNID int64
|
||||
stateSnapshotNID int64
|
||||
eventID string
|
||||
eventSHA256 []byte
|
||||
)
|
||||
for ; rows.Next(); i++ {
|
||||
var (
|
||||
eventTypeNID int64
|
||||
eventStateKeyNID int64
|
||||
eventNID int64
|
||||
stateSnapshotNID int64
|
||||
eventID string
|
||||
eventSHA256 []byte
|
||||
)
|
||||
if err = rows.Scan(
|
||||
&eventTypeNID, &eventStateKeyNID, &eventNID, &stateSnapshotNID, &eventID, &eventSHA256,
|
||||
); err != nil {
|
||||
|
|
@ -491,9 +492,9 @@ func (s *eventStatements) BulkSelectEventID(ctx context.Context, txn *sql.Tx, ev
|
|||
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectEventID: rows.close() failed")
|
||||
results := make(map[types.EventNID]string, len(eventNIDs))
|
||||
i := 0
|
||||
var eventNID int64
|
||||
var eventID string
|
||||
for ; rows.Next(); i++ {
|
||||
var eventNID int64
|
||||
var eventID string
|
||||
if err = rows.Scan(&eventNID, &eventID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -545,9 +546,9 @@ func (s *eventStatements) bulkSelectEventNID(ctx context.Context, txn *sql.Tx, e
|
|||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectEventNID: rows.close() failed")
|
||||
results := make(map[string]types.EventNID, len(eventIDs))
|
||||
var eventID string
|
||||
var eventNID int64
|
||||
for rows.Next() {
|
||||
var eventID string
|
||||
var eventNID int64
|
||||
if err = rows.Scan(&eventID, &eventNID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -595,9 +596,9 @@ func (s *eventStatements) SelectRoomNIDsForEventNIDs(
|
|||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "selectRoomNIDsForEventNIDsStmt: rows.close() failed")
|
||||
result := make(map[types.EventNID]types.RoomNID)
|
||||
var eventNID types.EventNID
|
||||
var roomNID types.RoomNID
|
||||
for rows.Next() {
|
||||
var eventNID types.EventNID
|
||||
var roomNID types.RoomNID
|
||||
if err = rows.Scan(&eventNID, &roomNID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -77,16 +77,16 @@ func Open(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache c
|
|||
}
|
||||
|
||||
func (d *Database) create(db *sql.DB) error {
|
||||
if err := createEventStateKeysTable(db); err != nil {
|
||||
if err := CreateEventStateKeysTable(db); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := createEventTypesTable(db); err != nil {
|
||||
if err := CreateEventTypesTable(db); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := createEventJSONTable(db); err != nil {
|
||||
if err := CreateEventJSONTable(db); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := createEventsTable(db); err != nil {
|
||||
if err := CreateEventsTable(db); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := createRoomsTable(db); err != nil {
|
||||
|
|
@ -121,19 +121,19 @@ func (d *Database) create(db *sql.DB) error {
|
|||
}
|
||||
|
||||
func (d *Database) prepare(db *sql.DB, writer sqlutil.Writer, cache caching.RoomServerCaches) error {
|
||||
eventStateKeys, err := prepareEventStateKeysTable(db)
|
||||
eventStateKeys, err := PrepareEventStateKeysTable(db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
eventTypes, err := prepareEventTypesTable(db)
|
||||
eventTypes, err := PrepareEventTypesTable(db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
eventJSON, err := prepareEventJSONTable(db)
|
||||
eventJSON, err := PrepareEventJSONTable(db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
events, err := prepareEventsTable(db)
|
||||
events, err := PrepareEventsTable(db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
95
roomserver/storage/tables/event_json_table_test.go
Normal file
95
roomserver/storage/tables/event_json_table_test.go
Normal file
|
|
@ -0,0 +1,95 @@
|
|||
package tables_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/postgres"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/sqlite3"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/test"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func mustCreateEventJSONTable(t *testing.T, dbType test.DBType) (tables.EventJSON, func()) {
|
||||
t.Helper()
|
||||
connStr, close := test.PrepareDBConnectionString(t, dbType)
|
||||
db, err := sqlutil.Open(&config.DatabaseOptions{
|
||||
ConnectionString: config.DataSource(connStr),
|
||||
}, sqlutil.NewExclusiveWriter())
|
||||
assert.NoError(t, err)
|
||||
var tab tables.EventJSON
|
||||
switch dbType {
|
||||
case test.DBTypePostgres:
|
||||
err = postgres.CreateEventJSONTable(db)
|
||||
assert.NoError(t, err)
|
||||
tab, err = postgres.PrepareEventJSONTable(db)
|
||||
case test.DBTypeSQLite:
|
||||
err = sqlite3.CreateEventJSONTable(db)
|
||||
assert.NoError(t, err)
|
||||
tab, err = sqlite3.PrepareEventJSONTable(db)
|
||||
}
|
||||
assert.NoError(t, err)
|
||||
|
||||
return tab, close
|
||||
}
|
||||
|
||||
func Test_EventJSONTable(t *testing.T) {
|
||||
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
||||
tab, close := mustCreateEventJSONTable(t, dbType)
|
||||
defer close()
|
||||
|
||||
// create some dummy data
|
||||
for i := 0; i < 10; i++ {
|
||||
err := tab.InsertEventJSON(
|
||||
context.Background(), nil, types.EventNID(i),
|
||||
[]byte(fmt.Sprintf(`{"value":%d"}`, i)),
|
||||
)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
args []types.EventNID
|
||||
wantCount int
|
||||
}{
|
||||
{
|
||||
name: "select subset of existing NIDs",
|
||||
args: []types.EventNID{1, 2, 3, 4, 5},
|
||||
wantCount: 5,
|
||||
},
|
||||
{
|
||||
name: "select subset of existing/non-existing NIDs",
|
||||
args: []types.EventNID{1, 2, 12, 50},
|
||||
wantCount: 2,
|
||||
},
|
||||
{
|
||||
name: "select single existing NID",
|
||||
args: []types.EventNID{1},
|
||||
wantCount: 1,
|
||||
},
|
||||
{
|
||||
name: "select single non-existing NID",
|
||||
args: []types.EventNID{13},
|
||||
wantCount: 0,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
// select a subset of the data
|
||||
values, err := tab.BulkSelectEventJSON(context.Background(), nil, tc.args)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, tc.wantCount, len(values))
|
||||
for i, v := range values {
|
||||
assert.Equal(t, v.EventNID, types.EventNID(i+1))
|
||||
assert.Equal(t, []byte(fmt.Sprintf(`{"value":%d"}`, i+1)), v.EventJSON)
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
79
roomserver/storage/tables/event_state_keys_table_test.go
Normal file
79
roomserver/storage/tables/event_state_keys_table_test.go
Normal file
|
|
@ -0,0 +1,79 @@
|
|||
package tables_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/postgres"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/sqlite3"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/test"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func mustCreateEventStateKeysTable(t *testing.T, dbType test.DBType) (tables.EventStateKeys, func()) {
|
||||
t.Helper()
|
||||
connStr, close := test.PrepareDBConnectionString(t, dbType)
|
||||
db, err := sqlutil.Open(&config.DatabaseOptions{
|
||||
ConnectionString: config.DataSource(connStr),
|
||||
}, sqlutil.NewExclusiveWriter())
|
||||
assert.NoError(t, err)
|
||||
var tab tables.EventStateKeys
|
||||
switch dbType {
|
||||
case test.DBTypePostgres:
|
||||
err = postgres.CreateEventStateKeysTable(db)
|
||||
assert.NoError(t, err)
|
||||
tab, err = postgres.PrepareEventStateKeysTable(db)
|
||||
case test.DBTypeSQLite:
|
||||
err = sqlite3.CreateEventStateKeysTable(db)
|
||||
assert.NoError(t, err)
|
||||
tab, err = sqlite3.PrepareEventStateKeysTable(db)
|
||||
}
|
||||
assert.NoError(t, err)
|
||||
|
||||
return tab, close
|
||||
}
|
||||
|
||||
func Test_EventStateKeysTable(t *testing.T) {
|
||||
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
||||
tab, close := mustCreateEventStateKeysTable(t, dbType)
|
||||
defer close()
|
||||
ctx := context.Background()
|
||||
var stateKeyNID, gotEventStateKey types.EventStateKeyNID
|
||||
var err error
|
||||
// create some dummy data
|
||||
for i := 0; i < 10; i++ {
|
||||
stateKey := fmt.Sprintf("@user%d:localhost", i)
|
||||
stateKeyNID, err = tab.InsertEventStateKeyNID(ctx, nil, stateKey)
|
||||
assert.NoError(t, err)
|
||||
gotEventStateKey, err = tab.SelectEventStateKeyNID(ctx, nil, stateKey)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, stateKeyNID, gotEventStateKey)
|
||||
}
|
||||
// This should fail, since @user0:localhost already exists
|
||||
stateKey := fmt.Sprintf("@user%d:localhost", 0)
|
||||
_, err = tab.InsertEventStateKeyNID(ctx, nil, stateKey)
|
||||
assert.Error(t, err)
|
||||
|
||||
stateKeyNIDsMap, err := tab.BulkSelectEventStateKeyNID(ctx, nil, []string{"@user0:localhost", "@user1:localhost"})
|
||||
assert.NoError(t, err)
|
||||
wantStateKeyNIDs := make([]types.EventStateKeyNID, 0, len(stateKeyNIDsMap))
|
||||
for _, nid := range stateKeyNIDsMap {
|
||||
wantStateKeyNIDs = append(wantStateKeyNIDs, nid)
|
||||
}
|
||||
stateKeyNIDs, err := tab.BulkSelectEventStateKey(ctx, nil, wantStateKeyNIDs)
|
||||
assert.NoError(t, err)
|
||||
// verify that BulkSelectEventStateKeyNID and BulkSelectEventStateKey return the same values
|
||||
for userID, nid := range stateKeyNIDsMap {
|
||||
if v, ok := stateKeyNIDs[nid]; ok {
|
||||
assert.Equal(t, v, userID)
|
||||
} else {
|
||||
t.Fatalf("unable to find %d in result set", nid)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
79
roomserver/storage/tables/event_types_table_test.go
Normal file
79
roomserver/storage/tables/event_types_table_test.go
Normal file
|
|
@ -0,0 +1,79 @@
|
|||
package tables_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/postgres"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/sqlite3"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/test"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func mustCreateEventTypesTable(t *testing.T, dbType test.DBType) (tables.EventTypes, func()) {
|
||||
t.Helper()
|
||||
connStr, close := test.PrepareDBConnectionString(t, dbType)
|
||||
db, err := sqlutil.Open(&config.DatabaseOptions{
|
||||
ConnectionString: config.DataSource(connStr),
|
||||
}, sqlutil.NewExclusiveWriter())
|
||||
assert.NoError(t, err)
|
||||
var tab tables.EventTypes
|
||||
switch dbType {
|
||||
case test.DBTypePostgres:
|
||||
err = postgres.CreateEventTypesTable(db)
|
||||
assert.NoError(t, err)
|
||||
tab, err = postgres.PrepareEventTypesTable(db)
|
||||
case test.DBTypeSQLite:
|
||||
err = sqlite3.CreateEventTypesTable(db)
|
||||
assert.NoError(t, err)
|
||||
tab, err = sqlite3.PrepareEventTypesTable(db)
|
||||
}
|
||||
assert.NoError(t, err)
|
||||
|
||||
return tab, close
|
||||
}
|
||||
|
||||
func Test_EventTypesTable(t *testing.T) {
|
||||
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
||||
tab, close := mustCreateEventTypesTable(t, dbType)
|
||||
defer close()
|
||||
ctx := context.Background()
|
||||
var eventTypeNID, gotEventTypeNID types.EventTypeNID
|
||||
var err error
|
||||
// create some dummy data
|
||||
eventTypeMap := make(map[string]types.EventTypeNID)
|
||||
for i := 0; i < 10; i++ {
|
||||
eventType := fmt.Sprintf("dummyEventType%d", i)
|
||||
eventTypeNID, err = tab.InsertEventTypeNID(ctx, nil, eventType)
|
||||
assert.NoError(t, err)
|
||||
eventTypeMap[eventType] = eventTypeNID
|
||||
gotEventTypeNID, err = tab.SelectEventTypeNID(ctx, nil, eventType)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, eventTypeNID, gotEventTypeNID)
|
||||
}
|
||||
// This should fail, since the dummyEventType0 already exists
|
||||
eventType := fmt.Sprintf("dummyEventType%d", 0)
|
||||
_, err = tab.InsertEventTypeNID(ctx, nil, eventType)
|
||||
assert.Error(t, err)
|
||||
|
||||
// This should return an error, as this eventType does not exist
|
||||
_, err = tab.SelectEventTypeNID(ctx, nil, "dummyEventType13")
|
||||
assert.Error(t, err)
|
||||
|
||||
eventTypeNIDs, err := tab.BulkSelectEventTypeNID(ctx, nil, []string{"dummyEventType0", "dummyEventType3"})
|
||||
assert.NoError(t, err)
|
||||
// verify that BulkSelectEventTypeNID and InsertEventTypeNID return the same values
|
||||
for eventType, nid := range eventTypeNIDs {
|
||||
if v, ok := eventTypeMap[eventType]; ok {
|
||||
assert.Equal(t, v, nid)
|
||||
} else {
|
||||
t.Fatalf("unable to find %d in result set", nid)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
157
roomserver/storage/tables/events_table_test.go
Normal file
157
roomserver/storage/tables/events_table_test.go
Normal file
|
|
@ -0,0 +1,157 @@
|
|||
package tables_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/postgres"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/sqlite3"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/test"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func mustCreateEventsTable(t *testing.T, dbType test.DBType) (tables.Events, func()) {
|
||||
t.Helper()
|
||||
connStr, close := test.PrepareDBConnectionString(t, dbType)
|
||||
db, err := sqlutil.Open(&config.DatabaseOptions{
|
||||
ConnectionString: config.DataSource(connStr),
|
||||
}, sqlutil.NewExclusiveWriter())
|
||||
assert.NoError(t, err)
|
||||
var tab tables.Events
|
||||
switch dbType {
|
||||
case test.DBTypePostgres:
|
||||
err = postgres.CreateEventsTable(db)
|
||||
assert.NoError(t, err)
|
||||
tab, err = postgres.PrepareEventsTable(db)
|
||||
case test.DBTypeSQLite:
|
||||
err = sqlite3.CreateEventsTable(db)
|
||||
assert.NoError(t, err)
|
||||
tab, err = sqlite3.PrepareEventsTable(db)
|
||||
}
|
||||
assert.NoError(t, err)
|
||||
|
||||
return tab, close
|
||||
}
|
||||
|
||||
func Test_EventsTable(t *testing.T) {
|
||||
alice := test.NewUser()
|
||||
room := test.NewRoom(t, alice)
|
||||
ctx := context.Background()
|
||||
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
||||
tab, close := mustCreateEventsTable(t, dbType)
|
||||
defer close()
|
||||
// create some dummy data
|
||||
eventIDs := make([]string, 0, len(room.Events()))
|
||||
wantStateAtEvent := make([]types.StateAtEvent, 0, len(room.Events()))
|
||||
wantEventReferences := make([]gomatrixserverlib.EventReference, 0, len(room.Events()))
|
||||
wantStateAtEventAndRefs := make([]types.StateAtEventAndReference, 0, len(room.Events()))
|
||||
for _, ev := range room.Events() {
|
||||
eventNID, snapNID, err := tab.InsertEvent(ctx, nil, 1, 1, 1, ev.EventID(), ev.EventReference().EventSHA256, nil, ev.Depth(), false)
|
||||
assert.NoError(t, err)
|
||||
gotEventNID, gotSnapNID, err := tab.SelectEvent(ctx, nil, ev.EventID())
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, eventNID, gotEventNID)
|
||||
assert.Equal(t, snapNID, gotSnapNID)
|
||||
eventID, err := tab.SelectEventID(ctx, nil, eventNID)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, eventID, ev.EventID())
|
||||
|
||||
// The events shouldn't be sent to output yet
|
||||
sentToOutput, err := tab.SelectEventSentToOutput(ctx, nil, gotEventNID)
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, sentToOutput)
|
||||
|
||||
err = tab.UpdateEventSentToOutput(ctx, nil, gotEventNID)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Now they should be sent to output
|
||||
sentToOutput, err = tab.SelectEventSentToOutput(ctx, nil, gotEventNID)
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, sentToOutput)
|
||||
|
||||
eventIDs = append(eventIDs, ev.EventID())
|
||||
wantEventReferences = append(wantEventReferences, ev.EventReference())
|
||||
|
||||
// Set the stateSnapshot to 2 for some events to verify they are returned later
|
||||
stateSnapshot := 0
|
||||
if eventNID < 3 {
|
||||
stateSnapshot = 2
|
||||
err = tab.UpdateEventState(ctx, nil, eventNID, 2)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
stateAtEvent := types.StateAtEvent{
|
||||
Overwrite: false,
|
||||
BeforeStateSnapshotNID: types.StateSnapshotNID(stateSnapshot),
|
||||
IsRejected: false,
|
||||
StateEntry: types.StateEntry{
|
||||
EventNID: eventNID,
|
||||
StateKeyTuple: types.StateKeyTuple{
|
||||
EventTypeNID: 1,
|
||||
EventStateKeyNID: 1,
|
||||
},
|
||||
},
|
||||
}
|
||||
wantStateAtEvent = append(wantStateAtEvent, stateAtEvent)
|
||||
wantStateAtEventAndRefs = append(wantStateAtEventAndRefs, types.StateAtEventAndReference{
|
||||
StateAtEvent: stateAtEvent,
|
||||
EventReference: ev.EventReference(),
|
||||
})
|
||||
}
|
||||
|
||||
stateEvents, err := tab.BulkSelectStateEventByID(ctx, nil, eventIDs)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, len(stateEvents), len(eventIDs))
|
||||
nids := make([]types.EventNID, 0, len(stateEvents))
|
||||
for _, ev := range stateEvents {
|
||||
nids = append(nids, ev.EventNID)
|
||||
}
|
||||
stateEvents2, err := tab.BulkSelectStateEventByNID(ctx, nil, nids, nil)
|
||||
assert.NoError(t, err)
|
||||
// somehow SQLite doesn't return the values ordered as requested by the query
|
||||
assert.ElementsMatch(t, stateEvents, stateEvents2)
|
||||
|
||||
roomNIDs, err := tab.SelectRoomNIDsForEventNIDs(ctx, nil, nids)
|
||||
assert.NoError(t, err)
|
||||
// We only inserted one room, so the RoomNID should be the same for all evendNIDs
|
||||
for _, roomNID := range roomNIDs {
|
||||
assert.Equal(t, types.RoomNID(1), roomNID)
|
||||
}
|
||||
|
||||
stateAtEvent, err := tab.BulkSelectStateAtEventByID(ctx, nil, eventIDs)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, len(eventIDs), len(stateAtEvent))
|
||||
|
||||
assert.ElementsMatch(t, wantStateAtEvent, stateAtEvent)
|
||||
|
||||
evendNIDMap, err := tab.BulkSelectEventID(ctx, nil, nids)
|
||||
assert.NoError(t, err)
|
||||
t.Logf("%+v", evendNIDMap)
|
||||
assert.Equal(t, len(evendNIDMap), len(nids))
|
||||
|
||||
nidMap, err := tab.BulkSelectEventNID(ctx, nil, eventIDs)
|
||||
assert.NoError(t, err)
|
||||
// check that we got all expected eventNIDs
|
||||
for _, eventID := range eventIDs {
|
||||
_, ok := nidMap[eventID]
|
||||
assert.True(t, ok)
|
||||
}
|
||||
|
||||
references, err := tab.BulkSelectEventReference(ctx, nil, nids)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, wantEventReferences, references)
|
||||
|
||||
stateAndRefs, err := tab.BulkSelectStateAtEventAndReference(ctx, nil, nids)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, wantStateAtEventAndRefs, stateAndRefs)
|
||||
|
||||
// check we get the expected event depth
|
||||
maxDepth, err := tab.SelectMaxEventDepth(ctx, nil, nids)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int64(len(room.Events())+1), maxDepth)
|
||||
})
|
||||
}
|
||||
|
|
@ -10,9 +10,8 @@ import (
|
|||
)
|
||||
|
||||
type EventJSONPair struct {
|
||||
EventNID types.EventNID
|
||||
RoomVersion gomatrixserverlib.RoomVersion
|
||||
EventJSON []byte
|
||||
EventNID types.EventNID
|
||||
EventJSON []byte
|
||||
}
|
||||
|
||||
type EventJSON interface {
|
||||
|
|
@ -36,7 +35,8 @@ type EventStateKeys interface {
|
|||
|
||||
type Events interface {
|
||||
InsertEvent(
|
||||
ctx context.Context, txn *sql.Tx, i types.RoomNID, j types.EventTypeNID, k types.EventStateKeyNID, eventID string,
|
||||
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventTypeNID types.EventTypeNID,
|
||||
eventStateKeyNID types.EventStateKeyNID, eventID string,
|
||||
referenceSHA256 []byte, authEventNIDs []types.EventNID, depth int64, isRejected bool,
|
||||
) (types.EventNID, types.StateSnapshotNID, error)
|
||||
SelectEvent(ctx context.Context, txn *sql.Tx, eventID string) (types.EventNID, types.StateSnapshotNID, error)
|
||||
|
|
|
|||
|
|
@ -41,6 +41,7 @@ import (
|
|||
"golang.org/x/net/http2/h2c"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
|
|
@ -77,6 +78,7 @@ type BaseDendrite struct {
|
|||
InternalAPIMux *mux.Router
|
||||
DendriteAdminMux *mux.Router
|
||||
SynapseAdminMux *mux.Router
|
||||
NATS *jetstream.NATSInstance
|
||||
UseHTTPAPIs bool
|
||||
apiHttpClient *http.Client
|
||||
Cfg *config.Dendrite
|
||||
|
|
@ -240,6 +242,7 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...Base
|
|||
InternalAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(),
|
||||
DendriteAdminMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.DendriteAdminPathPrefix).Subrouter().UseEncodedPath(),
|
||||
SynapseAdminMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.SynapseAdminPathPrefix).Subrouter().UseEncodedPath(),
|
||||
NATS: &jetstream.NATSInstance{},
|
||||
apiHttpClient: &apiClient,
|
||||
Database: db, // set if monolith with global connection pool only
|
||||
DatabaseWriter: writer, // set if monolith with global connection pool only
|
||||
|
|
|
|||
|
|
@ -13,31 +13,23 @@ import (
|
|||
"github.com/sirupsen/logrus"
|
||||
|
||||
natsserver "github.com/nats-io/nats-server/v2/server"
|
||||
"github.com/nats-io/nats.go"
|
||||
natsclient "github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
var natsServer *natsserver.Server
|
||||
var natsServerMutex sync.Mutex
|
||||
|
||||
func PrepareForTests() (*process.ProcessContext, nats.JetStreamContext, *nats.Conn) {
|
||||
cfg := &config.Dendrite{}
|
||||
cfg.Defaults(true)
|
||||
cfg.Global.JetStream.InMemory = true
|
||||
pc := process.NewProcessContext()
|
||||
js, jc := Prepare(pc, &cfg.Global.JetStream)
|
||||
return pc, js, jc
|
||||
type NATSInstance struct {
|
||||
*natsserver.Server
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
|
||||
func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
|
||||
// check if we need an in-process NATS Server
|
||||
if len(cfg.Addresses) != 0 {
|
||||
return setupNATS(process, cfg, nil)
|
||||
}
|
||||
natsServerMutex.Lock()
|
||||
if natsServer == nil {
|
||||
s.Lock()
|
||||
if s.Server == nil {
|
||||
var err error
|
||||
natsServer, err = natsserver.NewServer(&natsserver.Options{
|
||||
s.Server, err = natsserver.NewServer(&natsserver.Options{
|
||||
ServerName: "monolith",
|
||||
DontListen: true,
|
||||
JetStream: true,
|
||||
|
|
@ -49,23 +41,23 @@ func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
natsServer.ConfigureLogger()
|
||||
s.ConfigureLogger()
|
||||
go func() {
|
||||
process.ComponentStarted()
|
||||
natsServer.Start()
|
||||
s.Start()
|
||||
}()
|
||||
go func() {
|
||||
<-process.WaitForShutdown()
|
||||
natsServer.Shutdown()
|
||||
natsServer.WaitForShutdown()
|
||||
s.Shutdown()
|
||||
s.WaitForShutdown()
|
||||
process.ComponentFinished()
|
||||
}()
|
||||
}
|
||||
natsServerMutex.Unlock()
|
||||
if !natsServer.ReadyForConnections(time.Second * 10) {
|
||||
s.Unlock()
|
||||
if !s.ReadyForConnections(time.Second * 10) {
|
||||
logrus.Fatalln("NATS did not start in time")
|
||||
}
|
||||
nc, err := natsclient.Connect("", natsclient.InProcessServer(natsServer))
|
||||
nc, err := natsclient.Connect("", natsclient.InProcessServer(s))
|
||||
if err != nil {
|
||||
logrus.Fatalln("Failed to create NATS client")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -154,41 +154,61 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
|
|||
ctx context.Context, msg api.OutputNewRoomEvent,
|
||||
) error {
|
||||
ev := msg.Event
|
||||
addsStateEvents, missingEventIDs := msg.NeededStateEventIDs()
|
||||
|
||||
addsStateEvents := []*gomatrixserverlib.HeaderedEvent{}
|
||||
foundEventIDs := map[string]bool{}
|
||||
if len(msg.AddsStateEventIDs) > 0 {
|
||||
for _, eventID := range msg.AddsStateEventIDs {
|
||||
foundEventIDs[eventID] = false
|
||||
}
|
||||
foundEvents, err := s.db.Events(ctx, msg.AddsStateEventIDs)
|
||||
// Work out the list of events we need to find out about. Either
|
||||
// they will be the event supplied in the request, we will find it
|
||||
// in the sync API database or we'll need to ask the roomserver.
|
||||
knownEventIDs := make(map[string]bool, len(msg.AddsStateEventIDs))
|
||||
for _, eventID := range missingEventIDs {
|
||||
knownEventIDs[eventID] = false
|
||||
}
|
||||
|
||||
// Look the events up in the database. If we know them, add them into
|
||||
// the set of adds state events.
|
||||
if len(missingEventIDs) > 0 {
|
||||
alreadyKnown, err := s.db.Events(ctx, missingEventIDs)
|
||||
if err != nil {
|
||||
return fmt.Errorf("s.db.Events: %w", err)
|
||||
}
|
||||
for _, event := range foundEvents {
|
||||
foundEventIDs[event.EventID()] = true
|
||||
for _, knownEvent := range alreadyKnown {
|
||||
knownEventIDs[knownEvent.EventID()] = true
|
||||
addsStateEvents = append(addsStateEvents, knownEvent)
|
||||
}
|
||||
}
|
||||
|
||||
// Now work out if there are any remaining events we don't know. For
|
||||
// these we will need to ask the roomserver for help.
|
||||
missingEventIDs = missingEventIDs[:0]
|
||||
for eventID, known := range knownEventIDs {
|
||||
if !known {
|
||||
missingEventIDs = append(missingEventIDs, eventID)
|
||||
}
|
||||
}
|
||||
|
||||
// Ask the roomserver and add in the rest of the results into the set.
|
||||
// Finally, work out if there are any more events missing.
|
||||
if len(missingEventIDs) > 0 {
|
||||
eventsReq := &api.QueryEventsByIDRequest{
|
||||
EventIDs: missingEventIDs,
|
||||
}
|
||||
eventsReq := &api.QueryEventsByIDRequest{}
|
||||
eventsRes := &api.QueryEventsByIDResponse{}
|
||||
for eventID, found := range foundEventIDs {
|
||||
if !found {
|
||||
eventsReq.EventIDs = append(eventsReq.EventIDs, eventID)
|
||||
}
|
||||
}
|
||||
if err = s.rsAPI.QueryEventsByID(ctx, eventsReq, eventsRes); err != nil {
|
||||
if err := s.rsAPI.QueryEventsByID(ctx, eventsReq, eventsRes); err != nil {
|
||||
return fmt.Errorf("s.rsAPI.QueryEventsByID: %w", err)
|
||||
}
|
||||
for _, event := range eventsRes.Events {
|
||||
eventID := event.EventID()
|
||||
foundEvents = append(foundEvents, event)
|
||||
foundEventIDs[eventID] = true
|
||||
addsStateEvents = append(addsStateEvents, event)
|
||||
knownEventIDs[event.EventID()] = true
|
||||
}
|
||||
for eventID, found := range foundEventIDs {
|
||||
|
||||
// This should never happen because this would imply that the
|
||||
// roomserver has sent us adds_state_event_ids for events that it
|
||||
// also doesn't know about, but let's just be sure.
|
||||
for eventID, found := range knownEventIDs {
|
||||
if !found {
|
||||
return fmt.Errorf("event %s is missing", eventID)
|
||||
}
|
||||
}
|
||||
addsStateEvents = foundEvents
|
||||
}
|
||||
|
||||
ev, err := s.updateStateEvent(ev)
|
||||
|
|
@ -327,9 +347,11 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
|
|||
ctx context.Context, msg api.OutputNewInviteEvent,
|
||||
) {
|
||||
if msg.Event.StateKey() == nil {
|
||||
log.WithFields(log.Fields{
|
||||
"event": string(msg.Event.JSON()),
|
||||
}).Panicf("roomserver output log: invite has no state key")
|
||||
return
|
||||
}
|
||||
if _, serverName, err := gomatrixserverlib.SplitID('@', *msg.Event.StateKey()); err != nil {
|
||||
return
|
||||
} else if serverName != s.cfg.Matrix.ServerName {
|
||||
return
|
||||
}
|
||||
pduPos, err := s.db.AddInviteEvent(ctx, msg.Event)
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ func AddPublicRoutes(
|
|||
) {
|
||||
cfg := &base.Cfg.SyncAPI
|
||||
|
||||
js, natsClient := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
|
||||
js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
|
||||
|
||||
syncDB, err := storage.NewSyncServerDatasource(base, &cfg.Database)
|
||||
if err != nil {
|
||||
|
|
|
|||
18
test/base.go
Normal file
18
test/base.go
Normal file
|
|
@ -0,0 +1,18 @@
|
|||
package test
|
||||
|
||||
import (
|
||||
"github.com/matrix-org/dendrite/setup/base"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
func Base(cfg *config.Dendrite) (*base.BaseDendrite, nats.JetStreamContext, *nats.Conn) {
|
||||
if cfg == nil {
|
||||
cfg = &config.Dendrite{}
|
||||
cfg.Defaults(true)
|
||||
}
|
||||
cfg.Global.JetStream.InMemory = true
|
||||
base := base.NewBaseDendrite(cfg, "Tests")
|
||||
js, jc := base.NATS.Prepare(base.ProcessContext, &cfg.Global.JetStream)
|
||||
return base, js, jc
|
||||
}
|
||||
|
|
@ -47,7 +47,7 @@ func NewInternalAPI(
|
|||
appServices []config.ApplicationService, keyAPI keyapi.UserKeyAPI,
|
||||
rsAPI rsapi.UserRoomserverAPI, pgClient pushgateway.Client,
|
||||
) api.UserInternalAPI {
|
||||
js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
|
||||
js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
|
||||
|
||||
db, err := storage.NewUserAPIDatabase(
|
||||
base,
|
||||
|
|
|
|||
Loading…
Reference in a new issue